ACE 的Reactor/Proactor框架下高并发、大容量吞吐介绍
最近接触力ACE,发现了很多好文章,在此转载一下,也当作一个笔记吧。
Reactor与Proactor基本概念在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作。 在比较这两个模式之前,我们首先的搞明白几个概念,
阻塞和非阻塞是针对于进程在访问数据的时候,根据IO操作的就绪状态来采取的不同方式,说白了是一种读取或者写入操作函数的实现方式。 阻塞方式下读取或者写入函数将一直等待。 非阻塞方式下,读取或者写入函数会立即返回一个状态值。
同步和异步是针对应用程序和内核的交互而言的。 同步指的是用户进程触发IO操作并等待或者轮询的去查看IO操作是否就绪。 异步是指用户进程触发IO操作以后便开始做自己的事情,而当IO操作已经完成的时候会得到IO完成的通知。 一般来说I/O模型可以分为:同步阻塞,同步非阻塞,异步阻塞,异步非阻塞。 让我们来看一下每种不同I/O模型的具体描述
在此种方式下,用户进程在发起一个IO操作以后,必须等待IO操作的完成,只有当真正完成了IO操作以后,用户进程才能运行。JAVA传统的IO模型属于此种方式!
在此种方式下,用户进程发起一个IO操作以后边可返回做其它事情,但是用户进程需要时不时的询问IO操作是否就绪,这就要求用户进程不停的去询问,从而引入不必要的CPU资源浪费。其中目前JAVA的NIO就属于同步非阻塞IO。
此种方式下,应用发起一个IO操作以后,不等待内核IO操作的完成,等内核完成IO操作以后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问IO是否完成,那么为什么说是阻塞的呢?因为此时是通过select系统调用来完成的,而select函数本身的实现方式是阻塞的,而采用select函数有个好处就是它可以同时监听多个文件句柄,从而提高系统的并发性!
此种方式下,用户进程只需要发起一个IO操作然后立即返回,等IO操作真正的完成以后,应用程序会得到IO操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行实际的IO读写操作,因为真正的IO读取或者写入操作已经由内核完成了。目前Java中还没有支持此种IO模型。 阻塞型I/O意味着控制权只有到调用操作结束后才会回到调用者手里.结果调用者被阻塞了,这段时间了做不了任何其它事情。 更郁闷的是,在等待IO结果的时间里,调用者所在线程此时无法腾出手来去响应其它的请求,这真是太浪费资源了。拿read()操作来说吧,调用此函数的代码会一直僵在此处直至它所读的socket缓存中有数据到来。 相比之下,非阻塞同步是会立即返回控制权给调用者的。调用者不需要等等,它从调用的函数获取两种结果:要么此次调用成功进行了;要么系统返回错误标识告诉调用者当前资源不可用,你再等等或者再试一次看吧。比如read()操作,如果当前socket无数据可读,则立即返回EWOULBLOCK/EAGAIN,告诉调用read()者"数据还没准备好,你稍后再试".。 在非阻塞异步调用中,稍有不同。调用函数在立即返回时,还告诉调用者,这次请求已经开始了。系统会使用另外的资源或者线程来完成这次调用操作,并在完成的时候知会调用者(比如通过回调函数)。拿Windows的ReadFile()或者POSIX的aio_read()来说,调用它之后,函数立即返回,操作系统在后台同时开始读操作。 在以上三种IO形式中,非阻塞异步是性能最高、伸缩性最好的。搞清楚了以上概念以后,我们再回过头来看看,Reactor模式和Proactor模式。 此文详细的阐述了基于TCP高性能的GOLDEN数据服务器模块的设计以及解决方案 ,我们在文章的后面就不再提及阻塞式的方案了,因为阻塞式I/O实在是缺少可伸缩性,性能也达不到高性能服务器的要求。 两种IO多路复用方案:Reactor和Proactor一般情况下,I/O复用机制需要事件分离器(eventdemultiplexor).事件分离器的作用,就是将那些读写事件源分发给各读写事件的处理者,就像送快递的在楼下喊:谁的什么东西送了,快来拿吧。开发人员在开始的时候需要在事件分离器那里注册感兴趣的事件,并提供相应的事件处理器(eventhandlers),或者是回调函数;事件分离器在适当的时候会将请求的事件分发给这些handler或者回调函数。 涉及到事件分离器的两种模式称为:Reactor和Proactor。Reactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的。 在Reactor模式中,事件分离者等待某个事件或者是应用或者是某个操作的状态发生(比如文件描述符可读写,或者是socket可读写),事件分离者就把这个事件传给事先注册的事件处理器或者事件处理函数或者回调函数,由后者来做实际的读写操作。 在Proactor模式中,事件处理器(或者由事件分离器代为)直接发起一个异步读写操作(相当于请求),而实际的工作是由操作系统来完成的。发起时,需要提供的参数包括用于存放读到数据的缓存区,读的数据大小,或者用于存放外发数据的缓存区,以及这个请求完后的回调函数等信息。事件分离器得知了这个请求,它默默等待这个请求的完成,然后转发完成事件给相应的事件处理器或者事件处理函数或者回调。举例来说,在Windows上事件处理器投递了一个异步IO操作(称有overlapped的技术),事件分离器等IOCompletion事件完成 ,这种异步模式的典型实现是基于操作系统底层异步API的,所以我们可称之为“系统级别”的或者“真正意义上”的异步,因为具体的读写是由操作系统代劳的。 举另外个例子来更好地理解Reactor与Proactor两种模式的区别。这里我们只关注read操作,因为write操作也是差不多的。下面是Reactor的做法:
下面再来看看真正意义的异步模式Proactor是如何做的:
现行做法开源C++开发框架ACE(Adaptive Communication Enviromen)提供了大量平台独立的底层并发支持类(线程、互斥量等).同时在更高一层它也提供了独立的几组C++类,用于实现Reactor及Proactor模式。 尽管它们都是平台独立的单元,但他们都提供了不同的接口. ACE Proactor在MS-Windows上无论是性能还在健壮性都更胜一筹,这主要是由于Windows提供了一系列高效的底层异步API。 不幸的是,并不是所有操作系统都为底层异步提供健壮的支持。举例来说,许多Unix系统就有麻烦.ACE中的Proactor在Unix上是使用Posix标准实现的异步操作,Posix中有一个AIO,Proactor使用AIO实现异步传输。但Linux在2.6以前版本中不支持AIO,而在2.6版本以后,部分支持AIO。就因为这个部分支持,所以,Posix的子类不能正常工作。因此,ACEReactor可能是Unix系统上更合适的解决方案.正因为系统底层的支持力度不一,为了在各系统上有更好的性能,开发者不得不维护独立的好几份代码:为Windows准备的ACEProactor以及为Unix系列提供的ACEReactor。 就像我们提到过的,真正的异步模式需要操作系统级别的支持。由于事件处理器及操作系统交互的差异,为Reactor和Proactor设计一种通用统一的外部接口是非常困难的。这也是设计通行开发框架的难点所在。 ACE Proactor框架怎样发送和接收数据ACEProactor框架包含了一组高度相关的类,其数量相对较多,我在进行以下描述的时候不可能按照顺序讨论它们,而又不进行提前引用。到最后我会描述完所有这些类。下面这些类给出了ACE Proactor框架的各个类以及它们之间的关系。可以把这个图1-1当作描述ACE Proactor框架实际应用的范本。注意:类名中以ACE_开始的类名称是ACE Procator框架中包含的类,而以golden_开始的类名称是实际应用范本提供的类。 下面的代码声明了一个类,它所完成的基本工作是处理接收和发送数据。
图1.1ACE Proactor框架中的类 #include "ace/Asynch_IO.h" class golden_aio_handler : public ACE_Service_Handler { public : golden_aio_handler (golden_aio_acceptor *acc = 0) ; virtual void open ( ACE_HANDLE new_handle, ACE_Message_Block &message_block ) ; virtual void handle_read_stream( const ACE_Asynch_Read_Stream::Result &result); virtual void handle_write_stream( const ACE_Asynch_Write_Stream::Result &result); private: ACE_Asynch_Read_Stream reader_; ACE_Asynch_Write_Stream writer_; } ; 这段代码首先包含了一些必需的头文件,以引入这个例子使用的ACE Proactor框架类:
设置事件处理器并发起I/O当TCP连接打开时,我们应该把新socket的句柄传给事件处理器对象,在这个例子中是golden_aio_handler。把句柄放在事件处理器里是有益的,原因如下:
在使用ACE_Proactor框架的异步连接建立类时golden_aio_handler::open()挂钩方法会在新连接建立时被调用。下面是我们程序中的open()挂钩: void golden_aio_handler::open(ACE_HANDLE new_handle,ACE_Message_Block &) this->handle(new_handle); //打开异步读写 reader_.open (*this,new_handle,proactor ()); writer_.open (*this,宋体; font-size:14px"> //准备读的缓冲区 ACE_NEW_NORETURN(mblk_,ACE_Message_Block (SIZEOF_HEADER_WITH_CRC)); if (reader_.read (*mblk_,SIZEOF_HEADER_WITH_CRC) <0) delete this ; } 在一开始,我们使用继承而得到的ACE_Handler::handle()方法保存新socket的句柄。该方法把句柄存储在一个方便的地方,以便在析构函数~golden_aio_handler()访问或者用于其他用途。这是在这个类中实现的socket句柄生命期管理的一部分。 要发起I/O,必须初始化所需的I/O工厂对象。在存储了socket句柄之后,open()方法会初始化reader_和writer_ I/O工厂对象,为发起I/O操作做准备。两个类的open()方法都是一样的: int open (ACE_Handler &handler, ACE_HANDLE handle = ACE_INVALID_HANDLE,宋体; font-size:14px"> const void *completion_key = 0,宋体; font-size:14px"> ACE_Proactor *proactor = 0); 第一个参数表示工厂对象所发起的操作的完成事件处理器里。当通过工厂对象发起的I/O操作完成时,ACE_Proactor框架会回调这个对象。这也是为什么该处理器对象叫做完成事件处理器的原因。在我们的程序中,golden_aio_handler对象是ACE_Handler的后代,即是读操作也是写操作的完成事件处理器,所以*this被用作处理器参数。handle是新传入的socket句柄,completion_key参数只适用于windows默认传入0即可,proactor参数会传入一个在进程范围的ACE_Procator单体对象。 程序中的open()挂钩方法所做的最后一件事情,是调用ACE_Asynch_Read_Stream::read()方法,从而在新的socket上发起一个读操作。ACE_Asynch_Read_Stream::read()函数如下: int read (ACE_Message_Block &message_block,宋体; font-size:14px"> size_t num_bytes_to_read,宋体; font-size:14px"> const void *act = 0,宋体; font-size:14px"> int priority = 0,宋体; font-size:14px"> int signal_number = ACE_SIGRTMIN); 为传输指定一个ACE_Message_Block,使得缓冲区管理变得更为容易,因为可以利用ACE_Message_Block的各种能力,以及它与ACE的其他部分的集成。在发起读操作时,数据会被读入开始于数据块的写指针处在的块中,因为要被读取的数据将被写入块中。 完成I/O操作ACE_Proactor框架是基于事件的框架。I/O工厂登记“每个操作”与“该操作完成时应回调的完成事件处理器”之间建立关联。当读取完成时,ACE_Proactor框架会调用ACE_Handler::handle_read_stream()挂钩方法: void golden_aio_handler::handle_read_stream( const ACE_Asynch_Read_Stream::Result &result) { ACE_Asynch_Read_Stream::Result &result if (!result.success () || result.bytes_transferred () == 0) delete this; else if (result.bytes_transferred () < result.bytes_to_read ()) delete this ; else if (mblk_->length () == SIZEOF_HEADER_WITH_CRC) handle_msg_header(); else { if (handle_msg_pack()<0) 传入的ACE_Asynch_Read_Stream::Result指向的是用于保存读取操作结果的对象。每个I/O工厂类都会定义自己的Result类,即用于保存每个操作发起时所用的参数,又用于保存操作的结果。 如果读操作读取了任何数据,处理接收read到的报文数据包用handle_msg_pack函数,然后发起一个写操作,把数据处理结果返回给对端。当写操作完成时,ACE_Proactor框架调用下面的handle_write_stream方法: void golden_aio_handler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result) if(reader_.read (*mblk_,SIZEOF_HEADER_WITH_CRC) < 0) delete this; 不管写操作是否成功完成,在该操作中使用的消息块都会释放。如果socket出了问题,先前发起的读操作也会完成并出错,而handle_read_stream()会清理对象和socket句柄。 图1-2给出了本程序事件序列。 、建立连接ACE提供里两个工厂类,用于通过ACE_Proactor框架前摄式地建立TCP/IP连接: ACE_Asynch_Acceptor, 用于发起被动的连接建立 ACE_Asynch_Connector,用于发起主动的连接建立 图1.2ACE Proactor异步回调序列图 当使用其中一个类建立TCP/IP连接时,ACE_Proactor框架会创建一个从ACE_Service_Handler派生的事件服务处理器,比如golden_aio_handler,用以处理新连接。ACE_Service_Handler类是 ACE_Proactor框架中所有用异步方式连接的服务的基类,从ACE_Handler派生,所以服务类也可以处理在服务中发起的I/O操作的完成. ACE_Asynch_Acceptor是一个相当容易使用的类,它的一个挂钩方法是一个protected虚方法:make_handler()。Proactor框架调用这个方法获取一个ACE_Service_Handler对象,用以为新连接提供服务。下面的代码说明了这种情况: golden_aio_handler * golden_aio_acceptor::make_handler (void) ///来一个连接,就新增一个句柄。在线程池中处理 golden_aio_handler *ih; ACE_NEW_RETURN (ih,golden_aio_handler (this),0); if (clients_.insert (ih) == -1) delete ih ; return NULL ; return ih; return 0 ; ACE_Proactor完成多路分离器 ACE_Proactor类负责驱动ACE_Proactor框架的完成处理,这个类等待完成事件的发生、把这些事件多路分离给相关联的完成事件处理器,并分派每个完成处理器上适当的挂钩方法。因此,要让异步I/O完成事件处理器得以发生----无论是I/O还是连接建立----在Golden Server中都必须运行前摄器的时间循环。这通常很简单,只需要把下面的代码插入到应用中就可以了: int golden_aio::svc() ACE_Proactor::instance()->proactor_run_event_loop (); return 1 ; 可以通过两种方式来使用ACE_Proactor,如上所示的程序代码instance(),作为单体来使用。也可以通过实例化一个或多个实例来使用。这个能力被用于在一个进程中支持多个前摄器。如下代码所示,这是应用于镜像发送和镜像接收的前摄器 int golden_mirror_sender::svc() proactor_sender_->proactor_run_event_loop (); int golden_mirror_receiver::svc() proactor_recviver_->proactor_run_event_loop (); 各种操作系统上的异步I/O设施会有很大的不同,为了在所有这些系统上维持统一的接口和编程方法,ACE_Proactor类使用了Bridge模式来维持灵活性和可扩展性,同时还使得ACE_Proactor框架能够使用不同的异步I/O实现。 ACE_WIN32_Proactor类是Windows上的ACE_Proactor实现。使用了I/O完成端口进行完成事件检测。在初始化异步操作工厂时,I/O句柄与前摄器的I/O完成端口被关联在一起。在这种实现中,windows下的GetQueuedCompletionStatus()函数负责执行事件循环,如下程序代码 int golden_server::create_proactor() ACE_Proactor::instance()->close_singleton(); impl_ = new ACE_WIN32_Proactor(0,1); ACE_Proactor::instance(new ACE_Proactor(impl_,1),1) ; return 0; int golden_server::create_proactor_mirror_recviver() ACE_NEW_RETURN(impl_mirror_recviver_,ACE_WIN32_Proactor(0,-1); ACE_NEW_RETURN(mirror_recviver_proactor_,ACE_Proactor(impl_mirror_recviver_,宋体; font-size:14px"> int golden_server::create_proactor_mirror_sender() ACE_NEW_RETURN(impl_mirror_sender_,宋体; font-size:14px"> ACE_NEW_RETURN(mirror_sender_proactor_,ACE_Proactor(impl_mirror_sender_,宋体; font-size:14px"> } |