加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python – Gevent.monkey.patch_all打破了依赖于socket.shutdown

发布时间:2020-12-20 13:41:07 所属栏目:Python 来源:网络整理
导读:我目前正在努力为现有的 django项目增加对gevent-socketio的支持.我发现gevent.monkey.patch_all()调用正在破坏负责从套接字接收数据的线程的取消机制,我们现在将调用类SocketReadThread. SocketReadThread非常简单,它在阻塞套接字上调用recv().当它接收数据
我目前正在努力为现有的 django项目增加对gevent-socketio的支持.我发现gevent.monkey.patch_all()调用正在破坏负责从套接字接收数据的线程的取消机制,我们现在将调用类SocketReadThread.

SocketReadThread非常简单,它在阻塞套接字上调用recv().当它接收数据时处理它并再次调用recv().发生异常时或当recv()返回0字节时线程停止,就像在SocketReadThread.stop_reading()中调用socket.shutdown(SHUT_RDWR)时发生的那样

当gevent.monkey.patch_all()替换默认套接字实现时,会发生此问题.而不是很好地关闭我得到以下异常:

error: [Errno 9] File descriptor was closed in another greenlet

我假设这种情况正在发生,因为gevent使我的套接字无阻塞以便发挥其魔力.这意味着当我调用socket.shutdown(socket.SHUT_RDWR)时,正在为猴子修补socket.recv调用的greenlet尝试从已关闭的文件描述符中读取.

我编写了一个示例来隔离此问题:

from gevent import monkey

monkey.patch_all()

import socket
import sys
import threading
import time


class SocketReadThread(threading.Thread):
    def __init__(self,socket):
        super(SocketReadThread,self).__init__()
        self._socket = socket

    def run(self):
        connected = True
        while connected:
            try:
                print "calling socket.recv"
                data = self._socket.recv(1024)
                if (len(data) < 1):
                    print "received nothing,assuming socket shutdown"
                    connected = False
                else :
                    print "Recieved something: {}".format(data)
            except socket.timeout as e:
                print "Socket timeout: {}".format(e)
                connected = false
            except :
                ex = sys.exc_info()[1]
                print "Unexpected exception occurrred: {}".format(str(ex))
                raise ex

    def stop_reading(self):
        self._socket.shutdown(socket.SHUT_RDWR)
        self._socket.close()


if __name__ == '__main__':

    sock = socket.socket()
    sock.connect(('127.0.0.1',4242))

    st = SocketReadThread(sock)
    st.start()
    time.sleep(3)
    st.stop_reading()
    st.join()

如果你打开一个终端运行nc -lp 4242& (为了给这个程序提供连接的东西)然后运行这个程序你会看到上面提到的异常.如果你删除对monkey.patch_all()的调用,你会发现它工作得很好.

我的问题是:如何支持取消SocketReadThread以适用或不使用gevent monkey补丁的方式,并且不需要使用会导致取消缓慢的任意超时(即调用带有超时的recv()并检查有条件的)?

解决方法

我发现有两种不同的解决方法.第一个是简单地捕获和抑制异常.这似乎工作正常,因为一个线程通常会关闭套接字以使另一个线程退出阻塞读取.除了调试辅助工具之外,我不知道或理解为什么greenlets会抱怨这个问题.这真的只是一个烦恼.

第二种选择是使用自管道技巧(快速搜索产生许多解释)作为唤醒被阻塞线程的机制.基本上,我们创建第二个文件描述符(套接字就像是OS的一种文件描述符),用于信令取消.然后我们使用select作为阻塞来等待套接字上的传入数据或取消请求进入取消文件描述符.请参阅下面的示例代码.

from gevent import monkey

monkey.patch_all()

import os
import select
import socket
import sys
import threading
import time


class SocketReadThread(threading.Thread):
    def __init__(self,self).__init__()
        self._socket = socket
        self._socket.setblocking(0)
        r,w = os.pipe()
        self._cancelpipe_r = os.fdopen(r,'r')
        self._cancelpipe_w = os.fdopen(w,'w')

    def run(self):
        connected = True
        read_fds = [self._socket,self._cancelpipe_r]
        while connected:
            print "Calling select"
            read_list,write_list,x_list = select.select(read_fds,[],[])
            print "Select returned"
            if self._cancelpipe_r in read_list :
                print "exiting"
                self._cleanup()
                connected = False
            elif self._socket in read_list:
                print "calling socket.recv"
                data = self._socket.recv(1024)
                if (len(data) < 1):
                    print "received nothing,assuming socket shutdown"
                    connected = False
                    self._cleanup()
                else :
                    print "Recieved something: {}".format(data)


    def stop_reading(self):
        print "writing to pipe"
        self._cancelpipe_w.write("n")
        self._cancelpipe_w.flush()
        print "joining"
        self.join()
        print "joined"

    def _cleanup(self):
        self._cancelpipe_r.close()
        self._cancelpipe_w.close()
        self._socket.shutdown(socket.SHUT_RDWR)
        self._socket.close()


if __name__ == '__main__':

    sock = socket.socket()
    sock.connect(('127.0.0.1',4242))

    st = SocketReadThread(sock)
    st.start()
    time.sleep(3)
    st.stop_reading()

再次,在运行上述程序之前运行netcat -lp 4242&给它一个连接的监听套接字.

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读