ACE_Select_Reactor的Notify功能实现浅析
ACE_Select_Reactor的Notify功能实现浅析作者 : ydogg 如需转载 , 请注明
1.
前言
本文关注平台为Win32,但在其它的类Unix
平台,就实现
框架而言并没有太多变
化
,惟一不同的是用于底层
机制的ACE_Pipe
的实现
。
但是,为了解决某些特别的原因,如大量的通知存储、规避悬空处理器指针等, ACE 也提供了一种有别于 Pipe 的解决方案,其采用消息排队的方式工作。当采取这种方式时,需定义 ACE_HAS_REACTOR_NOTIFICATION_QUEUE 宏并重生成 ACE 。
2.
Notify的能力、风险和风险规避
Notify
机制最重要的能力是: Reactor的notify()让用户直接提供给Reactor待通知反应器的指针,而这些处理器无需注册到反应器上,从而提供了无限的扩展能力。但是,在使用ACE_Pipe的实现中,如果使用不当,可能会造成严重的后果。
潜在的风险:
不过,在采用队列的Notify实现中,ACE
给提供了问题的解决方法。 在采用ACE_Pipe方式中,puerge_pending_notifications()被实现为空方法。
3.
ACE_Select_Reactor类图
可以看出,ACE_Select_Reactor_T
是具体的实现负责
者。
4.
参与Notify机制的类
参与Notify
机制的类
有:
ACE_Select_Reactor_Impl
、ACE_Select_Reactor_Notify
和ACE_Pipe
。从ACE_Select_Reactor
开始,采用自上而向下的顺
序进
行分析。
5.
实现原理
各类
型Reactor
的实现
方法有所不同,比如Select_Reactor
是通过
管道来实现notify
功能,而WFMO则
通过ACE_Event类
型
的的事件加通知队列来实现
。
Reactor
在初始化时
,会创
建
一个pipe
。然后将该pipe
的句柄注册到Reactor
。这样
,这
个Pipe
就处
于Reactor
的事件分派句柄集中,像其他普通句柄一样
,被Reactor
所监测
和分派。当用户调
用notify()时
,notify()
会把参数信息写入这
个
管道,通过事件监测,从而Reactor
就能得知有新notify
到来,从管道读
取
相关数据后,就可以进
行
相应
的分派处
理
。从而实现
了通知和事件分派的序列化。
详细
的来说
,ACE为
所有的Reactor
都定义
了专为
它们
自己服务
的Notify类
,如
ACE_Select_Reactor_Notify
和ACE_WFO_Reactor_Notify
。
在类Unix
平台上,支持管道并且select
可用于监测
管道事件。但是在Win32
平台并不支持管道功能,而且Winsock
的select
只能支持对socket
句柄的事件监测
。因此,ACE
在Win32
平台用一个本地连
接
来模拟Pipe
的功能,其中接受连
接
的socket
句柄作为Pipe读
句柄,而发
起连
接
的socket
句柄作为Pipe
的写句柄。
可以看出,notify
功能的实现机制类似于ACE_Reactor
的标
准
用法,并
且也有一个类
似于ACE_Event_Handler
的类
存在,只不过
是被包含在Reactor
中。在逻辑
意义
上,就是一个本地连
接
,用于和进
程
内的其他部分的通信
(略有差别的地方在于事件分派的方式)
。
对
于ACE_Select_Reactor,ACE_Pipe
充当了ACE_Sock_Stream
的角色,而ACE_Select_Reactor_Notify则
充当了ACE_Event_Handler
的角色。ACE_Select_Reactor_Notify
派生自ACE_Reactor_Notify,而ACE_Reactor_Notify
正是派生自ACE_Event_Handler.
6.
相关变量
ACE_Select_Reactor_Impl类
中,定义
了类
型为ACE_Reactor_Notify*
的成员变
量
/
CallbackobjectthatunblockstheACE_Select_Reactorifit's
/// sleeping. ACE_Reactor_Notify * notify_handler_; 7. 实现分析
7.1
初始化
ACE_Select_Reactor_T
的open
函数定义
如下:
virtual
int
open(size_tmax_number_of_handles
=
DEFAULT_SIZE,
restart 0 , ACE_Sig_Handler * disable_notify_pipe ACE_DISABLE_NOTIFY_PIPE_DEFAULT, ACE_Reactor_Notify ); 其中ACE_DISABLE_NOTIFY_PIPE_DEFAULT的值为0,表示默认要使用Notify功能。
if
(result
!=
-
1
&&
this
->
notify_handler_
==
0
)
{ ACE_NEW_RETURN(this->notify_handler_, ACE_Select_Reactor_Notify, -1); if(notify_handler_==0) result=; else delete_notify_handler_true; } handler_rep_.open(size) ) result = ; else ( open( disable_notify_pipe) ) { ACE_ERROR((LM_ERROR, ACE_TEXT("%p ),0);">notificationpipeopenfailed))); result; } 如果没有使用外部的notify时,ACE_Select_Reactor_T将会让notify_handler指向一个new出来的ACE_Select_Reactor_Notify类型的对象,并调用它的open()方法进行初始化。
ACE_Select_Reactor_Notify的open()实现如下(删除了非关键代码):
(disable_notify_pipe
)
{ select_reactor_ dynamic_cast < ACE_Select_Reactor_Impl *> (r); (select_reactor_ ) { errno EINVAL; return ; } notification_pipe_.open() ) ; (ACE::set_flags( notification_pipe_.read_handle(), ACE_NONBLOCK) register_handler ( { ; }
注意,这里的disable_notify_pipe就是ACE_Select_Reactor_T的open()方法中传递的参数,通过它,可以设置不使用notify功能。 ACE_Pipe的open()实现如下(Win32平台并删除了非关键代码):
ACE_INET_Addrmy_addr;
ACE_SOCK_Acceptoracceptor; ACE_SOCK_Connectorconnector; ACE_SOCK_Streamreader; ACE_SOCK_Streamwriter; int result ; # defined(ACE_WIN32) ACE_INET_Addrlocal_any(static_cast u_short > #endif (acceptor.open(local_any) || acceptor.get_local_addr(my_addr) { ACE_INET_Addrsv_addr(my_addr.get_port_number(), ACE_LOCALHOST); //Establishaconnectionwithinthesameprocess.(connector.connect(writer,sv_addr)(acceptor.accept(reader)) { writer.close(); result; } } acceptor.close(); handles_[ ] reader.get_handle(); writer.get_handle(); 可以看到, ACE_Pipe 通 过 了一个自身的 Tcp 连 接来模 拟 Pipe , 读 写句柄分 别 就是两端的 socket 句柄,很巧妙的 Pipe 移植。 而 ACE_Pipe 的 read_handle () 函数返回的是 handles_ [ 0 ] ,也就是说,注册到 Reactor 是读 socket, 而 handles_ [1] 将会被 客户所使用。
7.2 方法调用
在用户使用notify相关方法时,ACE_Select_Reactor_T充当中介,会将将它们直接交由内部的notify_handler对象处理。 ssize_t
const
n
notify(eh,mask,imeout);
? : ; ACE_Select_Reactor_Nofity的notify()函数代码:
;
ACE_Event_Handler_varsafe_handler(event_handler); (event_handler) event_handler add_reference(); ACE_Notification_Bufferbuffer(event_handler,mask); ssize_t ACE::send( notification_pipe_.write_handle(), ( char * ) & buffer,255);">sizeof (n // Nofailures. safe_handler.release(); 这段代码将用户传递的“目的Handler指针“和”掩码“参数,写入了”管道,实则是通过socket句柄发送了数据。ACE_Notification_Buffer是一个简单的数据包裹类。
7.3 事件分派
Reactor()的事件多路分离流程: ACE_Select_Reactor_T::()代码如下:
ACE_SEH_TRY
{ Weusethedatamemberdispatch_set_asthecurrentdispatch set. Weneedtostartfromacleandispatch_set dispatch_set_.rd_mask_.reset(); dispatch_set_.wr_mask_.reset(); dispatch_set_.ex_mask_.reset(); number_of_active_handles wait_for_multiple_events( dispatch_set_, max_wait_time); result dispatch(number_of_active_handles,0);">dispatch_set_); } ACE_SEH_EXCEPT( release_token()) { … } Reactor在监测事件时,不区分Noftiy用句柄和其它句柄,一视同仁。所以无需关心wait_for_multiple_events的实现。 可以看出,Select_Reactor分派事件时是按照:定时器、通知、IO事件的顺序进行的。
ACE_Select_Reactor_T::dispatch_notifications函数代码:ACE_HANDLE
read_handle
notification_pipe_.read_handle();
到了这里,handle_input出现了,下面的处理和ACE_Event_Handler相近了。
number_dispatched
;
ACE_Notification_Bufferbuffer; while ((result read_notify_pipe(handle,buffer)) dispatch_notify(buffer) ++ number_dispatched; (number_dispatched max_notify_iterations_) ; }
代码逻辑很清楚,从Pipe的缓冲区中依次读取所有接受到的Notify事件信息,并且交给dispatch_notify去处理,如果达到设定值max_notify_iterations_(ACE_Reactor:: max_notify_iterations函数的功能),就停止分派。
(buffer.eh_
)
{ ACE_Event_Handler event_handler buffer.eh_; bool requires_reference_counting event_handler reference_counting_policy().value() ACE_Event_Handler::Reference_Counting_Policy::ENABLED; switch (buffer.mask_) { case ACE_Event_Handler::READ_MASK: ACE_Event_Handler::ACCEPT_MASK: result handle_input(ACE_INVALID_HANDLE); ACE_Event_Handler::WRITE_MASK: result handle_output(ACE_INVALID_HANDLE); ACE_Event_Handler::EXCEPT_MASK: result handle_exception(ACE_INVALID_HANDLE); ACE_Event_Handler::QOS_MASK: result handle_qos(ACE_INVALID_HANDLE); ACE_Event_Handler::GROUP_QOS_MASK: result handle_group_qos(ACE_INVALID_HANDLE); default : Shouldwebailoutifwegetaninvalidmask? ACE_ERROR((LM_ERROR, ACE_TEXT( " invalidmask=%d ) event_handler handle_close(ACE_INVALID_HANDLE, ACE_Event_Handler::EXCEPT_MASK); } 这里是分派的终点,用户传递的用于处理Notify事件的Handler根据mask的类型而被调用。 8.序列图 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |