python – 进程与线程关于使用Queue()/ deque()和类变量进行通信
我想创建一个Thread或一个在While True循环中永远运行的进程.
我需要以队列的形式向工作人员发送和接收数据,可以是multiprocessing.Queue()或collections.deque().我更喜欢使用collections.deque(),因为它明显更快. 我还需要能够最终杀死这个工作者(因为它运行了一段时间的True循环.这里有一些测试代码,我把它们放在一起试图理解线程,进程,队列和deque之间的区别. import time from multiprocessing import Process,Queue from threading import Thread from collections import deque class ThreadingTest(Thread): def __init__(self,q): super(ThreadingTest,self).__init__() self.q = q self.toRun = False def run(self): print("Started Thread") self.toRun = True while self.toRun: if type(self.q) == type(deque()): if self.q: i = self.q.popleft() print("Thread deque: " + str(i)) elif type(self.q) == type(Queue()): if not self.q.empty(): i = self.q.get_nowait() print("Thread Queue: " + str(i)) def stop(self): print("Trying to stop Thread") self.toRun = False while self.isAlive(): time.sleep(0.1) print("Stopped Thread") class ProcessTest(Process): def __init__(self,q): super(ProcessTest,self).__init__() self.q = q self.toRun = False self.ctr = 0 def run(self): print("Started Process") self.toRun = True while self.toRun: if type(self.q) == type(deque()): if self.q: i = self.q.popleft() print("Process deque: " + str(i)) elif type(self.q) == type(Queue()): if not self.q.empty(): i = self.q.get_nowait() print("Process Queue: " + str(i)) def stop(self): print("Trying to stop Process") self.toRun = False while self.is_alive(): time.sleep(0.1) print("Stopped Process") if __name__ == '__main__': q = Queue() t1 = ProcessTest(q) t1.start() for i in range(10): if type(q) == type(deque()): q.append(i) elif type(q) == type(Queue()): q.put_nowait(i) time.sleep(1) t1.stop() t1.join() if type(q) == type(deque()): print(q) elif type(q) == type(Queue()): while q.qsize() > 0: print(str(q.get_nowait())) 如您所见,t1可以是ThreadingTest,也可以是ProcessTest.此外,传递给它的队列可以是multiprocessing.Queue或collections.deque. ThreadingTest适用于Queue或deque().当调用stop()方法时,它还会正确杀死run(). Started Thread Thread deque: 0 Thread deque: 1 Thread deque: 2 Thread deque: 3 Thread deque: 4 Thread deque: 5 Thread deque: 6 Thread deque: 7 Thread deque: 8 Thread deque: 9 Trying to stop Thread Stopped Thread deque([]) 如果ProcessTest类型为multiprocessing.Queue,则它只能从队列中读取.它不适用于collections.deque.此外,我无法使用stop()终止进程. Process Queue: 0 Process Queue: 1 Process Queue: 2 Process Queue: 3 Process Queue: 4 Process Queue: 5 Process Queue: 6 Process Queue: 7 Process Queue: 8 Process Queue: 9 Trying to stop Process 我想弄明白为什么?另外,在进程中使用双端队列的最佳方法是什么?而且,我将如何使用某种stop()方法杀死进程. 解决方法
您不能使用collections.deque在两个multiprocessing.Process实例之间传递数据,因为collections.deque不支持进程. multiprocessing.Queue在内部将其内容写入multiprocessing.Pipe,这意味着其中的数据可以在一次进程中排队并在另一个进程中检索. collections.deque没有那种管道,所以它不起作用.当您在一个进程中写入双端队列时,另一个进程中的双端队列实例根本不会受到影响;他们是完全独立的实例.
您的stop()方法也会遇到类似的问题.您正在主进程中更改toRun的值,但这根本不会影响子进程.它们是完全独立的实例.结束孩子的最好方法是将一些哨兵送到队列.当你获得孩子的哨兵时,突破无限循环: def run(self): print("Started Process") self.toRun = True while self.toRun: if type(self.q) == type(deque()): if self.q: i = self.q.popleft() print("Process deque: " + str(i)) elif type(self.q) == type(Queue()): if not self.q.empty(): i = self.q.get_nowait() if i is None: break # Got sentinel,so break print("Process Queue: " + str(i)) def stop(self): print("Trying to stop Process") self.q.put(None) # Send sentinel while self.is_alive(): time.sleep(0.1) print("Stopped Process") 编辑: 如果您确实需要在两个进程之间使用deque语义,则可以使用custom import time from multiprocessing import Process from multiprocessing.managers import SyncManager from collections import deque SyncManager.register('deque',deque) def Manager(): m = SyncManager() m.start() return m class ProcessTest(Process): def __init__(self,self).__init__() self.q = q self.ctr = 0 def run(self): print("Started Process") self.toRun = True while self.toRun: if self.q._getvalue(): i = self.q.popleft() if i is None: break print("Process deque: " + str(i)) def stop(self): print("Trying to stop Process") self.q.append(None) while self.is_alive(): time.sleep(0.1) print("Stopped Process") if __name__ == '__main__': m = Manager() q = m.deque() t1 = ProcessTest(q) t1.start() for i in range(10): q.append(i) time.sleep(1) t1.stop() t1.join() print(q) 请注意,这可能不会比multiprocessing.Queue更快,因为每次访问双端队列时都会有IPC成本.它也是一种不那么自然的数据结构,可以按照您的方式传递消息. (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |