本文可看成是对Doug Lea Scalable IO in Java一文的翻译。
当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构: 1. Read request 2. Decode request 3. Process service 4. Encode reply 5. Send reply
经典的网络服务的设计如下图,在每个线程中完成对数据的处理:
但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。
Reactor模式类似于AWT中的Event处理:
Reactor模式参与者
1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread 2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener
如图:
Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。
我们来看看Reactor模式代码:
- publicclassReactorimplementsRunnable{
-
- finalSelectorselector;
- finalServerSocketChannelserverSocket;
- Reactor(intport)throwsIOException{
- selector=Selector.open();
- serverSocket=ServerSocketChannel.open();
- InetSocketAddressaddress=newInetSocketAddress(InetAddress.getLocalHost(),port);
- serverSocket.socket().bind(address);
- serverSocket.configureBlocking(false);
-
- SelectionKeysk=serverSocket.register(selector,SelectionKey.OP_ACCEPT);
- logger.debug("-->StartserverSocket.register!");
- //利用sk的attache功能绑定Acceptor如果有事情,触发Acceptor
- sk.attach(newAcceptor());
- logger.debug("-->attach(newAcceptor()!");
- }
- voidrun(){
- try{
- while(!Thread.interrupted())
- {
- selector.select();
- Setselected=selector.selectedKeys();
- Iteratorit=selected.iterator();
-
- while(it.hasNext())
-
- //以后触发SocketReadHandler
- dispatch((SelectionKey)(it.next()));
- selected.clear();
- }
- }catch(IOExceptionex){
- logger.debug("reactorstop!"+ex);
- }
-
- voiddispatch(SelectionKeyk){
- Runnabler=(Runnable)(k.attachment());
- if(r!=null){
- //r.run();
- classAcceptorimplementsRunnable{
- voidrun(){
- logger.debug("-->readyforaccept!");
- SocketChannelc=serverSocket.accept();
- if(c!=null)
- //调用Handler来处理channel
- newSocketReadHandler(selector,c);
- logger.debug("acceptstop!"+ex);
- }
以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。
再看看Handler代码:
classSocketReadHandler staticLoggerlogger=Logger.getLogger(SocketReadHandler. class);
privateTesttest=newTest();
finalSocketChannelsocket;
finalSelectionKeysk;
staticfinalintREADING=0,SENDING=1;
intstate=READING;
publicSocketReadHandler(Selectorsel,SocketChannelc)
socket=c;
socket.configureBlocking( sk=socket.register(sel,0);
//将SelectionKey绑定为本Handler下一步有事件触发时,将调用本类的run方法。
//参看dispatch(SelectionKeyk)
this);
//同时将SelectionKey标记为可读,以便读取。
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
//test.read(socket,input);
readRequest();
catch(Exceptionex){
logger.debug("readRequesterror"+ex);
*处理读取data
*@paramkey
*@throwsException
*/
privatevoidreadRequest()throwsException{
ByteBufferinput=ByteBuffer.allocate(1024);
input.clear();
intbytesRead=socket.read(input);
......
//激活线程池处理这些request
requestHandle(newRequest(socket,btt));
.....
}catch(Exceptione){
注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读 处理、写、发出等流程处理。
将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:
进一步,我们可以使用多个Selector分别处理连接和读事件。一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。 (编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|