#include "ace/OS_main.h" #include "ace/OS_NS_string.h" #include "ace/OS_NS_unistd.h" #include "ace/Reactor.h" #include "ace/Process.h" #include "ace/SOCK_Dgram.h" #include "ace/INET_Addr.h" #include "ace/Log_Msg.h" #include "ace/Thread_Manager.h" #include "ace/Task_T.h"
#define SERVER_PORT 10101 static const size_t TASK_THREAD_POOL_SIZE = 10;
class Task_Worker: public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc(void) { while(1) { ACE_Message_Block *mb = NULL; if(this->getq(mb) == -1) { continue; } process_task(mb); } return 0; } private: void process_task(ACE_Message_Block *mb) { mb->release(); } };
class Task_Manager: public ACE_Task<ACE_MT_SYNCH> { public: virtual int svc(void) { Task_Worker task_tp; task_tp.activate(THR_NEW_LWP | THR_JOINABLE,TASK_THREAD_POOL_SIZE); while(1) { ACE_Message_Block *mb = NULL; if(this->getq(mb) < 0) { task_tp.msg_queue()->deactivate(); task_tp.wait(); } task_tp.putq(mb); } return 0; } };
class Dgram_Endpoint : public ACE_Event_Handler { public: Task_Manager task_mgr; Dgram_Endpoint (const ACE_INET_Addr &local_addr); virtual ACE_HANDLE get_handle (void) const; virtual int handle_input (ACE_HANDLE handle); virtual int handle_timeout (const ACE_Time_Value & tv,const void *arg = 0); virtual int handle_close (ACE_HANDLE handle,ACE_Reactor_Mask close_mask); virtual int handle_signal (int signum,siginfo_t*,ucontext_t*); int send (const char *buf,size_t len,const ACE_INET_Addr &);
private: ACE_SOCK_Dgram endpoint_; };
int Dgram_Endpoint::send (const char *buf,const ACE_INET_Addr &addr) { return this->endpoint_.send (buf,len,addr); }
Dgram_Endpoint::Dgram_Endpoint (const ACE_INET_Addr &local_addr) : endpoint_(local_addr) { task_mgr.activate(); }
ACE_HANDLE Dgram_Endpoint::get_handle (void) const { return this->endpoint_.get_handle(); }
int Dgram_Endpoint::handle_close (ACE_HANDLE handle,ACE_Reactor_Mask) {
ACE_DEBUG((LM_DEBUG,"************handle_close***********/n")); ACE_UNUSED_ARG (handle); this->endpoint_.close(); delete this; return 0; }
int Dgram_Endpoint::handle_input (ACE_HANDLE) { char buf[BUFSIZ]; ACE_INET_Addr from_addr; char address[32];
//ACE_DEBUG ((LM_DEBUG,"(%P|%t) activity occurred on handle %d!/n",this->endpoint_.get_handle ())); ssize_t nbytes = this->endpoint_.recv (buf,sizeof(buf),from_addr); #if 0 if (nbytes == -1) ACE_ERROR ((LM_ERROR,"%p","handle_input error/n")); else ACE_DEBUG ((LM_DEBUG,"[%d]bytes from[%s] received:%s/n",nbytes,address,buf)); #endif
ACE_Message_Block *mb = NULL; ACE_NEW_RETURN(mb,ACE_Message_Block(nbytes,ACE_Message_Block::MB_DATA,buf),-1); mb->wr_ptr(nbytes); this->task_mgr.putq(mb); return 0; }
int Dgram_Endpoint::handle_timeout (const ACE_Time_Value &,const void *) { ACE_DEBUG ((LM_DEBUG,"(%P|%t) timed out for endpoint/n")); return 0; } int Dgram_Endpoint::handle_signal (int signum,siginfo_t* siginfo,ucontext_t* context) { return ACE_Event_Handler::handle_signal (signum,siginfo,context); } int main (int argc,char *argv[]) { ACE_INET_Addr local_addr(SERVER_PORT); Dgram_Endpoint *endpoint;
ACE_NEW_RETURN (endpoint,Dgram_Endpoint (local_addr),-1);
if (ACE_Reactor::instance ()->register_handler(endpoint,ACE_Event_Handler::READ_MASK) == -1) { ACE_ERROR_RETURN ((LM_ERROR,"ACE_Reactor::register_handler"),-1); }#if 0 if (-1 == ACE_Reactor::instance()->register_handler(SIGINT,endpoint)) { ACE_ERROR_RETURN((LM_ERROR,"fail to register SIGINT handler"),-1); } ACE_Time_Value time_out(5); ACE_Reactor::instance()->schedule_timer(&callback,(void *)"time out",ACE_Time_Value::zero,time_out);#endif ACE_Reactor::instance()->run_event_loop(); return 0;} (编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|