本文可看成是对Doug Lea Scalable IO in Java一文的翻译。
当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构: 1. Read request 2. Decode request 3. Process service 4. Encode reply 5. Send reply
经典的网络服务的设计如下图,在每个线程中完成对数据的处理:
但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。
Reactor模式类似于AWT中的Event处理:
Reactor模式参与者
1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread 2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener
如图:
Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。
我们来看看Reactor模式代码:
- publicclassReactorimplementsRunnable{
- finalSelectorselector;
- finalServerSocketChannelserverSocket;
- Reactor(intport)throwsIOException{
- selector=Selector.open();
- serverSocket=ServerSocketChannel.open();
- InetSocketAddressaddress=newInetSocketAddress(InetAddress.getLocalHost(),port);
- serverSocket.socket().bind(address);
- serverSocket.configureBlocking(false);
-
- SelectionKeysk=serverSocket.register(selector,SelectionKey.OP_ACCEPT);
- logger.debug("-->StartserverSocket.register!");
-
- sk.attach(newAcceptor());
- logger.debug("-->attach(newAcceptor()!");
- }
- publicvoidrun(){
- try{
- while(!Thread.interrupted())
- {
- selector.select();
- Setselected=selector.selectedKeys();
- Iteratorit=selected.iterator();
-
- while(it.hasNext())
-
-
- dispatch((SelectionKey)(it.next()));
- selected.clear();
- }
- }catch(IOExceptionex){
- logger.debug("reactorstop!"+ex);
- }
- }
-
- voiddispatch(SelectionKeyk){
- Runnabler=(Runnable)(k.attachment());
- if(r!=null){
-
- }
- }
- classAcceptorimplementsRunnable{
- publicvoidrun(){
- try{
- logger.debug("-->readyforaccept!");
- SocketChannelc=serverSocket.accept();
- if(c!=null)
-
- newSocketReadHandler(selector,c);
- }
- catch(IOExceptionex){
- logger.debug("acceptstop!"+ex);
- }
- }
- }
- }
以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。
再看看Handler代码:
- publicclassSocketReadHandlerimplementsRunnable{
- publicstaticLoggerlogger=Logger.getLogger(SocketReadHandler.class);
- privateTesttest=newTest();
- finalSocketChannelsocket;
- finalSelectionKeysk;
- staticfinalintREADING=0,SENDING=1;
- intstate=READING;
- publicSocketReadHandler(Selectorsel,SocketChannelc)
- throwsIOException{
- socket=c;
- socket.configureBlocking(false);
- sk=socket.register(sel,0);
-
-
- sk.attach(this);
-
- sk.interestOps(SelectionKey.OP_READ);
- sel.wakeup();
- }
- publicvoidrun(){
- try{
-
- readRequest();
- }catch(Exceptionex){
- logger.debug("readRequesterror"+ex);
- }
- }
- privatevoidreadRequest()throwsException{
- ByteBufferinput=ByteBuffer.allocate(1024);
- input.clear();
- try{
- intbytesRead=socket.read(input);
- ......
-
- requestHandle(newRequest(socket,btt));
- .....
- }catch(Exceptione){
- }
- }
注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读 处理、写、发出等流程处理。
将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:
进一步,我们可以使用多个Selector分别处理连接和读事件。一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。 (编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|