import java.io.IOException;
java.net.InetAddress;
java.net.InetSocketAddress;
java.nio.channels.SelectionKey;
java.nio.channels.Selector;
java.nio.channels.ServerSocketChannel;
java.util.Iterator;
java.util.Set;
/**
* 反应器模式 用于解决多用户访问并发问题
*
* 举个例子:餐厅服务问题
*
* 传统线程池做法:来一个客人(请求)去一个服务员(线程)
* 反应器模式做法:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声:服务员
*
* @author linxcool
*/
public class Reactor implements Runnable {
final Selector selector;
ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress inetSocketAddress = new InetSocketAddress(
InetAddress.getLocalHost(),port);
serverSocketChannel.socket().bind(inetSocketAddress);
serverSocketChannel.configureBlocking(false);
// 向selector注册该channel
SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor
selectionKey.attach(new Acceptor(this));
}
@Override
void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
while (it.hasNext()) {
来一个事件 第一次触发一个accepter线程
以后触发SocketReadHandler
SelectionKey selectionKey = it.next();
dispatch(selectionKey);
selectionKeys.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
* 运行Acceptor或SocketReadHandler
*
* @param key
*/
dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment());
if (r != null) {
r.run();
}
}
}
java.nio.channels.SocketChannel;
class Acceptor
private Reactor reactor;
public Acceptor(Reactor reactor) {
this.reactor =
reactor;
}
@Override
{
SocketChannel socketChannel =
reactor.serverSocketChannel.accept();
if (socketChannel !=
null)
调用Handler来处理channel
SocketReadHandler(reactor.selector,socketChannel);
} (IOException e) {
e.printStackTrace();
}
}
}
class SocketReadHandler SocketChannel socketChannel;
SocketReadHandler(Selector selector,SocketChannel socketChannel)
IOException {
this.socketChannel = socketChannel;
socketChannel.configureBlocking();
SelectionKey selectionKey = socketChannel.register(selector,0 将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
参看dispatch(SelectionKey key)
selectionKey.attach( 同时将SelectionKey标记为可读,以便读取。
selectionKey.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
* 处理读取数据
*/
@Override
run() {
ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
inputBuffer.clear();
{
socketChannel.read(inputBuffer);
激活线程池 处理这些request
requestHandle(new Request(socket,btt));
}