ACE_Reactor(五)ACE_TP_Reactor和ACE_Select_Reactor的区别
在ACE_Select_Reactor_T的handle_events中,进去就会获取Token,调到ACE_Guard直至ACE_Token的share_acquire函数,会调用一个sleepHook,这个hook的代码是: 00058 // Used to wakeup the reactor.
00059
00060 template <class ACE_TOKEN_TYPE> void
00061 ACE_Reactor_Token_T<ACE_TOKEN_TYPE>::sleep_hook (void)
00062 {
00063 ACE_TRACE ("ACE_Reactor_Token_T::sleep_hook");
00064 if (this->reactor_->notify () == -1)
00065 ACE_ERROR ((LM_ERROR,00066 ACE_LIB_TEXT ("%pn"),00067 ACE_LIB_TEXT ("sleep_hook failed")));
00068 }
00069
由于这个Nofity实际是在handle_events 01385 template <class ACE_SELECT_REACTOR_TOKEN> int
01386 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::handle_events
01387 (ACE_Time_Value *max_wait_time)
01388 {
01389 ACE_TRACE ("ACE_Select_Reactor_T::handle_events");
01390
01391 #if defined (ACE_MT_SAFE) && (ACE_MT_SAFE != 0)
01392
01393 // Stash the current time -- the destructor of this object will
01394 // automatically compute how much time elapsed since this method was
01395 // called.
01396 ACE_Countdown_Time countdown (max_wait_time);
01397
01398 ACE_GUARD_RETURN (ACE_SELECT_REACTOR_TOKEN,ace_mon,this->token_,-1);
。。。。。。。
以及 00143 ACE_TP_Reactor::handle_events (ACE_Time_Value *max_wait_time)
00144 {
00145 ACE_TRACE ("ACE_TP_Reactor::handle_events");
00146
00147 // Stash the current time -- the destructor of this object will
00148 // automatically compute how much time elapsed since this method was
00149 // called.
00150 ACE_Countdown_Time countdown (max_wait_time);
00151
00152 //
00153 // The order of these events is very subtle,modify with care.
00154 //
00155
00156 // Instantiate the token guard which will try grabbing the token for
00157 // this thread.
00158 ACE_TP_Token_Guard guard (this->token_);
00159
00160 int const result = guard.acquire_read_token (max_wait_time);
00161
这两个函数功能基本相同,所以后半部分类似,但是前面这个地方对于获取Token,则有较大区别。 00681 /// Synchronization token for the MT_SAFE ACE_Select_Reactor.
00682 ACE_SELECT_REACTOR_TOKEN token_;
不过ACE_Select_Reactor_T中的这个Guard宏展开就是 ACE_Guard< ACE_SELECT_REACTOR_TOKEN> ace_mon(this->token_);
if (ace_mon.locked () != 0) { ;; }
else { return -1; }
而在这个ACE_Guard的构造函数中,会预先调用acquire函数,在这个函数里会引发调用token_的acquire()函数继而调用到了ACE_Token的shared_acquire,因为其token类型是ACE_REACTOR_TOKEN_T最终调用了的就是这个类中的sleep_hook,其特点就是会调用ACE_Select_Reactor_Notify的notify接口。 00183 int
00184 ACE_Token::shared_acquire (void (*sleep_hook_func)(void *),00185 void *arg,00186 ACE_Time_Value *timeout,00187 ACE_Token_Op_Type op_type)
00188 {
00189 ACE_TRACE ("ACE_Token::shared_acquire");
00190 ACE_GUARD_RETURN (ACE_Thread_Mutex,this->lock_,-1);
00191
00192 #if defined (DEBUGGING)
00193 this->dump ();
00194 #endif /* DEBUGGING */
00195
00196 ACE_thread_t thr_id = ACE_Thread::self ();
00197
00198 // Nobody holds the token.
00199 if (!this->in_use_)
00200 {
00201 // Its mine!
00202 this->in_use_ = op_type;
00203 this->owner_ = thr_id;
00204 return 0;
00205 }
00206
00207 //
00208 // Someone already holds the token.
00209 //
00210
00211 // Check if it is us.
00212 if (ACE_OS::thr_equal (thr_id,this->owner_))
00213 {
00214 ++this->nesting_level_;
00215 return 0;
00216 }
00217
00218 // Do a quick check for "polling" behavior.
00219 if (timeout != 0 && timeout->sec () == 0 && timeout->usec () == 0)
00220 {
00221 errno = ETIME;
00222 return -1;
00223 }
00224
00225 //
00226 // We've got to sleep until we get the token.
00227 //
00228
00229 // Which queue we should end up in...
00230 ACE_Token_Queue *queue = (op_type == ACE_Token::READ_TOKEN
00231 ? &this->readers_
00232 : &this->writers_);
00233
00234 // Allocate queue entry on stack. This works since we don't exit
00235 // this method's activation record until we've got the token.
00236 ACE_Token::ACE_Token_Queue_Entry my_entry (this->lock_,00237 thr_id,00238 this->attributes_);
00239 queue->insert_entry (my_entry,this->queueing_strategy_);
00240 ++this->waiters_;
00241
00242 // Execute appropriate <sleep_hook> callback. (@@ should these
00243 // methods return a success/failure status,and if so,what should
00244 // we do with it?)
00245 int ret = 0;
00246 if (sleep_hook_func)
00247 {
00248 (*sleep_hook_func) (arg);
00249 ++ret;
00250 }
00251 else
00252 {
00253 // Execute virtual method.
00254 this->sleep_hook ();
00255 ++ret;
00256 }
。。。。。。。。
从代码中可以看到第212行会判断这个token的owner是否就是本线程,若是则直接返回ile根部就走不到254行的sleep_hook()函数。所以这里的Token是一个可重入的递归锁,reactor所属线程是可以直接获取到锁执行的,而其他线程则会调用notify后加入等待token的线程队列中,reactor主线程会被打断后分配事件处理,在dispatch前释放token,那个其他线程获取到更改reactor的状态,待其他线程释放token后,reactor主线程获取到token又可以开始下一次的检测和处理。 ACE_TP_Reactor没有像 Select_Reactor 一样使用一般的 ACE_GUARD_RETURN 宏,后者间接定义的是 ACE_Guard 模板对象,在构造器中默认会调用 LOCK 的 acquire 方法;而 ACE_TP_Token_Guard 则不会在构造器中调用任何锁的方法,而是对象构造后,使用 grab_token/acquire_token 来显示获取锁,析构器中调用 release 则是一致的。 01271 template <class ACE_SELECT_REACTOR_TOKEN> int
01272 ACE_Select_Reactor_T<ACE_SELECT_REACTOR_TOKEN>::dispatch
01273 (int active_handle_count,01274 ACE_Select_Reactor_Handle_Set &dispatch_set)
函数中,有如下代码: 01340 // Next dispatch the notification handlers (if there are any to
01341 // dispatch). These are required to handle multi-threads that
01342 // are trying to update the <Reactor>.
01343
01344 else if (this->dispatch_notification_handlers
01345 (dispatch_set,01346 active_handle_count,01347 other_handlers_dispatched) == -1)
01348 // State has changed or a serious failure has occured,so exit
01349 // loop.
01350 break;
最终调用到ACE_Select_Reactor_Notify的handle 01051 int
01052 ACE_Select_Reactor_Notify::handle_input (ACE_HANDLE handle)
01053 {
01054 ACE_TRACE ("ACE_Select_Reactor_Notify::handle_input");
01055 // Precondition: this->select_reactor_.token_.current_owner () ==
01056 // ACE_Thread::self ();
01057
01058 int number_dispatched = 0;
01059 int result = 0;
01060 ACE_Notification_Buffer buffer;
01061
01062 while ((result = this->read_notify_pipe (handle,buffer)) > 0)
01063 {
01064 // Dispatch the buffer
01065 // NOTE: We count only if we made any dispatches ie. upcalls.
01066 if (this->dispatch_notify (buffer) > 0)
01067 ++number_dispatched;
01068
01069 // Bail out if we've reached the <notify_threshold_>. Note that
01070 // by default <notify_threshold_> is -1,so we'll loop until all
01071 // the notifications in the pipe have been dispatched.
01072 if (number_dispatched == this->max_notify_iterations_)
01073 break;
01074 }
01075
01076 // Reassign number_dispatched to -1 if things have gone seriously
01077 // wrong.
01078 if (result < 0)
01079 number_dispatched = -1;
01080
01081 // Enqueue ourselves into the list of waiting threads. When we
01082 // reacquire the token we'll be off and running again with ownership
01083 // of the token. The postcondition of this call is that
01084 // <select_reactor_.token_.current_owner> == <ACE_Thread::self>.
01085 this->select_reactor_->renew ();
01086 return number_dispatched;
01087 }
如果是普通的ACE_Select_Reactor的reactor,则renew是可以用的。可以在打断select后,执行事件处理后,将需要检测的事件handles刷新,重新检测。 网络例子代码: (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |