Mina主体逻辑流程
Mina也是一个one loop per thread的Reactor框架,关于这部分的知识可以看看《muduo网络库》这本书,Mina的优化什么的我看的不是很仔细,而且很多看不懂。这一篇博客主要从上层代码走一下Mina的主要逻辑流程。 简单介绍Mina有几个主要的组件,分别是IoService,IoBuffer,IoFilter,IoHandler,IoSession,IoFuture(这部分简要介绍了参考资料中关于Mina的资料,可详细阅读参考资料) Mina的命名规范都是一个IoXXX接口,然后AbstractIoXXX定义主要逻辑过程的抽象类,然后就是具体的实现一般是NIOXXX
整体代码流程public class MinaAcceptorThread implements Runnable {
@Override
public void run() {
MinaClientHandler handler = new MinaClientHandler();
NioSocketAcceptor acceptor = new NioSocketAcceptor();
acceptor.setReuseAddress(true);
acceptor.getFilterChain().addLast("protocol",new ProtocolCodecFilter(new IMCodeFactory(false)));
acceptor.setDefaultLocalAddress(new InetSocketAddress(PathConstant.PORT));
acceptor.setHandler(handler);
try {
acceptor.bind();
} catch (IOException e) {
e.printStackTrace();
}
}
}
NioSocketAcceptor会在初始化时调用 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,Class<? extends IoProcessor<S>> processorClass) {
this(sessionConfig,null,new SimpleIoProcessorPool<S>(processorClass),true);
}
NioSocketAcceptor中的selector只负责接收新的连接,具体的one loop per thread是由SimpleIoProcessorPool实现的。 SimpleIoProcessorPool中默认的线程池 NioSocketAcceptor接收到新的连接时执行 if (selected > 0) {
// We have some connection request,let's process
// them here.
processHandles(selectedHandles());
}
初始化session的过程没有细看 private void processHandles(Iterator<H> handles) throws Exception {
while (handles.hasNext()) {
H handle = handles.next();
handles.remove();
// Associates a new created connection to a processor,
// and get back a session
S session = accept(processor,handle);
if (session == null) {
break;
}
initSession(session,null);
// add the session to the SocketIoProcessor
session.getProcessor().add(session);
}
}
加入SimpleIoProcessorPool的时候会将session与processor绑定,也就是连接绑定processor。 private IoProcessor<S> getProcessor(S session) {
IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
if (processor == null) {
if (disposed || disposing) {
throw new IllegalStateException("A disposed processor cannot be accessed.");
}
processor = pool[Math.abs((int) session.getId()) % pool.length];
if (processor == null) {
throw new IllegalStateException("A disposed processor cannot be accessed.");
}
session.setAttributeIfAbsent(PROCESSOR,processor);
}
return processor;
}
SimpleIoProcessorPool中的pool是提前生成的CPU个数+1的NIOProcessor public final void add(S session) {
if (disposed || disposing) {
throw new IllegalStateException("Already disposed.");
}
// Adds the session to the newSession queue and starts the worker
newSessions.add(session);
startupProcessor();
}
接下来就是执行AbstractPollingIoProcessor中内部的Processor类,也就是在线程池中 private void startupProcessor() {
Processor processor = processorRef.get();
if (processor == null) {
processor = new Processor();
if (processorRef.compareAndSet(null,processor)) {
executor.execute(new NamePreservingRunnable(processor,threadName));
}
}
// Just stop the select() and start it again,so that the processor
// can be activated immediately.
wakeup();
}
这部分也就是每个NIO具体在select分配过来的网络连接 nSessions += handleNewSessions();//创建FilterChain过程
....
//处理过程
if (selected > 0) {
process();
}
具体处理数据的过程 private void process(S session) {
// Process Reads
if (isReadable(session) && !session.isReadSuspended()) {
read(session);
}
// Process writes
if (isWritable(session) && !session.isWriteSuspended()) {
// add the session to the queue,if it's not already there
if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
}
}
}
以read为例 private void read(S session) {
...
if (readBytes > 0) {
IoFilterChain filterChain = session.getFilterChain();
filterChain.fireMessageReceived(buf);
buf = null;
if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}
...
调用IoFilterChain就开始了对数据解析以及最终用户通过IoHandler得到具体的网络通信数据。FilterChain的流程在DefaultIoFilterChain中定义 public void fireMessageReceived(Object message) {
if (message instanceof IoBuffer) {
session.increaseReadBytes(((IoBuffer) message).remaining(),System
.currentTimeMillis());
}
Entry head = this.head;
callNextMessageReceived(head,session,message);
}
private void callNextMessageReceived(Entry entry,IoSession session,Object message) {
try {
IoFilter filter = entry.getFilter();
NextFilter nextFilter = entry.getNextFilter();
filter.messageReceived(nextFilter,message);
} catch (Throwable e) {
fireExceptionCaught(e);
}
}
CumulativeProtocolDecoder可以帮助你积累未收完的一条消息,doDecode返回false会使用IoBuffer缓存该消息。 public void messageReceived(NextFilter nextFilter,Object message) throws Exception {
LOGGER.debug( "Processing a MESSAGE_RECEIVED for session {}",session.getId() );
if (!(message instanceof IoBuffer)) {
nextFilter.messageReceived(session,message);
return;
}
IoBuffer in = (IoBuffer) message;
ProtocolDecoder decoder = factory.getDecoder(session);
ProtocolDecoderOutput decoderOut = getDecoderOut(session,nextFilter);
// Loop until we don't have anymore byte in the buffer,
// or until the decoder throws an unrecoverable exception or
// can't decoder a message,because there are not enough
// data in the buffer
while (in.hasRemaining()) {
int oldPos = in.position();
try {
synchronized (decoderOut) {
// Call the decoder with the read bytes
decoder.decode(session,in,decoderOut);
}
// Finish decoding if no exception was thrown.
decoderOut.flush(nextFilter,session);
} catch (Throwable t) {
...
最后看看在DefaultIoFilterChain中如何实现IoFilterChain与IoHandler结合成一个链条。DefaultIoFilterChain中的Entry最后由DefaultIoFilterChainImpl中buildFilterChain把Entry插入IoFilterChain。Entry中包含了IoFilter,Entry是IoFilterChain的包装 接上面创建FilterChain过程 nSessions += handleNewSessions();
handlerNewSessions()->addNow()会将初始化时addLast的IoFilterChain(也就是一开始代码中的acceptor.getFilterChain().addLast(“protocol”,new ProtocolCodecFilter(new IMCodeFactory(false)));) 通过bulider生成DefaultIoFilterChain中Entry的链子 public void buildFilterChain(IoFilterChain chain) throws Exception {
for (Entry e : entries) {
chain.addLast(e.getName(),e.getFilter());
}
}
DefaultIoFilterChain在初始化时会 public DefaultIoFilterChain(AbstractIoSession session) {
if (session == null) {
throw new IllegalArgumentException("session");
}
this.session = session;
head = new EntryImpl(null,"head",new HeadFilter());
tail = new EntryImpl(head,"tail",new TailFilter());
head.nextEntry = tail;
}
Head主要是flush发送缓冲区(具体关于send的过程没有详细阅读) if (!s.isWriteSuspended()) {
s.getProcessor().flush(s);
}
Tail是IoHandler,看一眼就明白了 private static class TailFilter extends IoFilterAdapter {
@Override
public void sessionCreated(NextFilter nextFilter,IoSession session)
throws Exception {
try {
session.getHandler().sessionCreated(session);
} finally {
// Notify the related future.
ConnectFuture future = (ConnectFuture) session
.removeAttribute(SESSION_CREATED_FUTURE);
if (future != null) {
future.setSession(session);
}
}
}
疑惑:
下一篇博客内容
参考资料系列介绍 Mina官方文档 资料 资料 Mina实现自定义应用层协议 Java NIO基础 多selector介绍 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |