加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python3 网络编程-进程间通讯(队列和管道)、数据交互(Manager)

发布时间:2020-12-17 17:00:22 所属栏目:Python 来源:网络整理
导读:不同进程内存是不共享的,要实现两个进程间的数据交换(传递),可以使用队列Queue和管道Pipe 一、进程队列????Queue ????????使用技巧和线程队列queue一样。 ????????Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到

不同进程内存是不共享的,要实现两个进程间的数据交换(传递),可以使用队列Queue和管道Pipe

一、进程队列????Queue

????????使用技巧和线程队列queue一样。

????????Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

????????get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常

代码示例:

from?multiprocessing?import?Process,?Queue
import?queue


def?f(q,?n):
????#?q.put([123,?456,?'hello'])
????q.put(n)
????print("son?process",?id(q))


if?__name__?==?'__main__':
????q?=?Queue()??#?try:?q=queue.Queue()
????print("main?process",?id(q))

????for?i?in?range(3):
????????p?=?Process(target=f,?args=(q,?i))
????????p.start()

????print(q.get())
????print(q.get())
????print(q.get())

输出结果:

main?process?2141958683840
son?process?2307961302376
0
son?process?2580504423728
1
son?process?1862767455592
2

二、管道????Pipe

管道原理:并发编程——pipe管道

????????Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

????????send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError

????????Pipe()函数返回通过管道连接的一对连接对象,默认情况下是双向(双向)。?

from?multiprocessing?import?Process,?Pipe


def?f(conn):
????conn.send([12,?{"name":?"yuan"},?'hello'])
????response?=?conn.recv()
????print("response",?response)
????conn.close()
????print("q_ID2:",?id(conn))


if?__name__?==?'__main__':
????parent_conn,?child_conn?=?Pipe()
????print("q_ID1:",?id(child_conn))
????p?=?Process(target=f,?args=(child_conn,))
????p.start()
????print(parent_conn.recv())??#?prints?"[42,?None,?'hello']"
????parent_conn.send("儿子你好!")
????p.join()

????????Pipe()返回的两个连接对象表示管道的两端。 每个连接对象都有send()和recv()方法(等等)。 请注意,如果两个进程(或线程)尝试同时读取或写入管道的同一端,则管道中的数据可能会损坏。 当然,同时使用管道的不同端的进程没有损坏的风险。

????????管道和队列只是实现数据交互,一般使用队列比较多。

三、数据交互????Manager()

????????Manager()返回的管理器对象控制一个服务器进程,该进程持有Python对象,并允许其他进程使用代理来操作它们。

????????Manager()返回的管理器将支持类型列表,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value和Array。

注意 Manager和队列(queue)、管道(pipi)区别:

????? ? 1、队列和管道只能进行数据交互,但是不能对数据进行修改

????????2、manager可以实现多个进程数据间修改和交互

代码示例:

from?multiprocessing?import?Process,?Manager
import?time

def?f(d,?l,?n):
????d[1]?=?'1'
????d['2']?=?2
????d[n]?=?n
????l.append(n)
????#?print(l)


if?__name__?==?'__main__':
????with?Manager()?as?manager:
????????d?=?manager.dict()

????????l?=?manager.list(range(5))
????????p_list?=?[]
????????for?i?in?range(10):
????????????p?=?Process(target=f,?args=(d,?i))
????????????#?time.sleep(0.05)????#?使用这一步,就可以让结果按照顺序输出
??????????????????????????????????#?之所以乱序输出是因为我的电脑运行太快
????????????p.start()
????????????p_list.append(p)
????????for?res?in?p_list:
????????????res.join()

????????print(d)
????????print(l)

输出结果:

{0:?0,?1:?'1',?2:?2,?3:?3,?4:?4,?5:?5,?6:?6,?7:?7,?8:?8,?9:?9,?'2':?2}
[0,?1,?2,?3,?4,?5,?0,?6,?7,?9,?8]

结果分析:

