加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 百科 > 正文

阻塞IO、非阻塞IO、同步IO、异步IO && Reactor模式

发布时间:2020-12-15 06:48:56 所属栏目:百科 来源:网络整理
导读:http://www.ivaneye.com/2016/07/23/iomodel.html 对于IO来说,我们听得比较多的是: BIO:阻塞IO NIO:非阻塞IO 同步IO 异步IO 以及其组合: 同步阻塞IO 同步非阻塞IO 异步阻塞IO 异步非阻塞IO 那么什么是阻塞IO、非阻塞IO、同步IO、异步IO呢? 一个IO操作其实

http://www.ivaneye.com/2016/07/23/iomodel.html

对于IO来说,我们听得比较多的是:

  • BIO:阻塞IO
  • NIO:非阻塞IO
  • 同步IO
  • 异步IO

以及其组合:

  • 同步阻塞IO
  • 同步非阻塞IO
  • 异步阻塞IO
  • 异步非阻塞IO

那么什么是阻塞IO、非阻塞IO、同步IO、异步IO呢?

  • 一个IO操作其实分成了两个步骤:发起IO请求(阻塞)实际的IO操作(同步)
  • 阻塞IO和非阻塞IO的区别在于第一步:发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO;如果不阻塞,那么就是非阻塞IO
  • 同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO复用、信号驱动IO都是同步IO;如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO

举个不太恰当的例子 :比如你家网络断了,你打电话去中国电信报修!

  • 你拨号—客户端连接服务器
  • 电话通了—连接建立
  • 你说:“我家网断了,帮我修下”—发送消息
  • 说完你就在那里等,那么就是阻塞IO
  • 如果正好你有事,你放下带电话,然后处理其他事情了,过一会你来问下,修好了没—那就是非阻塞IO

  • 如果客服说:“马上帮你处理,你稍等”—同步IO
  • 如果客服说:“马上帮你处理,好了通知你”,然后挂了电话—异步IO

下面从代码层面看看BIO与NIO的流程!

BIO

  • 客户端代码
     
     
1
2
3
4
5
6
7
8
9
10
     
     
//Bind,Connect
Socket client = new Socket( "127.0.0.1",7777);
//读写
PrintWriter pw = new PrintWriter(client.getOutputStream());
BufferedReader br=
new BufferedReader( new InputStreamReader(System.in));
pw.write(br.readLine());
//Close
pw.close();
br.close();
  • 服务端代码
10
11
Socket socket;
ServerSocket ss = new ServerSocket( 7777);
while ( true) {
//Accept
socket = ss.accept();
//一般新建一个线程执行读写
BufferedReader br = new BufferedReader(
new InputStreamReader(socket .getInputStream()));
System.out.println( "you input is : " + br.readLine());
}
  • 上面的代码可以说是学习Java的Socket的入门级代码了
  • 代码流程和前面的图可以一一对上

模型图如下所示:

BIO优缺点

  • 优点
    • 模型简单
    • 编码简单
  • 缺点
    • 性能瓶颈低

优缺点很明显。这里主要说下缺点:主要瓶颈在线程上。每个连接都会建立一个线程。虽然线程消耗比进程小,但是一台机器实际上能建立的有效线程有限,以Java来说,1.5以后,一个线程大致消耗1M内存!且随着线程数量的增加,CPU切换线程上下文的消耗也随之增加,在高过某个阀值后,继续增加线程,性能不增反降!而同样因为一个连接就新建一个线程,所以编码模型很简单!

就性能瓶颈这一点,就确定了BIO并不适合进行高性能服务器的开发!像Tomcat这样的Web服务器,从7开始就从BIO改成了NIO,来提高服务器性能!

NIO

  • NIO客户端代码(连接)
8
     
     
//获取socket通道
SocketChannel channel = SocketChannel.open();
channel. configureBlocking( false);
//获得通道管理器
selector=Selector.open();
channel.connect( new InetSocketAddress(serverIp,port));
//为该通道注册SelectionKey.OP_CONNECT事件
channel.register(selector,SelectionKey.OP_CONNECT);
  • NIO客户端代码(监听)
11
12
13
14
15
16
17
18
19
20
21
while( true){
//选择注册过的io操作的事件(第一次为SelectionKey.OP_CONNECT)
selector.select();
while(SelectionKey key : selector.selectedKeys()){
if(key.isConnectable()){
SocketChannel channel=(SocketChannel)key.channel();
if(channel.isConnectionPending()){
channel.finishConnect(); //如果正在连接,则完成连接
}
channel.register(selector,SelectionKey.OP_READ);
} else if(key.isReadable()){ //有可读数据事件。
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate( 10);
channel.read(buffer);
byte[] data = buffer.array();
String message = new String(data);
System.out.println( "recevie message from server:,size:"
+ buffer.position() + " msg: " + message);
}
}
}
  • NIO服务端代码(连接)
//获取一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel. configureBlocking( false);
serverChannel.socket().bind( new InetSocketAddress(port));
//获取通道管理器
selector = Selector.open();
//将通道管理器与通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,
serverChannel.register(selector,SelectionKey.OP_ACCEPT);
  • NIO服务端代码(监听)
21
22
23
//当有注册的事件到达时,方法返回,否则阻塞。
selector.select();
for(SelectionKey key : selector.selectedKeys()){
if(key.isAcceptable()){
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
SocketChannel channel = server.accept();
channel.write(ByteBuffer.wrap(
new String( "send message to client").getBytes()));
//在与客户端连接成功后,为客户端通道注册SelectionKey.OP_READ事件。
channel.register(selector,93)">if(key.isReadable()){ //有可读数据事件
SocketChannel channel = (SocketChannel)key.channel();
ByteBuffer buffer = ByteBuffer.allocate( 10);
int read = channel.read(buffer);
"receive message from client,145)">" msg: " + message);
}
}
}

NIO模型示例如下:

  • Acceptor注册Selector,监听accept事件
  • 当客户端连接后,触发accept事件
  • 服务器构建对应的Channel,并在其上注册Selector,监听读写事件
  • 当发生读写事件后,进行相应的读写处理

NIO优缺点

  • 优点
    • 性能瓶颈高
  • 缺点
    • 模型复杂
    • 编码复杂
    • 需处理半包问题

NIO的优缺点和BIO就完全相反了!性能高,不用一个连接就建一个线程,可以一个线程处理所有的连接!相应的,编码就复杂很多,从上面的代码就可以明显体会到了。还有一个问题,由于是非阻塞的,应用无法知道什么时候消息读完了,就存在了半包问题!



半包问题

简单看一下下面的图就能理解半包问题了!

我们知道TCP/IP在发送消息的时候,可能会拆包(如上图1)!这就导致接收端无法知道什么时候收到的数据是一个完整的数据。例如:发送端分别发送了ABC,DEF,GHI三条信息,发送时被拆成了AB,CDRFG,H,I这四个包进行发送,接受端如何将其进行还原呢?在BIO模型中,当读不到数据后会阻塞,而NIO中不会!所以需要自行进行处理!例如,以换行符作为判断依据,或者定长消息发生,或者自定义协议!

NIO虽然性能高,但是编码复杂,且需要处理半包问题!为了方便的进行NIO开发,就有了Reactor模型!

Reactor模型

  • AWT Events

Reactor模型和AWT事件模型很像,就是将消息放到了一个队列中,通过异步线程池对其进行消费!

Reactor中的组件

  • Reactor:Reactor是IO事件的派发者。
  • Acceptor:Acceptor接受client连接,建立对应client的Handler,并向Reactor注册此Handler。
  • Handler:和一个client通讯的实体,按这样的过程实现业务的处理。一般在基本的Handler基础上还会有更进一步的层次划分, 用来抽象诸如decode,process和encoder这些过程。比如对Web Server而言,decode通常是HTTP请求的解析, process的过程会进一步涉及到Listener和Servlet的调用。业务逻辑的处理在Reactor模式里被分散的IO事件所打破, 所以Handler需要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次IO事件到来的时候(另一半可读了)能继续中断的处理。为了简化设计,Handler通常被设计成状态机,按GoF的state pattern来实现。

对应上面的NIO代码来看:

  • Reactor:相当于有分发功能的Selector
  • Acceptor:NIO中建立连接的那个判断分支
  • Handler:消息读写处理等操作类

Reactor从线程池和Reactor的选择上可以细分为如下几种:

Reactor单线程模型

这个模型和上面的NIO流程很类似,只是将消息相关处理独立到了Handler中去了!

虽然上面说到NIO一个线程就可以支持所有的IO处理。但是瓶颈也是显而易见的!我们看一个客户端的情况,如果这个客户端多次进行请求,如果在Handler中的处理速度较慢,那么后续的客户端请求都会被积压,导致响应变慢!所以引入了Reactor多线程模型!

Reactor多线程模型

Reactor多线程模型就是将Handler中的IO操作和非IO操作分开,操作IO的线程称为IO线程,非IO操作的线程称为工作线程!这样的话,客户端的请求会直接被丢到线程池中,客户端发送请求就不会堵塞!

但是当用户进一步增加的时候,Reactor会出现瓶颈!因为Reactor既要处理IO操作请求,又要响应连接请求!为了分担Reactor的负担,所以引入了主从Reactor模型!

主从Reactor模型

主Reactor用于响应连接请求,从Reactor用于处理IO操作请求!

Netty

Netty是一个高性能NIO框架,其是对Reactor模型的一个实现!

  • Netty客户端代码
19
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE,true);
b.handler( new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast( new TimeClientHandler());
}
});
ChannelFuture f = b.connect(host,port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
  • Netty Client Handler
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
channelRead(ChannelHandlerContext ctx,Object msg) {
ByteBuf m = (ByteBuf) msg;
try {
long currentTimeMillis =
(m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println( new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
exceptionCaught Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
  • Netty服务端代码
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler( new ChannelInitializer<SocketChannel>() {
@Override
throws Exception {
ch.pipeline().addLast( new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG,179)">128)
.childOption(ChannelOption.SO_KEEPALIVE,93)">true);
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
  • Netty Server Handler
23
24
25
TimeServerHandler ChannelInboundHandlerAdapter {
channelActive(final ChannelHandlerContext ctx) {
final ByteBuf time = ctx.alloc().buffer( 4);
time.writeInt(( int)
(System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time);
f.addListener( new ChannelFutureListener() {
@Override
operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
});
}
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

我们从Netty服务器代码来看,与Reactor模型进行对应!

  • EventLoopGroup就相当于是Reactor,bossGroup对应主Reactor,workerGroup对应从Reactor
  • TimeServerHandler就是Handler
  • child开头的方法配置的是客户端channel,非child开头的方法配置的是服务端channel

具体Netty内容,请访问Netty官网!

Netty的问题

Netty开发中一个很明显的问题就是回调,一是打破了线性编码习惯,
二就是Callback Hell!

看下面这个例子:

3
a.doing1(); //1
a.doing2(); //2
a.doing3(); //3

1,2,3处代码如果是同步的,那么将按顺序执行!但是如果不是同步的呢?我还是希望2在1之后执行,3在2之后执行!怎么办呢?想想AJAX!我们需要写类似如下这样的代码!

9
a.doing1( new Callback(){
callback(){
a.doing2( new Callback(){
(){
a.doing3();
}
})
}
});

那有没有办法解决这个问题呢?其实不难,实现一个类似Future的功能!当Client获取结果时,进行阻塞,当得到结果后再继续往下走!实现方案,一个就是使用锁了,还有一个就是使用RingBuffer。经测试,使用RingBuffer比使用锁TPS有2000左右的提高!

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读