加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

Reactor模式和NIO

发布时间:2020-12-15 05:29:51 所属栏目:百科 来源:网络整理
导读:本文可看成是对Doug Lea Scalable IO in Java一文的翻译。 当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构: 1. Read request 2. Decode request 3. Process service 4. Encode reply 5. Send rep
本文可看成是对Doug Lea Scalable IO in Java一文的翻译。

当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:
1. Read request
2. Decode request
3. Process service
4. Encode reply
5. Send reply

经典的网络服务的设计如下图,在每个线程中完成对数据的处理:

但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。

Reactor模式类似于AWT中的Event处理:

Reactor模式参与者

1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread
2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener

如图:

Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。

我们来看看Reactor模式代码:

Java代码
  1. publicclassReactorimplementsRunnable{
  2.   finalSelectorselector;
  3.   finalServerSocketChannelserverSocket;
  4.   Reactor(intport)throwsIOException{
  5.     selector=Selector.open();
  6.     serverSocket=ServerSocketChannel.open();
  7.     InetSocketAddressaddress=newInetSocketAddress(InetAddress.getLocalHost(),port);
  8.     serverSocket.socket().bind(address);
  9.     serverSocket.configureBlocking(false);
  10.     //向selector注册该channel
  11.     SelectionKeysk=serverSocket.register(selector,SelectionKey.OP_ACCEPT);
  12.     logger.debug("-->StartserverSocket.register!");
  13.     //利用sk的attache功能绑定Acceptor如果有事情,触发Acceptor
  14.     sk.attach(newAcceptor());
  15.     logger.debug("-->attach(newAcceptor()!");
  16.   }
  17.   publicvoidrun(){//normallyinanewThread
  18.     try{
  19.     while(!Thread.interrupted())
  20.     {
  21.       selector.select();
  22.       Setselected=selector.selectedKeys();
  23.       Iteratorit=selected.iterator();
  24.       //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
  25.       while(it.hasNext())
  26.         //来一个事件第一次触发一个accepter线程
  27.         //以后触发SocketReadHandler
  28.         dispatch((SelectionKey)(it.next()));
  29.         selected.clear();
  30.       }
  31.     }catch(IOExceptionex){
  32.         logger.debug("reactorstop!"+ex);
  33.     }
  34.   }
  35.   //运行Acceptor或SocketReadHandler
  36.   voiddispatch(SelectionKeyk){
  37.     Runnabler=(Runnable)(k.attachment());
  38.     if(r!=null){
  39.       //r.run();
  40.     }
  41.   }
  42.   classAcceptorimplementsRunnable{//inner
  43.     publicvoidrun(){
  44.     try{
  45.       logger.debug("-->readyforaccept!");
  46.       SocketChannelc=serverSocket.accept();
  47.       if(c!=null)
  48.         //调用Handler来处理channel
  49.         newSocketReadHandler(selector,c);
  50.       }
  51.     catch(IOExceptionex){
  52.       logger.debug("acceptstop!"+ex);
  53.     }
  54.     }
  55.   }
  56. }

以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。

再看看Handler代码:

Java代码
  1. publicclassSocketReadHandlerimplementsRunnable{
  2.   publicstaticLoggerlogger=Logger.getLogger(SocketReadHandler.class);
  3.   privateTesttest=newTest();
  4.   finalSocketChannelsocket;
  5.   finalSelectionKeysk;
  6.   staticfinalintREADING=0,SENDING=1;
  7.   intstate=READING;
  8.   publicSocketReadHandler(Selectorsel,SocketChannelc)
  9.     throwsIOException{
  10.     socket=c;
  11.     socket.configureBlocking(false);
  12.     sk=socket.register(sel,0);
  13.     //将SelectionKey绑定为本Handler下一步有事件触发时,将调用本类的run方法。
  14.     //参看dispatch(SelectionKeyk)
  15.     sk.attach(this);
  16.     //同时将SelectionKey标记为可读,以便读取。
  17.     sk.interestOps(SelectionKey.OP_READ);
  18.     sel.wakeup();
  19.   }
  20.   publicvoidrun(){
  21.     try{
  22.     //test.read(socket,input);
  23.       readRequest();
  24.     }catch(Exceptionex){
  25.     logger.debug("readRequesterror"+ex);
  26.     }
  27.   }
  28. /**
  29. *处理读取data
  30. *@paramkey
  31. *@throwsException
  32. */
  33. privatevoidreadRequest()throwsException{
  34.   ByteBufferinput=ByteBuffer.allocate(1024);
  35.   input.clear();
  36.   try{
  37.     intbytesRead=socket.read(input);
  38.     ......
  39.     //激活线程池处理这些request
  40.     requestHandle(newRequest(socket,btt));
  41.     .....
  42.   }catch(Exceptione){
  43.   }
  44. }

注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读 处理、写、发出等流程处理。

将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:

进一步,我们可以使用多个Selector分别处理连接和读事件。一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读