????????创建一个空字典(d)和一个列表[0,1,2,3,4],然后作为进程中函数f的参数d,l 循环执行10次。

四、进程同步(进程锁Lock)

????????没有使用锁从不同进程的输出容易得到所有混合。

用途:

????????输出在一个屏幕,或者是写入到一个文件里面。

代码示例:

from?multiprocessing?import?Process,?Lock
?
def?f(l,?i):
????l.acquire()
????try:
????????print('hello?world',?i)
????finally:
????????l.release()
?
if?__name__?==?'__main__':
????lock?=?Lock()
?
????for?num?in?range(10):
????????Process(target=f,?args=(lock,?num)).start()

python 2输出的结果会串行,python 3不会出现

代码示例1:

from?multiprocessing?import?Process,?Pool,?Lock
import?time,?os


def?Foo(i):
????time.sleep(1)
????print(i)
????print("son",?os.getpid())


def?Bar(arg):
????print("hello")
????print("Bar",?os.getpid())


if?__name__?==?'__main__':

????pool?=?Pool(5)
????print("main?pid",?os.getpid())
????print("----------------")

????for?i?in?range(100):
????????#?pool.apply(func=Foo,?args=(i,))????????????#?同步接口
????????#?pool.apply_async(func=Foo,))??????#?异步接口
????????pool.apply_async(func=Foo,),?callback=Bar)??#?callback?回调函数

????pool.close()???#?进程池必须把.close()放在.join()上面
????pool.join()???#?固定步骤,缺一不可
????print('end')

输出结果:

main?pid?9720
----------------
0
son?8896
hello
Bar?9720
1
son?10028
hello
Bar?9720

进程数量控制在100以内,线程数量控制在1000以内


五、进程池(线程池python默认不带,因为线程资源消耗小)

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

    • apply????????????????没有callback参数????????????阻塞函数(不建议使用)

    • apply_async?????callback 回执???????????????? ?非阻塞函数(支持结果返回后进行回调)

进程池代码示例:

from??multiprocessing?import?Process,?Pool
import?time


def?Foo(i):
????time.sleep(1)
????print("Foo",i)
????return?i


def?Bar(arg):
????print('-->exec?done:',?arg)

if?__name__?==?'__main__':

????pool?=?Pool(5)

????for?i?in?range(10):
????????pool.apply_async(func=Foo,?callback=Bar)
????????#?pool.apply(func=Foo,))????#?阻塞

????print('end')
????pool.close()
????pool.join()??#?进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

注意: ?
????????虽然 apply_async是非阻塞的,但其返回结果的get方法却是阻塞的,如使用result.get()会阻塞主进程。 ?
????????如果我们对返回结果不感兴趣, 那么可以在主进程中使用pool.close与pool.join来防止主进程退出。注意join方法一定要在closeterminate之后调用。 ?
进程池,并关注结果代码示例:

from??multiprocessing?import?Process,?cpu_count
import?time


def?Foo(i):
????time.sleep(1)
????print("Foo",?i)
????return?i


def?Bar(arg):
????print('-->exec?done:',?arg)


if?__name__?==?'__main__':

????pool?=?Pool(cpu_count())?????#?获取cpu的核数
????result?=?[]
????for?i?in?range(10):
????????result.append(pool.apply_async(func=Foo,)))
????????#?pool.apply(func=Foo,))

????pool.close()
????pool.join()??#?进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

????for?res?in?result:
????????print("res:",res.get())
????print('end')

输出结果:

Foo?0
Foo?1
Foo?2
Foo?3
Foo?4
Foo?5
Foo?6
Foo?7
Foo?8
Foo?9
res:?0
res:?1
res:?2
res:?3
res:?4
res:?5
res:?6
res:?7
res:?8
res:?9
end

可以从该例子看到,使用result.get()方法会是函数阻塞。
get()方法得到每个结果返回的值


参考文章:

http://www.cnblogs.com/kaituorensheng/p/4465768.html

http://www.cnblogs.com/kaituorensheng/p/4465768.html


(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读