day33:进程锁&事件&进程队列&进程间共享数据
目录1.锁:Lock 2.信号量:Semaphone 3.事件:Event 4.进程队列:Queue 5.生产者和消费者模型 6.JoinableQueue 7.Manager:进程之间共享数据 锁:Lock1.锁的基本概念上锁和解锁是一对,只上锁不解锁会发生死锁现象(代码阻塞,不往下执行了) 互斥锁 : 互斥锁是进程之间的互相排斥,谁先抢到这个锁资源就先使用,后抢到后使用 2.锁的基本用法# 创建一把锁 lock = Lock() 上锁 lock.acquire() 连续上锁不解锁是死锁 lock.acquire() error print("厕所中...") 解锁 lock.release() 执行程序 ... ") 对上面代码的一波分析:因为上锁后又解锁,所以最后一行的执行程序...可以打印出 但是如果连续上两把锁,解一把锁,则会产生死锁状态。无法打印后面的执行程序... 3.模拟12306抢票软件读写数据库中的票数 def wr_info(sign,dic=None): if sign == r: with open(ticket.txt",mode=utf-8) as fp: dic = json.load(fp) return dic elif sign == w) as fp: json.dump(dic,fp) 抢票方法 def get_ticket(person): 获取数据库中实际的票数 dic = wr_info() print(dic) 模拟一下网络延迟 time.sleep(0.5) 判断票数 if dic[count"] > 0: %s抢到票了" % (person)) dic["] -= 1 wr_info(else: %s没有抢到这张票 (person)) run(person,lock): 查看剩余票数 dic = wr_info(%s 查询票数: %s" % (person,dic[])) lock.acquire() 开始抢票 get_ticket(person) lock.release() if __name__ == __main__: lock = Lock() lst = [FlyHurtMojoSnow1dao770JieJ139GeminiSK] for i in lst: p = Process(target=run,args=(i,lock)) p.start() 运行结果如下图所示 代码分析: 创建进程的时候,仍然是异步并发, 在执行到上锁时,多个进程之间变成了同步程序. 先来的先上锁,先执行,后来的进程后上锁,后执行 信号量:Semaphone信号量 Semaphore 本质上就是锁,只不过可以控制上锁的数量 1.Semaphore的基本用法sem = Semaphore(4) 锁的数量为4 sem.acquire() sem.acquire() sem.acquire() sem.acquire() sem.acquire() # 上第五把锁出现死锁状态. 执行相应的操作) sem.release() 2.模拟KTV房间唱歌ktv(person,sem): sem.acquire() %s进入了ktv,正在唱歌 (person)) 开始唱歌,唱一段时间 time.sleep(random.randrange(3,7)) 3 4 5 6 %s离开了ktv,唱完了 (person)) sem.release() : sem = Semaphore(4) lst = [Giao lst: p = Process(target=ktv,sem)) p.start() 运行结果如下图所示 代码分析: Semaphore 可以设置上锁的数量 同一时间最多允许几个进程上锁 创建进程的时候,是异步并发 执行任务的时候,遇到锁会变成同步程序. 事件:Event1.Event中的基本方法阻塞事件: e = Event()生成事件对象e e.wait()动态给程序加阻塞,程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False] 如果是True 不加阻塞 如果是False 加阻塞 set()方法 将这个属性的值改成True clear()方法 将这个属性的值改成False is_set()方法 判断当前的属性是否为True (默认上来是False) 2.Event的基本语法1.小试牛刀 e = Event() print(e.is_set()) 默认是False状态 e.wait() 程序运行中 ... ") 运行结果如下图所示 2.is_set=True e = Event() e.set() 将内部成员属性值由False -> True (e.is_set()) e.wait() ) e.clear() 将内部成员属性值由True => False 程序运行中2 ... ") 运行结果如下图所示 3.wait里加参数 e = wait参数 可以写时间 wait(3) 代表最多等待3秒钟 e.wait(3) 程序运行中3 ... ") 3.模拟经典红绿灯效果traffic_light(e): 红灯亮while True: if e.is_set(): 绿灯状态,亮1秒钟 time.sleep(1) ) e.clear() : 红灯状态,1)">绿灯亮) e.set() car(e,i): not False => True => 目前是红灯,小车在等待 if not e.is_set(): car%s 在等待 (i)) 加阻塞 e.wait() car%s 通行了 (i)) 不关红绿灯,一直跑 """ if __name__ == "__main__": e = Event() # 创建交通灯对象 p1 = Process(target=traffic_light,args=(e,)) p1.start() # 创建车对象 for i in range(1,21): time.sleep(random.randrange(0,2)) # 0 1 p2 = Process(target=car,i)) p2.start() """ 当所有小车都跑完之后,把红绿灯收拾起来,省电 : lst = [] e = Event() 创建交通灯对象 p1 = Process(target=traffic_light,1)">(e,)) 设置红绿灯为守护进程 p1.daemon = True p1.start() 创建车对象 in range(1,21): time.sleep(random.randrange(0,2)) 0 1 p2 = Process(target=car,i)) p2.start() lst.append(p2) 让所有的小车都通行之后,在结束交通灯 lst: i.join() 程序结束 ... ") 运行结果如下图所示 那么怎样才能做到,当20辆车已经通行之后,停止红绿灯的交替闪烁呢? True p1.start() ") 运行结果如下图所示 代码分析: 1.设置p1交通信号灯为deamon守护进程,当主进程结束,守护进程-红绿灯进程也结束 2.i.join:当所有小车都通行之后,再结束交通信号灯进程 进程队列:Queue什么是队列?? 队列特点: 先进先出,后进后出 1.put() 往队列里放值q = Queue() 1.put 往队列中放值 q.put(100) q.put(101) q.put(102) 2.get() 从队列里取值2.get 从队列中取值 res = q.get() (res) res =print(res) 3.队列中如果已经没有数据了,在调用get会发生阻塞.res =print(res)
执行结果如下图所示 4.get_nowait 拿不到数据就报错get_nowait??存在系统兼容性问题[windows]好用 [linux]不好用 不推荐 res = q.get_nowait() print(res) ? 当然,我们可以使用try捕获到这个错误 try: res = q.get_nowait() (res) except queue.Empty: pass 5.设置队列的长度q2 = Queue(4) q2.put(200) q2.put(201) q2.put(202) q2.put(203) 如果超过了队列的指定长度,在继续存值会出现阻塞现象 # q2.put(204) # 超出长度会阻塞 6.put_nowait() 非阻塞版本的put,超出长度后,直接报错q = Queue(3) q.put(100) q.put(102) q.put_nowait(204) 超出队列长度,直接报错 ? ?同样,我们可以使用try捕获异常 : q2.put_nowait(205) queue.Full: pass 7.进程之间的数据共享func(q3): 2.子进程获取数据 res = q3.get() (res) 3.子进程存数据 q3.put(马生平) : q3 = Queue() p = Process(target=func,1)">(q3,)) p.start() 1.主进程添加数据 q3.put(王凡 为了等待子进程把数据放到队列中,需要加join p.join() 4.主进程获取数据 res =主程序结束 ... ") 生产者和消费者模型1.基本模型consumer(q,name): True: food = q.get() time.sleep(random.uniform(0.1,1)) %s 吃了一个%s (name,food)) 生产者模型 producer(q,name,food): in range(5): time.sleep(random.uniform(0.1,1)"> 打印生产的数据 %s 生产了 %s%s 存储生产的数据 q.put(food + str(i)) : q = Queue() 消费者 p1 = Process(target=consumer,args=(q,1)">宋云杰)) 生产者 p2 = Process(target=producer,1)">黄瓜)) p1.start() p2.start() 2.优化版消费者模型 q.get() if food is None: break time.sleep(random.uniform(0.1,1)">)) p1.start() p2.start() 在生产者生产完所有数据之后,在队列的末尾添加一个None p2.join() 添加None q.put(None) 优化版和普通版有什么不同呢? 运行结果如下图所示 JoinableQueue1.前戏上面的生产者-消费者模型只是针对于一个生产者和一个消费者 那如果是多个生产者和多个消费者呢? 2.JoinableQueue基本语法put 存储 get 获取 task_done join task_done 和 join 配合使用的 队列中 1 2 3 4 5 put 一次 内部的队列计数器加1 get 一次 通过task_done让队列计数器减1 join函数,会根据队列计数器来判断是阻塞还是放行 队列计数器 = 0,意味着放行 队列计数器 != 0,意味着阻塞 jq =JoinableQueue() jq.put(a") 向队列中存储一个a 并且队列计数器会加1 print(jq.get()) 取出队列的a并打印 jq.task_done() 通过task_done让队列计数器减1,此时队列计数器为0 jq.join() 队列计数器为0,放行,打印下面的内容 finish") 3.用JoinableQueue优化生产者-消费者模型当队列计数器减到0的时,意味着进程队列中的数据消费完毕 q.task_done() str(i)) JoinableQueue() )) p3 = Process(target=consumer,1)">李博伦)) 设置p1消费者为守护进程 p1.daemon = True p3.daemon = True p1.start() p2.start() p3.start() 把所有生产者生产的数据存放到进程队列中 为了保证消费者能够消费完所有数据,加上队列.join q.join() ") 运行结果如下图所示 Manager:进程之间共享数据work(data,lock): 1.正常写法 修改数据 data["] -= 1 lock.release() 2.使用with语法可以简化上锁和解锁两步操作 with lock: data[0] += 1 [] lock = Lock() m = Manager() data = m.dict( {":20000} ) data = m.list( [1,2,3] ) in range(50): p = Process(target=work,1)">(data,lock)) p.start() lst.append(p) 确保所有进程执行完毕之后,在向下执行,打印数据,否则报错. lst: i.join() (data) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |