python3 网络编程-进程间通讯(队列和管道)、数据交互(Manager)
不同进程内存是不共享的,要实现两个进程间的数据交换(传递),可以使用队列Queue和管道Pipe 一、进程队列????Queue ????????使用技巧和线程队列queue一样。 ????????Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。 ????????get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常 代码示例: from?multiprocessing?import?Process,?Queue import?queue def?f(q,?n): ????#?q.put([123,?456,?'hello']) ????q.put(n) ????print("son?process",?id(q)) if?__name__?==?'__main__': ????q?=?Queue()??#?try:?q=queue.Queue() ????print("main?process",?id(q)) ????for?i?in?range(3): ????????p?=?Process(target=f,?args=(q,?i)) ????????p.start() ????print(q.get()) ????print(q.get()) ????print(q.get()) 输出结果: main?process?2141958683840 son?process?2307961302376 0 son?process?2580504423728 1 son?process?1862767455592 2 二、管道????Pipe 管道原理:并发编程——pipe管道 ????????Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。 ????????send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError ????????Pipe()函数返回通过管道连接的一对连接对象,默认情况下是双向(双向)。? from?multiprocessing?import?Process,?Pipe def?f(conn): ????conn.send([12,?{"name":?"yuan"},?'hello']) ????response?=?conn.recv() ????print("response",?response) ????conn.close() ????print("q_ID2:",?id(conn)) if?__name__?==?'__main__': ????parent_conn,?child_conn?=?Pipe() ????print("q_ID1:",?id(child_conn)) ????p?=?Process(target=f,?args=(child_conn,)) ????p.start() ????print(parent_conn.recv())??#?prints?"[42,?None,?'hello']" ????parent_conn.send("儿子你好!") ????p.join() ????????Pipe()返回的两个连接对象表示管道的两端。 每个连接对象都有send()和recv()方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,则管道中的数据可能会损坏。 当然,同时使用管道的不同端的进程没有损坏的风险。 ????????管道和队列只是实现数据交互,一般使用队列比较多。 三、数据交互????Manager()
????????Manager()返回的管理器对象控制一个服务器进程,该进程持有Python对象,并允许其他进程使用代理来操作它们。 ????????Manager()返回的管理器将支持类型列表,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value和Array。 注意 Manager和队列(queue)、管道(pipi)区别: ????? ? 1、队列和管道只能进行数据交互,但是不能对数据进行修改 ????????2、manager可以实现多个进程数据间修改和交互 代码示例: from?multiprocessing?import?Process,?Manager import?time def?f(d,?l,?n): ????d[1]?=?'1' ????d['2']?=?2 ????d[n]?=?n ????l.append(n) ????#?print(l) if?__name__?==?'__main__': ????with?Manager()?as?manager: ????????d?=?manager.dict() ????????l?=?manager.list(range(5)) ????????p_list?=?[] ????????for?i?in?range(10): ????????????p?=?Process(target=f,?args=(d,?i)) ????????????#?time.sleep(0.05)????#?使用这一步,就可以让结果按照顺序输出 ??????????????????????????????????#?之所以乱序输出是因为我的电脑运行太快 ????????????p.start() ????????????p_list.append(p) ????????for?res?in?p_list: ????????????res.join() ????????print(d) ????????print(l) 输出结果: {0:?0,?1:?'1',?2:?2,?3:?3,?4:?4,?5:?5,?6:?6,?7:?7,?8:?8,?9:?9,?'2':?2} [0,?1,?2,?3,?4,?5,?0,?6,?7,?9,?8] 结果分析: ????????创建一个空字典(d)和一个列表[0,1,2,3,4],然后作为进程中函数f的参数d,l 循环执行10次。 四、进程同步(进程锁Lock) ????????没有使用锁从不同进程的输出容易得到所有混合。 用途: ????????输出在一个屏幕,或者是写入到一个文件里面。 代码示例: from?multiprocessing?import?Process,?Lock ? def?f(l,?i): ????l.acquire() ????try: ????????print('hello?world',?i) ????finally: ????????l.release() ? if?__name__?==?'__main__': ????lock?=?Lock() ? ????for?num?in?range(10): ????????Process(target=f,?args=(lock,?num)).start() python 2输出的结果会串行,python 3不会出现 代码示例1: from?multiprocessing?import?Process,?Pool,?Lock import?time,?os def?Foo(i): ????time.sleep(1) ????print(i) ????print("son",?os.getpid()) def?Bar(arg): ????print("hello") ????print("Bar",?os.getpid()) if?__name__?==?'__main__': ????pool?=?Pool(5) ????print("main?pid",?os.getpid()) ????print("----------------") ????for?i?in?range(100): ????????#?pool.apply(func=Foo,?args=(i,))????????????#?同步接口 ????????#?pool.apply_async(func=Foo,))??????#?异步接口 ????????pool.apply_async(func=Foo,),?callback=Bar)??#?callback?回调函数 ????pool.close()???#?进程池必须把.close()放在.join()上面 ????pool.join()???#?固定步骤,缺一不可 ????print('end') 输出结果: main?pid?9720 ---------------- 0 son?8896 hello Bar?9720 1 son?10028 hello Bar?9720 进程数量控制在100以内,线程数量控制在1000以内 五、进程池(线程池python默认不带,因为线程资源消耗小)
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。 进程池中有两个方法:
进程池代码示例: from??multiprocessing?import?Process,?Pool import?time def?Foo(i): ????time.sleep(1) ????print("Foo",i) ????return?i def?Bar(arg): ????print('-->exec?done:',?arg) if?__name__?==?'__main__': ????pool?=?Pool(5) ????for?i?in?range(10): ????????pool.apply_async(func=Foo,?callback=Bar) ????????#?pool.apply(func=Foo,))????#?阻塞 ????print('end') ????pool.close() ????pool.join()??#?进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。 注意: ? from??multiprocessing?import?Process,?cpu_count import?time def?Foo(i): ????time.sleep(1) ????print("Foo",?i) ????return?i def?Bar(arg): ????print('-->exec?done:',?arg) if?__name__?==?'__main__': ????pool?=?Pool(cpu_count())?????#?获取cpu的核数 ????result?=?[] ????for?i?in?range(10): ????????result.append(pool.apply_async(func=Foo,))) ????????#?pool.apply(func=Foo,)) ????pool.close() ????pool.join()??#?进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。 ????for?res?in?result: ????????print("res:",res.get()) ????print('end') 输出结果: Foo?0 Foo?1 Foo?2 Foo?3 Foo?4 Foo?5 Foo?6 Foo?7 Foo?8 Foo?9 res:?0 res:?1 res:?2 res:?3 res:?4 res:?5 res:?6 res:?7 res:?8 res:?9 end 可以从该例子看到,使用result.get()方法会是函数阻塞。 参考文章: http://www.cnblogs.com/kaituorensheng/p/4465768.html http://www.cnblogs.com/kaituorensheng/p/4465768.html (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |