探讨erlang消息选择性接收和改进
发布时间:2020-12-13 20:16:13 所属栏目:PHP教程 来源:网络整理
导读:从 rabbitMQ 代码中找到 gen_server2 , 对gen_server进行了1些优化。 看到先辈写的博文也提到这个 ,引发了我的思考。见 gen_server2 - OTP gen_server优化版 。 gen_server2 引发的思考 正如 litaocheng 所说的: gen_server 和 gen_server2 最大的不同是
从 rabbitMQ 代码中找到 gen_server2 , 对gen_server进行了1些优化。看到先辈写的博文也提到这个,引发了我的思考。见 gen_server2 - OTP gen_server优化版 。 gen_server2 引发的思考正如 litaocheng 所说的: gen_server 和 gen_server2 最大的不同是: gen_server2 收到任何1条消息放到外部的队列中,当VM内部消息队列为空后,才进行消息处理,继续循环 gen_server 收到任何1条消息后,立即进行处理,处理完成后继续循环 其次,还有1个很重要的不同点: gen_server2 使用的外部队列是带优先级排序的,功能模块本身可以定制消息优先级,乃至直接抛弃消息。(导出 prioritise_call/prioritise_cast/prioritise_info 几个函数实现定制,返回的数值越大优先级越高,返回drop就抛弃消息) 最高优先级是 infinity, 在处理 {system,_From,_Req} 和 {'EXIT',Parent,_R} 使用了这个优先级。 但在博文也援用了Joe' Bog 在merle 所做的测试,看了merle 的代码,没有用到 prioritise_XXX 函数,说明没有明显利用到 gen_server2 优先级控制的好处,那为何能取得不错的效果?(见下图) 讨论 gen_server2 的测试通过浏览 Joe 在Github写的 merle 代码,很快发现问题: 可以看出,merle 在 handle_call 时都会调用 send_generic_cmd 、send_get_cmd 等类似函数。这些函数实现上都会阻塞进程直到接收到某些特定消息。 下面以 send_generic_cmd 为例做说明:
另外,gen_tcp:send 在实现上也 receive 等待某个特定信息,见 prim_inet:send(Socket,Packet,Opts)
也许可能有读者不明白,这里说的等待某个特定消息是指选择性接收。具体例子以下:
前面提到,merle 在 handle_call 时都会 receive 住,等待某个特定消息。这个的代价就是每次receive住,erlang VM都要扫描进程全部信箱队列。特别像 Joe 在做此类测试时,消息处理速度远远低于消息投递速度,换句话说,gen_server进程信箱前面所有大部份的信件都是作者自己发的 gen_server:call 要求消息,然后每次 receive 住都要匹配这些消息。 比如, Joe 测试的是 merle:getkey 操作,那末信箱大部份消息就是 gen_server:call 投递的 getkey 消息,而 handle_call 在处理时就要扫描完前面的getkey消息,才能得到想要的 {tcp,Data} 消息。进程信箱消息队列以下所示:
换成 gen_server2 的方式,gen_server2 会清空消息队列,那末进程信箱消息队列以下所示:
前面还有2个getkey表示 gen_server2清空后在 handle_call 处理进程中 gen_server:call 又投递了新的 getkey 消息,数据量对照 gen_server来讲可以说是极少了,所以,消息匹配的次数就少了很多,这就会出现 Joe 测试的结果。 讨论erlang消息选择性接收在讨论这个问题之前,先援用 learnyousomeerlang 对消息选择性接收的介绍(原文),很生动具体。
所以,对选择性接收,这里取3个问题出来说: 1、上面提到的,消息 Save Queue 是不是存在入列出列开消 2、当选择性接收时,新消息到来时会不会重复扫描信箱前面匹配不上的消息 3、假定第2点不存在重复扫描,那末如果消息已匹配到了,再匹配多1次这个消息,会不会重复扫描前面的消息 带着上面的疑问,下面以1个简单的例子做说明
保存为test.erl,然后编译,生成opcode
在目录下找到生成的 test.dis,t() 函数opcode以下:
逐行解释这段代码:
以上, i_is_eq_exact_immed_frc 和 move_return_cr 在 beam_hot.h实现,其他在 beam_emu.c 实现,都可以找相干代码。 /*
* beam_emu.c process_main() 线程入口函数,实现VM调度
* 以下截取 i_loop_rec_fr 处理进程
* 作用是从信箱取出1条消息放到 x(0) 寄存器;没消息则跳到 wait或 wait_timeout指令
*/
OpCase(i_loop_rec_fr):
{
BeamInstr *next;
ErlMessage* msgp;
loop_rec__:
PROCESS_MAIN_CHK_LOCKS(c_p);
// 取出“当前位置”的消息
msgp = PEEK_MESSAGE(c_p);
if (!msgp) { //如果消息不存在,尝试从SMP下public queue获得消息
#ifdef ERTS_SMP
erts_smp_proc_lock(c_p,ERTS_PROC_LOCKS_MSG_RECEIVE);
if (ERTS_PROC_PENDING_EXIT(c_p)) {
// 如果进程准备退出,则不处理消息了
erts_smp_proc_unlock(c_p,ERTS_PROC_LOCKS_MSG_RECEIVE);
SWAPOUT;
goto do_schedule; // 等待下1次调度
}
// SMP下把消息移到进程私有堆尾部(纯指针操作)
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
// 再尝试取出“当前位置”的消息
msgp = PEEK_MESSAGE(c_p);
if (msgp)
erts_smp_proc_unlock(c_p,ERTS_PROC_LOCKS_MSG_RECEIVE);
else
#endif
{
// 信箱没消息则跳到 wait或 wait_timeout指令(实际上就是履行下1条履行)
SET_I((BeamInstr *) Arg(0));
Goto(*I);
}
}
// 解析散布式消息,把消息附加的数据复制到进程私有堆
ErtsMoveMsgAttachmentIntoProc(msgp,c_p,E,HTOP,FCALLS,{
SWAPOUT;
reg[0] = r(0);
PROCESS_MAIN_CHK_LOCKS(c_p);
},{
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
r(0) = reg[0];
SWAPIN;
});
if (is_non_value(ERL_MESSAGE_TERM(msgp))) {
/*
* 如果消息破坏就移除(出现这类情况是散布式消息解码出现毛病)
*/
ASSERT(!msgp->data.attached);
UNLINK_MESSAGE(c_p,msgp); // 移除消息,侧重将“当前”位置指向下1条消息
free_message(msgp); // 烧毁消息
goto loop_rec__; // 跳到上面继续
}
PreFetch(1,next); // 标记下1条指令位置
r(0) = ERL_MESSAGE_TERM(msgp);
NextPF(1,next); // 履行下1条指令
} 来看下这两个宏定义:/* Get "current" message */
#define PEEK_MESSAGE(p) (*(p)->msg.save) 从字面上就知道这个宏是取"当前的"消息,取了 msg.save 的值#define UNLINK_MESSAGE(p,msgp) do {
ErlMessage* __mp = (msgp)->next;
*(p)->msg.save = __mp;
(p)->msg.len--;
if (__mp == NULL)
(p)->msg.last = (p)->msg.save;
(p)->msg.mark = 0;
} while(0) 这个宏就是移除消息操作,消息队列长度⑴,把 msg.save 指向了 msgp的下1条消息;如果 msgp->next 为 NULL,表示这是最后1条消息,就把 msg.last 等于了 msg.save/*
* beam_emu.c process_main() 线程入口函数,实现VM调度
* 以下截取 remove_message 处理进程(已删除没必要要的代码)
* 作用是将消息从信箱队列中移除
*/
OpCase(remove_message): {
BeamInstr *next;
ErlMessage* msgp;
PROCESS_MAIN_CHK_LOCKS(c_p);
PreFetch(0,next);
msgp = PEEK_MESSAGE(c_p); // 取出当前的消息
if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
save_calls(c_p,&exp_receive);
}
if (ERL_MESSAGE_TOKEN(msgp) == NIL) {
SEQ_TRACE_TOKEN(c_p) = NIL;
} else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) {
// 追踪调试内容,可以疏忽
Eterm msg;
SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp);
c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) {
c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
}
msg = ERL_MESSAGE_TERM(msgp);
seq_trace_output(SEQ_TRACE_TOKEN(c_p),msg,SEQ_TRACE_RECEIVE,c_p->common.id,c_p);
}
UNLINK_MESSAGE(c_p,msgp); // 移除消息,侧重队列长度⑴
JOIN_MESSAGE(c_p); // 重置“当前”位置,指向了队列第1条消息
CANCEL_TIMER(c_p);
free_message(msgp); // 烧毁消息
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
NextPF(0,next); // 履行下1条指令
} 所以,当消息匹配时,就会重新指向了信箱第1条消息,这样,第3个问题就有了答案,会重新扫描信箱。再来看看这个宏: /* Reset message save point (after receive match) */
#define JOIN_MESSAGE(p)
(p)->msg.save = &(p)->msg.first 这个宏就是讲 msg.save 指向了 msg.first ,就是第1个消息下面看下消息不匹配的情况就是 loop_rec_end_f /*
* beam_emu.c process_main() 线程入口函数,实现VM调度
* 以下截取 loop_rec_end_f 处理进程
* 作用是继续取出最新的消息匹配
*/
/*
* Advance the save pointer to the next message (the current
* message didn't match),then jump to the loop_rec instruction.
*/
OpCase(loop_rec_end_f): {
SET_I((BeamInstr *) Arg(0));
SAVE_MESSAGE(c_p); // “当前”位置指向下1个位置
goto loop_rec__; // 继续取出消息匹配
} 这个opcode实现了不断取消息出来匹配的进程,直到失去调度机会,等待下1次调度。也看下这个宏: /* Save current message */
#define SAVE_MESSAGE(p)
(p)->msg.save = &(*(p)->msg.save)->next
这个宏就是将 msg.save 指向了下1个位置。到这里第1个问题和第2个问题都有答案了,前面说到的 Save Queue 只是“形象化”的队列,实际不存在,所以不存在消息入列出列的开消问题。然后第2个问题,消息选择性接收,当消息匹配不上,有新消息到来时不会重复扫描信箱前面匹配不上的消息。 总结针对erlang选择性接收的问题,gen_server2给我们1个方向,通过外部队列减少了消息的匹配,而且控制优先级来控制消息的处理。 这里也说说 gen_server2 的副作用: gen_server2会带来1种问题,erlang原来会利用进程信箱长度来抑制发送者进程(通过减少消息发送者进程的调度机会 Reduction,可以参考这篇文章《erlang send剖析及参数意义》)。但是,gen_server2 每次都会清空进程信箱的消息队列,没法利用到 VM 提供的抑制消息队列过快暴涨的保护机制。 针对这个问题,gen_server2 通过 prioritise_XXX 函数向外部模块暴露消息队列长度,使调用者可以根据消息队列长度控制是不是抛弃消息,以实现对消息的抑制。 参考:http://blog.csdn.net/mycwq/article/details/44049749 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |