线程queue、事件event及协程
线程queue、事件event及协程线程queue多线程抢占资源,让其保持串行的两种方式: ? 1、互斥锁 ? 2、队列 线程队列分为以下三种: 1、Queue(先进先出) import queue q = queue.Queue(3) q.put(1) q.put(2) q.put(3) # q.put(4,block=False) # 若不设置block参数,默认为True,大于队列长度进入阻塞状态,若设置block为False,大于对列长度直接报错 print(q.get()) print(q.get()) print(q.get()) # print(q.get(timeout=2)) 阻塞2s 还没有值直接报错 # 结果 1 2 3 2、LifoQueue(后进先出) import queue q = queue.LifoQueue(3) q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) # 结果: 3 2 1 3、PriorityQueue(优先级队列) import queue q = queue.PriorityQueue(3) q.put((-1,'awe')) # 操作对象为元祖,第一个位置的数字越小,优先级越高 q.put((2,6)) q.put((0,3)) print(q.get()) print(q.get()) print(q.get()) # 结果: (-1,'awe') (0,3) (2,6) 事件event开启两个线程,一个线程运行到中间的某个阶段,触发另个线程执行.两个线程增加了耦合性. 引入事件event的两个阶段: 版本1:(判断全局变量状态) from threading import Thread from threading import current_thread import time flag =False def check(): print(f'{current_thread().name}监测服务器是否开启') time.sleep(3) global flag flag = True print('服务器已开启') def connect(): while not flag: print(f'{current_thread().name}等待连接') time.sleep(0.5) else: print(f'{current_thread().name} 连接成功...') t1 = Thread(target=check,) t2 = Thread(target=connect,) t1.start() t2.start() # 结果: Thread-1监测服务器是否开启 Thread-2等待连接 Thread-2等待连接 Thread-2等待连接 Thread-2等待连接 Thread-2等待连接 Thread-2等待连接 服务器已开启 Thread-2 连接成功... 版本2:(事件Event) from threading import Thread from threading import current_thread from threading import Event import time event = Event() # 创建事件对象 def check(): print(f'{current_thread().name}监测服务器是否开启') time.sleep(3) print(event.is_set()) # 判断事件是否设置 event.set() # 设置事件 print(event.is_set()) print('服务器已开启') def connect(): print(f'{current_thread().name}等待连接') event.wait() # 等待事件设置,阻塞状态 print(f'{current_thread().name} 连接成功...') t1 = Thread(target=check,) t1.start() t2.start() 小练习: 将上述例子改为一个线程监测服务器状态,另一个线程判断服务器状态,如果服务器状态开启,则显示连接成功,此线程每1秒尝试连接服务器一次,一共连接3次,还没连接成功,则显示连接失败 from threading import Thread from threading import current_thread from threading import Event import time event = Event() # 创建事件对象 def check(): print(f'{current_thread().name}监测服务器是否开启') time.sleep(3) event.set() # 设置事件 print('服务器已开启') def connect(): count = 1 while 1: print(f'{current_thread().name}等待连接') event.wait(1) # 等待事件设置,阻塞状态 if count == 4: print(f'{current_thread().name}连接成功') count += 1 print(f'{current_thread().name}尝试连接{count}次...') else: print(f'{current_thread().name}连接成功') t1 = Thread(target=check,) t1.start() t2.start() 协程协程:简单的来说就是一个线程并发的处理任务. 串行:一个线程执行一个任务,执行完毕之后,执行下一个任务. 并行: 多个cpu执行多个任务,4个cpu 执行4个任务. 并发: 一个cpu执行多个任务,看起来像是同时运行. 并发真正的核心:切换并且保持状态. 多线程的并发: 3个线程处理10个任务,如果线程1处理的这个任务,遇到阻塞,cpu被操作系统切换到另一个线程, 一个线程并发处理任务:以一个线程执行3个任务为例: 协程定义:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。 单个cpu并发执行10个任务的三种方式: ? 1、方式一:开启多进程并发执行,操作系统切换+保持状态. ? 2、方式二:开启多线程并发执行,操作系统切换+保持状态. ? 3、方式三:开启协程并发的执行,自己的程序 把控着cpu 在3个任务之间来回切换+保持状态. 以上三种实现方式,协程最好,这是因为: ? 1.协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级 ? 2.协程的运行速度更快 ? 3.协程会长期霸占cpu只执行我程序里面的所有任务. 协程的特点:
GreenletGreenlet是python中的一个第三方模块,真正的协程模块就是使用greenlet完成的切换 并发的两个核心:切换并且保持状态.接下来我们从一个例子慢慢引入此模块的用法 # 版本一:单切换 def func1(): print('in func1') def func2(): print('in func2') func1() print('end') func2() # 版本二:切换+保持状态 import time def gen(): while 1: yield 1 time.sleep(0.5) # 手动设置IO,遇到IO无法自动切换 def func(): obj = gen() for i in range(10): next(obj) func() # 版本三:切换+保持状态,遇到IO自动切换 from greenlet import greenlet import time def eat(name): print('%s eat 1' %name) # 2 g2.switch('taibai') # 3 time.sleep(3) print('%s eat 2' %name) # 6 g2.switch() # 7 def play(name): print('%s play 1' %name) # 4 g1.switch() # 5 print('%s play 2' %name) # 8 g1=greenlet(eat) g2=greenlet(play) g1.switch('taibai') # 1 切换到eat任务 协程模块geventgevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是greenlet,它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。 # gevent模块的几个用法 # 用法: g1=gevent.spawn(func,1,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的,spawn是异步提交任务 g2=gevent.spawn(func2) g1.join() # 等待g1结束 g2.join() # 等待g2结束 有人测试的时候会发现,不写第二个join也能执行g2,是的,协程帮你切换执行了,但是你会发现,如果g2里面的任务执行的时间长,但是不写join的话,就不会执行完等到g2剩下的任务了 # 或者上述两步合作一步:gevent.joinall([g1,g2]) 使用time.sleep模拟程序中遇到的阻塞: import gevent import time from threading import current_thread def eat(name): print('%s eat 1' %name) print(current_thread().name) # gevent.sleep(2) time.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) print(current_thread().name) # gevent.sleep(1) # gevent.sleep(1)模拟的是gevent可以识别的io阻塞 time.sleep(1) # time.sleep(1)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了 print('%s play 2' %name) g1 = gevent.spawn(eat,'egon') g2 = gevent.spawn(play,name='egon') print(f'主{current_thread().name}') g1.join() g2.join() # 结果: 主MainThread egon eat 1 MainThread egon eat 2 egon play 1 MainThread egon play 2 最终版本: import gevent from gevent import monkey monkey.patch_all() # 打补丁: 将下面的所有的任务的阻塞都打上标记 def eat(name): print('%s eat 1' %name) time.sleep(2) print('%s eat 2' %name) def play(name): print('%s play 1' %name) time.sleep(1) print('%s play 2' %name) g1 = gevent.spawn(eat,name='egon') # g1.join() # g2.join() gevent.joinall([g1,g2]) # 结果: egon eat 1 egon play 1 egon play 2 egon eat 2 负载均衡:就是指将负载(工作任务)进行平衡、分摊到多个操作单元上进行运行 Nginx:Nginx是一款轻量级的Web服务器/反向代理服务器及电子邮件(IMAP/POP3)代理服务器,其特点是占有内存少,并发能力强。 一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个 单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |