加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

Reactor 模式的 JAVA NIO 多线程服务器

发布时间:2020-12-15 04:57:27 所属栏目:百科 来源:网络整理
导读:JAVA NIO 多线程服务器 1.2版 Reactor 模式的 JAVA NIO 多线程服务器 从 JDK 1.4开始,Java的标准库中就包含了NIO,即所谓的“New IO”。其中最重要的功能就是提供了“非阻塞”的IO,当然包括了Socket。NonBlocking的IO就是对select(Unix平台下)以及 WaitFor

JAVA NIO 多线程服务器 1.2版
Reactor 模式的 JAVA NIO 多线程服务器

JDK 1.4开始,Java的标准库中就包含了NIO,即所谓的“New IO”。其中最重要的功能就是提供了“非阻塞”的IO,当然包括了Socket。NonBlocking的IO就是对select(Unix平台下)以及 WaitForMultipleObjects(Windows平台)的封装,提供了高性能、易伸缩的服务架构。

说来惭愧,直到JDK1.4才有这种功能,但迟到者不一定没有螃蟹吃,NIO就提供了优秀的面向对象的解决方案,可以很方便地编写高性能的服务器。

话说回来,传统的Server/Client实现是基于Thread per request,即服务器为每个客户端请求建立一个线程处理,单独负责处理一个客户的请求。比如像Tomcat(新版本也会提供NIO方案)、Resin等Web服务器就是这样实现的。当然为了减少瞬间峰值问题,服务器一般都使用线程池,规定了同时并发的最大数量,避免了线程的无限增长。

但这样有一个问题:如果线程池的大小为100,当有100个用户同时通过HTTP现在一个大文件时,服务器的线程池会用完,因为所有的线程都在传输大文件了,即使第101个请求者仅仅请求一个只有10字节的页面,服务器也无法响应了,只有等到线程池中有空闲的线程出现。

另外,线程的开销也是很大的,特别是达到了一个临界值后,性能会显著下降,这也限制了传统的Socket方案无法应对并发量大的场合,而“非阻塞”的IO就能轻松解决这个问题。

下面只是一个简单的例子:服务器提供了下载大型文件的功能,客户端连接上服务器的12345端口后,就可以读取服务器发送的文件内容信息了。注意这里的服务器只有一个主线程,没有其他任何派生线程,让我们看看NIO是如何用一个线程处理N个请求的。

NIO服务器最核心的一点就是反应器模式:当有感兴趣的事件发生的,就通知对应的事件处理器去处理这个事件,如果没有,则不处理。所以使用一个线程做轮询就可以了。当然这里这是个例子,如果要获得更高性能,可以使用少量的线程,一个负责接收请求,其他的负责处理请求,特别是对于多CPU时效率会更高。

关于使用NIO过程中出现的问题,最为普遍的就是为什么没有请求时CPU的占用率为100%?出现这种问题的主要原因是注册了不感兴趣的事件,比如如果没有数据要发到客户端,而又注册了写事件(OP_WRITE),则在 Selector.select()上就会始终有事件出现,CPU就一直处理了,而此时select()应该是阻塞的。

线程模型

NIO 的选择器采用了多路复用(Multiplexing)技术,可在一个选择器上处理多个套接字,通过获取读写通道来进行 IO 操作。由于网络带宽等原因,在通道的读、写操作中是容易出现等待的,所以在读、写操作中引入多线程,对性能提高明显,而且可以提高客户端的感知服务质量。所以本文的模型将主要通过使用读、写线程池来提高与客户端的数据交换能力。

如下图所示,服务端接受客户端请求后,控制线程将该请求的读通道交给读线程池,由读线程池分配线程完成对客户端数据的读取操作;当读线程完成读操作后,将数据返回控制线程,进行服务端的业务处理;完成业务处理后,将需回应给客户端的数据和写通道提交给写线程池,由写线程完成向客户端发送回应数据的操作。


(NIO 多线程服务器模型)

同时整个服务端的流程处理,建立于事件机制上。在 [接受连接->读->业务处理->写 >关闭连接 ]这个过程中,触发器将触发相应事件,由事件处理器对相应事件分别响应,完成服务器端的业务处理。
下面我们就来详细看一下这个模型的各个组成部分。

public class MiniServer extends Thread
{
private static final Log log = LogFactory.getLog(MiniServer.class);

private final Selector s;
private final ServerSocketChannel ssc;
private ExecutorService executor;

public MiniServer(int portnumber,ExecutorService executor) throws IOException
{
this.executor=executor;
s = Selector.open();
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(portnumber));
ssc.configureBlocking(false);
ssc.register(s,SelectionKey.OP_ACCEPT);
}

public void run()
{
try
{
while(s.isOpen())
{
int nKeys=s.select();
if(nKeys>0)
{
Iterator<SelectionKey> it = s.selectedKeys().iterator();
while (it.hasNext())
{
SelectionKey key = it.next();
it.remove();
if (!key.isValid() || !key.channel().isOpen())
continue;
if(key.isAcceptable())
{
SocketChannel sc = ssc.accept();
if (sc != null)
{
sc.configureBlocking(false);
sc.register(s,SelectionKey.OP_READ,new Reader(executor));
}
}
else if(key.isReadable()||key.isWritable())
{
Reactor reactor = (Reactor) key.attachment();
reactor.execute(key);
}
}
}
}
}
catch(IOException e)
{
log.info(e);
}
}
}


public interface Reactor
{
void execute(SelectionKey key);
}


public class Reader implements Reactor
{
private static final Log log = LogFactory.getLog(Reader.class);

private byte[] bytes=new byte[0];
private ExecutorService executor;

public Reader(ExecutorService executor)
{
this.executor=executor;
}

@Override
public void execute(SelectionKey key)
{
SocketChannel sc = (SocketChannel) key.channel();
try
{
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len=-1;
while(sc.isConnected() && (len=sc.read(buffer))>0)
{
buffer.flip();
byte [] content = new byte[buffer.limit()];
buffer.get(content);
bytes=NutUtil.ArrayCoalition(bytes,content);
buffer.clear();
}
if(len==0)
{
key.interestOps(SelectionKey.OP_READ);
key.selector().wakeup();
}
else if(len==-1)
{
Callable<byte[]> call=new ProcessCallable(bytes);
Future<byte[]> task=executor.submit(call);
ByteBuffer output=ByteBuffer.wrap(task.get());
sc.register(key.selector(),SelectionKey.OP_WRITE,new Writer(output));
}
}
catch(Exception e)
{
log.info(e);
}
}
}

public class Writer implements Reactor { private static final Log log = LogFactory.getLog(Writer.class); private ByteBuffer output; public Writer(ByteBuffer output) { this.output=output; } public void execute(SelectionKey key) { SocketChannel sc = (SocketChannel) key.channel(); try { while(sc.isConnected() && output.hasRemaining()) { int len=sc.write(output); if(len<0) { throw new EOFException(); } if(len==0) { key.interestOps(SelectionKey.OP_WRITE); key.selector().wakeup(); break; } } if(!output.hasRemaining()) { output.clear(); key.cancel(); sc.close(); } } catch(IOException e) { log.info(e); } } }

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读