Python协程的用法和例子详解
从句法上看,协程与生成器类似,都是定义体中包含 yield 关键字的函数。可是,在协程中, yield 通常出现在表达式的右边(例如, datum = yield),可以产出值,也可以不产出 ―― 如果 yield 关键字后面没有表达式,那么生成器产出 None。 协程可能会从调用方接收数据,不过调用方把数据提供给协程使用的是 .send(datum) 方法,而不是next(…) 函数。 ==yield 关键字甚至还可以不接收或传出数据。不管数据如何流动, yield 都是一种流程控制工具,使用它可以实现协作式多任务:协程可以把控制器让步给中心调度程序,从而激活其他的协程==。 协程的生成器的基本行为 这里有一个最简单的协程代码: def simple_coroutine(): print('-> start') x = yield print('-> recived',x) sc = simple_coroutine() next(sc) sc.send('zhexiao') 解释: 1. 协程使用生成器函数定义:定义体中有 yield 关键字。 ==注意:send方法只有当协程处于 GEN_SUSPENDED 状态下时才会运作,所以我们使用 next() 方法激活协程到 yield 表达式处停止,或者我们也可以使用 sc.send(None),效果与 next(sc) 一样==。 协程的四个状态: 协程可以身处四个状态中的一个。当前状态可以使用inspect.getgeneratorstate(…) 函数确定,该函数会返回下述字符串中的一个: ==最先调用 next(sc) 函数这一步通常称为“预激”(prime)协程==(即,让协程向前执行到第一个 yield 表达式,准备好作为活跃的协程使用)。 import inspect def simple_coroutine(a): print('-> start') b = yield a print('-> recived',a,b) c = yield a + b print('-> recived',b,c) # run sc = simple_coroutine(5) next(sc) sc.send(6) # 5,6 sc.send(7) # 5,6,7 示例:使用协程计算移动平均值 def averager(): total = 0.0 count = 0 avg = None while True: num = yield avg total += num count += 1 avg = total/count # run ag = averager() # 预激协程 print(next(ag)) # None print(ag.send(10)) # 10 print(ag.send(20)) # 15 解释: 1. 调用 next(ag) 函数后,协程会向前执行到 yield 表达式,产出 average 变量的初始值――None。 终止协程和异常处理 协程中未处理的异常会向上冒泡,传给 next 函数或 send 方法的调用方(即触发协程的对象)。 ==终止协程的一种方式:发送某个哨符值,让协程退出。内置的 None 和Ellipsis 等常量经常用作哨符值==。 显式地把异常发给协程 从 Python 2.5 开始,客户代码可以在生成器对象上调用两个方法,显式地把异常发给协程。 generator.throw(exc_type[,exc_value[,traceback]]) 致使生成器在暂停的 yield 表达式处抛出指定的异常。如果生成器处理了抛出的异常,代码会向前执行到下一个 yield 表达式,而产出的值会成为调用 generator.throw方法得到的返回值。如果生成器没有处理抛出的异常,异常会向上冒泡,传到调用方的上下文中。 generator.close() 致使生成器在暂停的 yield 表达式处抛出 GeneratorExit 异常。如果生成器没有处理这个异常,或者抛出了 StopIteration 异常(通常是指运行到结尾),调用方不会报错。如果收到 GeneratorExit 异常,生成器一定不能产出值,否则解释器会抛出RuntimeError 异常。生成器抛出的其他异常会向上冒泡,传给调用方。 异常处理示例: class DemoException(Exception): """ custom exception """ def handle_exception(): print('-> start') while True: try: x = yield except DemoException: print('-> run demo exception') else: print('-> recived x:',x) raise RuntimeError('this line should never run') he = handle_exception() next(he) he.send(10) # recived x: 10 he.send(20) # recived x: 20 he.throw(DemoException) # run demo exception he.send(40) # recived x: 40 he.close() 如果传入无法处理的异常,则协程会终止: he.throw(Exception) # run demo exception yield from获取协程的返回值 为了得到返回值,协程必须正常终止;然后生成器对象会抛出StopIteration 异常,异常对象的 value 属性保存着返回的值。 ==yield from 结构会在内部自动捕获 StopIteration 异常==。对 yield from 结构来说,解释器不仅会捕获 StopIteration 异常,还会把value 属性的值变成 yield from 表达式的值。 yield from基本用法 ==在生成器 gen 中使用 yield from subgen() 时, subgen 会获得控制权,把产出的值传给 gen 的调用方,即调用方可以直接控制 subgen。与此同时, gen 会阻塞,等待 subgen 终止==。 下面2个函数的作用一样,只是使用了 yield from 的更加简洁: def gen(): for c in 'AB': yield c print(list(gen())) def gen_new(): yield from 'AB' print(list(gen_new())) ==yield from x 表达式对 x 对象所做的第一件事是,调用 iter(x),从中获取迭代器,因此, x 可以是任何可迭代的对象,这只是 yield from 最基础的用法==。 yield from高级用法 ==yield from 的主要功能是打开双向通道,把最外层的调用方与最内层的子生成器连接起来,这样二者可以直接发送和产出值,还可以直接传入异常,而不用在位于中间的协程中添加大量处理异常的样板代码==。 yield from 专门的术语 委派生成器:包含 yield from 表达式的生成器函数。 图示 解释: 1. 委派生成器在 yield from 表达式处暂停时,调用方可以直接把数据发给子生成器。 高级示例 from collections import namedtuple ResClass = namedtuple('Res','count average') # 子生成器 def averager(): total = 0.0 count = 0 average = None while True: term = yield if term is None: break total += term count += 1 average = total / count return ResClass(count,average) # 委派生成器 def grouper(storages,key): while True: # 获取averager()返回的值 storages[key] = yield from averager() # 客户端代码 def client(): process_data = { 'boys_2': [39.0,40.8,43.2,43.1,38.6,41.4,40.6,36.3],'boys_1': [1.38,1.5,1.32,1.25,1.37,1.48,1.49,1.46] } storages = {} for k,v in process_data.items(): # 获得协程 coroutine = grouper(storages,k) # 预激协程 next(coroutine) # 发送数据到协程 for dt in v: coroutine.send(dt) # 终止协程 coroutine.send(None) print(storages) # run client() 解释: 1. 外层 for 循环每次迭代会新建一个 grouper 实例,赋值给 coroutine 变量; grouper 是委派生成器。 2. 调用 next(coroutine),预激委派生成器 grouper,此时进入 while True 循环,调用子生成器 averager 后,在 yield from 表达式处暂停。 yield from的意义
使用案例 协程能自然地表述很多算法,例如仿真、游戏、异步 I/O,以及其他事件驱动型编程形式或协作式多任务。协程是 asyncio 包的基础构建。通过仿真系统能说明如何使用协程代替线程实现并发的活动。 在仿真领域,进程这个术语指代模型中某个实体的活动,与操作系统中的进程无关。仿真系统中的一个进程可以使用操作系统中的一个进程实现,但是通常会使用一个线程或一个协程实现。 出租车示例 import collections # time 字段是事件发生时的仿真时间, # proc 字段是出租车进程实例的编号, # action 字段是描述活动的字符串。 Event = collections.namedtuple('Event','time proc action') def taxi_process(proc_num,trips_num,start_time=0): """ 每次改变状态时创建事件,把控制权让给仿真器 :param proc_num: :param trips_num: :param start_time: :return: """ time = yield Event(start_time,proc_num,'leave garage') for i in range(trips_num): time = yield Event(time,'pick up people') time = yield Event(time,'drop off people') yield Event(time,'go home') # run t1 = taxi_process(1,1) a = next(t1) print(a) # Event(time=0,proc=1,action='leave garage') b = t1.send(a.time + 6) print(b) # Event(time=6,action='pick up people') c = t1.send(b.time + 12) print(c) # Event(time=18,action='drop off people') d = t1.send(c.time + 1) print(d) # Event(time=19,action='go home') 模拟控制台控制3个出租车异步 import collections import queue import random # time 字段是事件发生时的仿真时间, # proc 字段是出租车进程实例的编号, # action 字段是描述活动的字符串。 Event = collections.namedtuple('Event','go home') class SimulateTaxi(object): """ 模拟出租车控制台 """ def __init__(self,proc_map): # 保存排定事件的 PriorityQueue 对象, # 如果进来的是tuple类型,则默认使用tuple[0]做排序 self.events = queue.PriorityQueue() # procs_map 参数是一个字典,使用dict构建本地副本 self.procs = dict(proc_map) def run(self,end_time): """ 排定并显示事件,直到时间结束 :param end_time: :return: """ for _,taxi_gen in self.procs.items(): leave_evt = next(taxi_gen) self.events.put(leave_evt) # 仿真系统的主循环 simulate_time = 0 while simulate_time < end_time: if self.events.empty(): print('*** end of events ***') break # 第一个事件的发生 current_evt = self.events.get() simulate_time,action = current_evt print('taxi:',',at time:',simulate_time,action) # 准备下个事件的发生 proc_gen = self.procs[proc_num] next_simulate_time = simulate_time + self.compute_duration() try: next_evt = proc_gen.send(next_simulate_time) except StopIteration: del self.procs[proc_num] else: self.events.put(next_evt) else: msg = '*** end of simulation time: {} events pending ***' print(msg.format(self.events.qsize())) @staticmethod def compute_duration(): """ 随机产生下个事件发生的时间 :return: """ duration_time = random.randint(1,20) return duration_time # 生成3个出租车,现在全部都没有离开garage taxis = {i: taxi_process(i,(i + 1) * 2,i * 5) for i in range(3)} # 模拟运行 st = SimulateTaxi(taxis) st.run(100) 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持编程小技巧。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |