二、I/O 多路复用之select、poll、epoll详解
</td>
</tr></table>
<p id="articleHeader14">select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
sellect、poll、epoll三者的区别:
select?
select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。
poll?
poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。
poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
epoll?直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
Python select?
Python中的select模块专注于I/O多路复用,提供了三个方法(其中后两个在Linux中可用,windows仅支持select,官方解释:),另外也提供了kqueue方法(freeBSD系统)
Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files,and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable,或者通信错误(异常),select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。
select方法
1、调用select()方法,上下文切换转换为内核态
2、将fd从用户空间复制到内核空间
3、内核遍历所有fd,查看其对应事件是否发生
4、如果没发生,将进程阻塞,当设备驱动产生中断或者timeout时间后,将进程唤醒,再次进行遍历
5、返回遍历后的fd
6、将fd从内核空间复制到用户空间
使用方法:
?
参数列表:
- rlist: wait until ready for reading
- wlist: wait until ready for writing
- xlist: wait for an “exceptional condition”
- timeout: 超时时间
返回三个值:
select方法用来监视文件描述符(当文件描述符条件不满足时,select会阻塞),当某个文件描述符状态改变后,会返回三个列表
????1、当参数1 序列中的fd满足“可读”条件时,则获取发生变化的fd并添加到fd_r_list中
??? 2、当参数2 序列中含有fd时,则将该序列中所有的fd添加到 fd_w_list中
??? 3、当参数3 序列中的fd发生错误时,则将该发生错误的fd添加到 fd_e_list中
??? 4、当超时时间为空,则select会一直阻塞,直到监听的句柄发生变化
?? 当超时时间 = n(正整数)时,那么如果监听的句柄均无任何变化,则select会阻塞n秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
=1
server.bind((,50005=
= r r server:
conn,addr=( </span><span style="color: #0000ff;">else</span>:<span style="color: #008000;">#</span><span style="color: #008000;">否则代表readable里是已经建立的连接</span>
data=r.recv(1024<span style="color: #000000;">)
</span><span style="color: #0000ff;">if</span><span style="color: #000000;"> data:
</span><span style="color: #0000ff;">print</span><span style="color: #000000;">(data)
r.send(data)
</span><span style="color: #0000ff;">else</span><span style="color: #000000;">:
</span><span style="color: #0000ff;">print</span>(<span style="color: #800000;">"</span><span style="color: #800000;">客户端已经断开</span><span style="color: #800000;">"</span><span style="color: #000000;">)
inputs.remove(r)
</span><span style="color: #0000ff;">for</span> e <span style="color: #0000ff;">in</span><span style="color: #000000;"> exceptionable:
</span><span style="color: #0000ff;">print</span>(<span style="color: #800000;">"</span><span style="color: #800000;">客户端出错!</span><span style="color: #800000;">"</span><span style="color: #000000;">)
inputs.remove(e)</span></pre>
=socket.socket()
client.connect((,5000))
True:
data=input(>>>=client.recv(1024(data.decode())
select文艺版:
server =<span style="color: #000000;"> socket.socket()
server.setblocking(0)
server_addr
= (<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">',10000<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">starting up on %s port %s<span style="color: #800000;">' %<span style="color: #000000;"> server_addr)
server.bind(server_addr)
server.listen(5<span style="color: #000000;">)
inputs = [server,] <span style="color: #008000;">#<span style="color: #008000;">自己也要监测呀,因为server本身也是个fd
outputs =<span style="color: #000000;"> []
message_queues =<span style="color: #000000;"> {}
<span style="color: #0000ff;">while<span style="color: #000000;"> True:
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;">waiting for next event...<span style="color: #800000;">"<span style="color: #000000;">)
readable,writeable,exeptional </span>= select.select(inputs,inputs) <span style="color: #008000;">#</span><span style="color: #008000;">如果没有任何fd就绪,那程序就会一直阻塞在这里</span>
<span style="color: #0000ff;">for</span> s <span style="color: #0000ff;">in</span> readable: <span style="color: #008000;">#</span><span style="color: #008000;">每个s就是一个socket</span>
<span style="color: #0000ff;">if</span> s <span style="color: #0000ff;">is</span> server: <span style="color: #008000;">#</span><span style="color: #008000;">别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,</span>
<span style="color: #008000;">#</span><span style="color: #008000;">就是有活动了,什么情况下它才有活动? 当然 是有新连接进来的时候 呀</span>
<span style="color: #008000;">#</span><span style="color: #008000;">新连接进来了,接受这个连接</span>
conn,client_addr =<span style="color: #000000;"> s.accept()
</span><span style="color: #0000ff;">print</span>(<span style="color: #800000;">"</span><span style="color: #800000;">new connection from</span><span style="color: #800000;">"</span><span style="color: #000000;">,client_addr)
conn.setblocking(0)
inputs.append(conn) </span><span style="color: #008000;">#</span><span style="color: #008000;">为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据,把它放到inputs里,下一次loop时,这个新连接</span>
<span style="color: #008000;">#</span><span style="color: #008000;">就会被交给select去监听,如果这个连接的客户端发来了数据,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到</span>
<span style="color: #008000;">#</span><span style="color: #008000;">readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了,下面就是这么干 的</span>
<span style="color: #000000;">
message_queues[conn] = queue.Queue() <span style="color: #008000;">#<span style="color: #008000;">接收到客户端的数据后,不立刻返回,暂存在队列里,以后发送
<span style="color: #0000ff;">else</span>: <span style="color: #008000;">#</span><span style="color: #008000;">s不是server的话,那就只能是一个 与客户端建立的连接的fd了</span>
<span style="color: #008000;">#</span><span style="color: #008000;">客户端的数据过来了,在这接收</span>
data = s.recv(1024<span style="color: #000000;">)
</span><span style="color: #0000ff;">if</span><span style="color: #000000;"> data:
</span><span style="color: #0000ff;">print</span>(<span style="color: #800000;">"</span><span style="color: #800000;">收到来自[%s]的数据:</span><span style="color: #800000;">"</span> %<span style="color: #000000;"> s.getpeername()[0],data)
message_queues[s].put(data) </span><span style="color: #008000;">#</span><span style="color: #008000;">收到的数据先放到queue里,一会返回给客户端</span>
<span style="color: #0000ff;">if</span> s <span style="color: #0000ff;">not</span> <span style="color: #0000ff;">in</span><span style="color: #000000;"> outputs:
outputs.append(s) </span><span style="color: #008000;">#</span><span style="color: #008000;">为了不影响处理与其它客户端的连接,这里不立刻返回数据给客户端</span>
<span style="color: #0000ff;">else</span>:<span style="color: #008000;">#</span><span style="color: #008000;">如果收不到data代表什么呢? 代表客户端断开了呀</span>
<span style="color: #0000ff;">print</span>(<span style="color: #800000;">"</span><span style="color: #800000;">客户端断开了</span><span style="color: #800000;">"</span><span style="color: #000000;">,s)
</span><span style="color: #0000ff;">if</span> s <span style="color: #0000ff;">in</span><span style="color: #000000;"> outputs:
outputs.remove(s) </span><span style="color: #008000;">#</span><span style="color: #008000;">清理已断开的连接</span>
<span style="color: #000000;">
inputs.remove(s) <span style="color: #008000;">#<span style="color: #008000;">清理已断开的连接
<span style="color: #0000ff;">del</span> message_queues[s] <span style="color: #008000;">#</span><span style="color: #008000;">#清理已断开的连接</span>
<span style="color: #0000ff;">for</span> s <span style="color: #0000ff;">in</span><span style="color: #000000;"> writeable:
</span><span style="color: #0000ff;">try</span><span style="color: #000000;"> :
next_msg </span>=<span style="color: #000000;"> message_queues[s].get_nowait()
</span><span style="color: #0000ff;">except</span><span style="color: #000000;"> queue.Empty:
</span><span style="color: #0000ff;">print</span>(<span style="color: #800000;">"</span><span style="color: #800000;">client [%s]</span><span style="color: #800000;">"</span> %s.getpeername()[0],<span style="color: #800000;">"</span><span style="color: #800000;">queue is empty..</span><span style="color: #800000;">"</span><span style="color: #000000;">)
outputs.remove(s)
</span><span style="color: #0000ff;">else</span><span style="color: #000000;">:
</span><span style="color: #0000ff;">print</span>(<span style="color: #800000;">"</span><span style="color: #800000;">sending msg to [%s]</span><span style="color: #800000;">"</span>%<span style="color: #000000;">s.getpeername()[0],next_msg)
s.send(next_msg.upper())
</span><span style="color: #0000ff;">for</span> s <span style="color: #0000ff;">in</span><span style="color: #000000;"> exeptional:
</span><span style="color: #0000ff;">print</span>(<span style="color: #800000;">"</span><span style="color: #800000;">handling exception for </span><span style="color: #800000;">"</span><span style="color: #000000;">,s.getpeername())
inputs.remove(s)
</span><span style="color: #0000ff;">if</span> s <span style="color: #0000ff;">in</span><span style="color: #000000;"> outputs:
outputs.remove(s)
s.close()
</span><span style="color: #0000ff;">del</span> message_queues[s]</pre>
在服务端我们可以看到,我们需要不停的调用select,?这就意味着:
?
poll方法
poll与select相差不大,相比于select而言,内核监测的文件描述不受限制。
epoll很好的改进了select:
关于水平触发和边缘触发:
Level_triggered(水平触发,有时也称条件触发):当被监控的文件描述符上有可读写事件发生时,epoll.poll()会通知处理程序去读写。如果这次没有把数据一次性全部读写完(如读写缓冲区太小),那么下次调用 epoll.poll()时,它还会通知你在上没读写完的文件描述符上继续读写,如果系统中有大量你不需要读写的就绪文件描述符,而它们每次都会返回,这样会大大降低处理程序检索自己关心的就绪文件描述符的效率!!! 优点很明显:稳定可靠
Edge_triggered(边缘触发,有时也称状态触发):当被监控的文件描述符上有可读写事件发生时,epoll.poll()会通知处理程序去读写。如果这次没有把数据全部读写完(如读写缓冲区太小),那么下次调用epoll.poll()时,这种模式比水平触发效率高,系统不会充斥大量你不关心的就绪文件描述符。缺点:某些条件下不可靠
?epoll实例:
s =<span style="color: #000000;"> socket.socket()
s.bind((<span style="color: #800000;">'<span style="color: #800000;">127.0.0.1<span style="color: #800000;">',8888<span style="color: #000000;">))
s.listen(5<span style="color: #000000;">)
epoll_obj =<span style="color: #000000;"> select.epoll()
epoll_obj.register(s,select.EPOLLIN)
connections =<span style="color: #000000;"> {}
<span style="color: #0000ff;">while<span style="color: #000000;"> True:
events =<span style="color: #000000;"> epoll_obj.poll()
<span style="color: #0000ff;">for fd,event <span style="color: #0000ff;">in<span style="color: #000000;"> events:
<span style="color: #0000ff;">print<span style="color: #000000;">(fd,event)
<span style="color: #0000ff;">if fd ==<span style="color: #000000;"> s.fileno():
conn,addr =<span style="color: #000000;"> s.accept()
connections[conn.fileno()] =<span style="color: #000000;"> conn
epoll_obj.register(conn,select.EPOLLIN)
msg = conn.recv(200<span style="color: #000000;">)
conn.sendall(<span style="color: #800000;">'<span style="color: #800000;">ok<span style="color: #800000;">'<span style="color: #000000;">.encode())
<span style="color: #0000ff;">else<span style="color: #000000;">:
<span style="color: #0000ff;">try<span style="color: #000000;">:
fd_obj =<span style="color: #000000;"> connections[fd]
msg = fd_obj.recv(200<span style="color: #000000;">)
fd_obj.sendall(<span style="color: #800000;">'<span style="color: #800000;">ok<span style="color: #800000;">'<span style="color: #000000;">.encode())
<span style="color: #0000ff;">except<span style="color: #000000;"> BrokenPipeError:
epoll_obj.unregister(fd)
connections[fd].close()
<span style="color: #0000ff;">del<span style="color: #000000;"> connections[fd]
s.close()
epoll_obj.close()
flag = 1<span style="color: #000000;">
s =<span style="color: #000000;"> socket.socket()
s.connect((<span style="color: #800000;">'<span style="color: #800000;">127.0.0.1<span style="color: #800000;">',8888<span style="color: #000000;">))
<span style="color: #0000ff;">while<span style="color: #000000;"> flag:
input_msg = input(<span style="color: #800000;">'<span style="color: #800000;">input>>><span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">if input_msg == <span style="color: #800000;">'<span style="color: #800000;">0<span style="color: #800000;">'<span style="color: #000000;">:
<span style="color: #0000ff;">break<span style="color: #000000;">
s.sendall(input_msg.encode())
msg = s.recv(1024<span style="color: #000000;">)
<span style="color: #0000ff;">print<span style="color: #000000;">(msg.decode())
s.close()
selectors
python3.4新增selectors模块,封装了select,高层次、高效率的I/O多路复用,它具有根据操作系统平台选出最佳的IO多路机制,比如在win的系统上他默认的是select模式而在linux上它默认的epoll。
<div class="cnblogs_code">
sel =<span style="color: #000000;"> selectors.DefaultSelector()
<span style="color: #0000ff;">def<span style="color: #000000;"> accept(sock,mask):
conn,addr = sock.accept() <span style="color: #008000;">#
<span style="color: #008000;"> Should be ready
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">accepted<span style="color: #800000;">',conn,<span style="color: #800000;">'<span style="color: #800000;">from<span style="color: #800000;">'<span style="color: #000000;">,addr)
conn.setblocking(False)
sel.register(conn,selectors.EVENT_READ,read)
<span style="color: #0000ff;">def<span style="color: #000000;"> read(conn,mask):
data = conn.recv(1000) <span style="color: #008000;">#<span style="color: #008000;"> Should be ready
<span style="color: #0000ff;">if<span style="color: #000000;"> data:
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">echoing<span style="color: #800000;">',repr(data),<span style="color: #800000;">'<span style="color: #800000;">to<span style="color: #800000;">'<span style="color: #000000;">,conn)
conn.send(data) <span style="color: #008000;">#<span style="color: #008000;"> Hope it won't block
<span style="color: #0000ff;">else<span style="color: #000000;">:
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">closing<span style="color: #800000;">'<span style="color: #000000;">,conn)
sel.unregister(conn)
conn.close()
sock =<span style="color: #000000;"> socket.socket()
sock.bind((<span style="color: #800000;">'<span style="color: #800000;">localhost<span style="color: #800000;">',10000<span style="color: #000000;">))
sock.listen(100<span style="color: #000000;">)
sock.setblocking(False)
sel.register(sock,accept)
<span style="color: #0000ff;">while<span style="color: #000000;"> True:
events =<span style="color: #000000;"> sel.select()
<span style="color: #0000ff;">for key,mask <span style="color: #0000ff;">in<span style="color: #000000;"> events:
callback =<span style="color: #000000;"> key.data
callback(key.fileobj,mask)
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!