ACE反应器(Reactor)模式
ACE反应器(Reactor)模式 1.ACE反应器框架简介 反应器(Reactor):用于事件多路分离和分派的体系结构模式 通常的,对一个文件描述符指定的文件或设备,有两种工作方式:阻塞与非阻塞。所谓阻塞方式的意思是指当试图对该文件描述符进行读写时如果当时没有东西可读或者暂时不可写程序就进入等待状态直到有东西可读或者可写为止。而对于非阻塞状态如果没有东西可读或者不可写读写函数马上返回而不会等待。 在前面的章节中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:当接收tcp数据时,如果远端没有数据可以读,则会一直阻塞到读到需要的数据为止。这种方式的传输和传统的被动方法的调用类似,非常直观,并且简单有效,但是同样也存在一个效率问题,如果你是开发一个面对着数千个连接的服务器程序,对每一个客户端都采用阻塞的方式通信,如果存在某个非常耗时的读写操作时,其它的客户端通信将无法响应,效率非常低下。 一种常用做法是:每建立一个Socket连接时,同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程的无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况。 另一种较高效的做法是:服务器端保存一个Socket连接列表,然后对这个列表进行轮询,如果发现某个Socket端口上有数据可读时(读就绪),则调用该socket连接的相应读操作;如果发现某个Socket端口上有数据可写时(写就绪),则调用该socket连接的相应写操作;如果某个端口的Socket连接已经中断,则调用相应的析构方法关闭该端口。这样能充分利用服务器资源,效率得到了很大提高。 在Socket编程中就可以通过select等相关API实现这一方式。但直接用这些API控制起来比较麻烦,并且也难以控制和移植,在ACE中可以通过Reactor模式简化这一开发过程。 反应器本质上提供一组更高级的编程抽象,简化了事件驱动的分布式应用的设计和实现。除此而外,反应器还将若干不同种类的事件的多路分离集成到易于使用的API中。特别地,反应器对基于定时器的事件、信号事件、基于I/O端口监控的事件和用户定义的通知进行统一地处理。 ACE中的反应器与若干内部和外部组件协同工作。其基本概念是反应器框架检测事件的发生(通过在OS事件多路分离接口上进行侦听),并发出对预登记事件处理器(event handler)对象中的方法的"回调"(callback)。该方法由应用开发者实现,其中含有应用处理此事件的特定代码。 使用ACE的反应器,只需如下几步: 1. 创建事件处理器,以处理他所感兴趣的某事件。2. 在反应器上登记,通知说他有兴趣处理某事件,同时传递他想要用以处理此事件的事件处理器的指针给反应器。 随后反应器框架将自动地: 1. 在内部维护一些表,将不同的事件类型与事件处理器对象关联起来。2. 在用户已登记的某个事件发生时,反应器发出对处理器中相应方法的回调。 反应器模式在ACE中被实现为ACE_Reactor类,它提供反应器框架的功能接口。 如上面所提到的,反应器将事件处理器对象作为服务提供者使用。反应器内部记录某个事件处理器的特定事件的相关回调方法。当这些事件发生时,反应器会创建这种事件和相应的事件处理器的关联。 1. 事件处理器 在反应器框架中,所有应用特有的事件处理器都必须由ACE_Event_Handler的抽象接口类派生。可以通过重载相应的"handle_"方法实现相关的回调方法。ACE_Reactor基本上有三个步骤: 1. 创建ACE_Event_Handler的子类,并在其中实现适当的"handle_"方法,以处理你想要此事件处理器为之服务的事件类型。2. 通过调用反应器对象的register_handler(),将你的事件处理器登记到反应器。3. 在事件发生时,反应器将自动回调相应的事件处理器对象的适当的handle_"方法。 下面我就以一个Socket客户端的例子为例简单的说明反应器的基本用法。 Cpp代码 1. #include<ace/OS.h> 2. #include<ace/Reactor.h> 3. #include<ace/SOCK_Connector.h> 4. 5. #include<string> 6. #include<iostream> 7. usingnamespacestd; 8. 9. classMyClient:publicACE_Event_Handler 10. { 11. public: 12. boolopen() 13. { 14. ACE_SOCK_Connectorconnector; 15. ACE_INET_Addraddr(3000,"127.0.0.1"); 16. ACE_Time_Valuetimeout(5,0); 17. if(connector.connect(peer,addr,&timeout)!=0) 18. { 19. cout<<endl<<"connecetdfail"; 20. returnfalse; 21. } 22. ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK); 23. cout<<endl<<"connecetd"; 24. returntrue; 25. } 26. 27. ACE_HANDLEget_handle(void)const 28. { 29. returnpeer.get_handle(); 30. } 31. 32. inthandle_input(ACE_HANDLEfd) 33. { 34. intrev=0; 35. ACE_Time_Valuetimeout(5,0); 36. if((rev=peer.recv(buffer,1000,&timeout))>0) 37. { 38. buffer[rev]='/0'; 39. cout<<endl<<"rev:/t"<<buffer<<endl; 40. } 41. return3; 42. } 43. 44. private: 45. ACE_SOCK_Streampeer; 46. charbuffer[1024]; 47. }; 48. 49. intmain(intargc,char*argv[]) 50. { 51. MyClientclient; 52. client.open(); 53. 54. while(true) 55. { 56. ACE_Reactor::instance()->handle_events(); 57. } 58. 59. return0; 60. }
在这个例子中,客户端连接上服务器后,通过ACE_Reactor::instance()->register_handler(this注册了一个读就绪的回调函数,当服务器端给客户端发消息的时候,会自动触发handle_input()函数,将接收到的信息打印出来。 这个例子只是为了演示反应器的基本用法,并不完善,我将在下一节中对如何在Socket通信中使用反应器做进一步的介绍。 2 . Socket编程中的事件处理器Socket编程中,常见的事件就是"读就绪","写就绪",通过对这两个事件的捕获分发,可以实现Socket中的异步操作。 Socket编程中的事件处理器 在前面我们已经介绍过,在ACE反应器框架中,任何都必须派生自ACE_Event_Handler类,并通过重载其相应会调事件处理函数来实现相应的回调处理的。在Socket编程中,我们通常需要重载的函数有 1. handle_input() 此外,为了使Reactor能通过I/O句柄找到对应的事件处理器,还必须重载其get_handle()方法以使得Reactor建立起I/O句柄和事件处理器的关联。 使用Reactor框架。 下面我们将以一个客户端的程序为例,介绍如何在Socket编程中使用Reactor框架。 一.建立一个客户端对象(事件处理器)。 客户端对象就是一个事件处理器,其声明如下: Cpp代码 1. classClient:publicACE_Event_Handler 2. { 3. public: 4. ACE_HANDLEget_handle(void)const; 5. inthandle_input(ACE_HANDLEfd); 6. inthandle_close(ACE_HANDLEhandle, 7. ACE_Reactor_Maskclose_mask); 8. ACE_SOCK_Stream&Peer(); 9. private: 10. ACE_SOCK_Streampeer; 11. }; Client端中我只关心"事件,故只重载了handle_input函数(大多数应用下只需要重载handle_input函数)。另外,在客户端还保存了一个ACE_SOCK_Stream的peer对象用来进行Socket通信,同时封装了一个Peer()函数返回它的引用。 二.重载相应回调处理函数 Cpp代码 1. ACE_SOCK_Stream&Client::Peer() 2. { 3. returnpeer; 4. } 5. 6. ACE_HANDLEClient::get_handle(void)const 7. { 8. returnpeer.get_handle(); 9. } 10. 11. intClient::handle_input(ACE_HANDLEfd) 12. { 13. intrev=0; 14. if((rev=peer.recv(buffer,1000))>0) 15. { 16. buffer[rev]='/0'; 17. cout<<endl<<"rev:/t"<<buffer<<endl; 18. return0; 19. }
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |