Python-22-并发编程
一、进程1. 什么是进程狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。 2. 并发与并行无论是并行还是并发,在用户看来都是‘同时‘运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务
3. 同步与异步同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。
4. 阻塞与非阻塞阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程
5. multiprocessing模块python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。 multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。 需要强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。 ?Process([group [,target [,name [,args [,kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,‘egon‘,) kwargs表示调用对象的字典,kwargs={‘name‘:‘egon‘,‘age‘:18} name为子进程的名称 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 p.name:进程的名称 p.pid:进程的pid p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可) 常用方法: p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 p.is_alive():如果p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 创建子进程的两种方式: import multiprocessing def foo(name): print(name) if __name__ == ‘__main__‘: p1 = multiprocessing.Process(target=foo,args=(‘alex‘,)) p2 = multiprocessing.Process(target=foo,args=(‘Tom‘,)) p1.start() p2.start() import multiprocessing class MyProcess(multiprocessing.Process): def __init__(self,name): super().__init__() self.name = name def run(self): print(self.pid,self.name) if __name__ == ‘__main__‘: p1 = MyProcess(‘alex‘) p2 = MyProcess(‘Tom‘) p1.start() p2.start() join() from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print(‘%s is piaoing‘ %self.name) time.sleep(random.randrange(1,3)) print(‘%s is piao end‘ %self.name) p=Piao(‘egon‘) p.start() p.join(0.0001) #等待p停止,等0.0001秒就不再等了 print(‘开始‘) join:主进程等,等待子进程结束吧 6. 守护进程主进程创建守护进程 其一:守护进程会在主进程代码执行结束后就终止 其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止 from multiprocessing import Process import time import random class Foo(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print(self.name) time.sleep(random.randrange(1,3)) print(‘%s ending‘ % self.name) p=Foo(‘test‘) p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行 p.start() print(‘主‘) 7. 进程同步(锁)进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的, 而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理 #文件db的内容为:{"count":1} #注意一定要用双引号,不然json无法识别 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open(‘db.txt‘)) print(‘ 33[43m剩余票数%s 33[0m‘ %dic[‘count‘]) def get(): dic=json.load(open(‘db.txt‘)) time.sleep(0.1) #模拟读数据的网络延迟 if dic[‘count‘] >0: dic[‘count‘]-=1 time.sleep(0.2) #模拟写数据的网络延迟 json.dump(dic,open(‘db.txt‘,‘w‘)) print(‘ 33[43m购票成功 33[0m‘) def task(lock): search() lock.acquire() get() lock.release() if __name__ == ‘__main__‘: lock=Lock() for i in range(100): #模拟并发100个客户端抢票 p=Process(target=task,args=(lock,)) p.start() 加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然可以用文件共享数据实现进程间通信,但问题是: 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 2.需要自己加锁处理 ?因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 8. 队列q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。 ‘‘‘ multiprocessing模块支持进程间通信的两种主要形式:管道和队列 都是基于消息传递实现的,但是队列接口 ‘‘‘ from multiprocessing import Process,Queue import time q=Queue(3) 9. 管道创建管道: Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道 dumplex:默认管道是全双工的,如果将duplex设成False,conn1只能用于接收,conn2只能用于发送。 主要方法: conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。 conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象 其他方法: conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法 conn1.fileno():返回连接使用的整数文件描述符 conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。 10. 进程池创建进程池: Pool([numprocess [,initializer [,initargs]]])? ?创建进程池? 参数: numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 initializer:是每个工作进程启动时要执行的可调用对象,默认为None initargs:是要传给initializer的参数组 主要方法: p.apply(func [,kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。 其他方法: 方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法 obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。 obj.ready():如果调用完成,返回True obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常 obj.wait([timeout]):等待结果变为可用。 obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数 1 from multiprocessing import Pool 2 import os,time 3 def work(n): 4 print(‘%s run‘ %os.getpid()) 5 time.sleep(3) 6 return n**2 7 8 if __name__ == ‘__main__‘: 9 p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 10 res_l=[] 11 for i in range(10): 12 res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限 13 res_l.append(res) 14 print(res_l) 1 from multiprocessing import Pool 2 import os,以后一直是这三个进程在执行任务 10 res_l=[] 11 for i in range(10): 12 res=p.apply_async(work,)) #同步运行,阻塞、直到本次任务执行完毕拿到res 13 res_l.append(res) 14 15 #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 16 p.close() 17 p.join() 18 for res in res_l: 19 print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get 二、线程1. 什么是线程线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。一个线程是一个execution context(执行上下文),即一个cpu执行时所需要的一串指令。 2. 进程与线程的区别
3. threading模块相关方法
? threading.currentThread(): 返回当前的线程变量。 threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。 threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
创建线程
import threading import time def music(): print(‘begin to listen %s‘ % time.strftime(‘%Y-%m-%d %X‘,time.localtime())) time.sleep(5) print(‘stop to listen %s‘ % time.strftime(‘%Y-%m-%d %X‘,time.localtime())) def game(): print(‘begin to play %s‘ % time.strftime(‘%Y-%m-%d %X‘,time.localtime())) time.sleep(3) print(‘stop to play %s‘ % time.strftime(‘%Y-%m-%d %X‘,time.localtime())) if __name__ == ‘__main__‘: t1 = threading.Thread(target=music) t1.start() t2 = threading.Thread(target=game) t2.start() # begin to listen 2019-07-20 13:35:57 # begin to play 2019-07-20 13:35:57 # stop to play 2019-07-20 13:36:00 # stop to listen 2019-07-20 13:36:02
import threading class Mythread(threading.Thread): def __init__(self,name): super().__init__() self.name = name def run(self): print(self.name) if __name__ == ‘__main__‘: t1 = Mythread(‘t1‘) t1.start() join()的用法 import threading import time def music(): print(‘begin to listen %s‘ % time.strftime(‘%Y-%m-%d %X‘,time.localtime())) if __name__ == ‘__main__‘: t1 = threading.Thread(target=music) t2 = threading.Thread(target=game) t1.start() t2.start() # t1.join() t2.join() print(‘end‘) begin to listen 2019-07-20 13:43:28 begin to play 2019-07-20 13:43:28 stop to play 2019-07-20 13:43:31 end stop to listen 2019-07-20 13:43:33 4. 守护线程无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁 需要强调的是:运行完毕并非终止运行 对主进程来说,运行完毕指的是主进程代码运行完毕
对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束
from threading import Thread import time def sayhi(name): time.sleep(2) print(‘%s say hello‘ %name) if __name__ == ‘__main__‘: t=Thread(target=sayhi,args=(‘egon‘,)) t.setDaemon(True) #必须在t.start()之前设置 t.start() print(‘主线程‘) print(t.is_alive()) ‘‘‘ 主线程 True ‘‘‘ 5. GILGIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。 GIL保护的是解释器级的数据 所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。 ? 如果多个线程的target=work,那么执行流程是多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行 解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码 有了GIL的存在,同一时刻同一进程中只有一个线程被执行 #分析: 我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是: 方案一:开启四个进程 方案二:一个进程下,开启四个线程 #单核情况下,分析结果: 如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜 如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜 #多核情况下,分析结果: 如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜 如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜 #结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。 1 from multiprocessing import Process 2 from threading import Thread 3 import os,time 4 def work(): 5 res=0 6 for i in range(100000000): 7 res*=i 8 9 10 if __name__ == ‘__main__‘: 11 l=[] 12 print(os.cpu_count()) #本机为4核 13 start=time.time() 14 for i in range(4): 15 p=Process(target=work) #耗时5s多 16 p=Thread(target=work) #耗时18s多 17 l.append(p) 18 p.start() 19 for p in l: 20 p.join() 21 stop=time.time() 22 print(‘run time is %s‘ %(stop-start)) 23 24 计算密集型:多进程效率高 1 from multiprocessing import Process 2 from threading import Thread 3 import threading 4 import os,time 5 def work(): 6 time.sleep(2) 7 print(‘===>‘) 8 9 if __name__ == ‘__main__‘: 10 l=[] 11 print(os.cpu_count()) #本机为4核 12 start=time.time() 13 for i in range(400): 14 # p=Process(target=work) #耗时12s多,大部分时间耗费在创建进程上 15 p=Thread(target=work) #耗时2s多 16 l.append(p) 17 p.start() 18 for p in l: 19 p.join() 20 stop=time.time() 21 print(‘run time is %s‘ %(stop-start)) 22 23 I/O密集型:多线程效率高 6. 同步锁锁的目的是为了保护共享的数据,同一时间只能有一个线程来修改共享的数据保护不同的数据就应该加不同的锁。 GIL 与Lock是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自定义加锁处理,即Lock 过程分析:所有线程抢的是GIL锁,或者说所有线程抢的是执行权限 线程1抢到GIL锁,拿到执行权限,开始执行,然后加了一把Lock,还没有执行完毕,即线程1还未释放Lock,有可能线程2抢到GIL锁,开始执行,执行过程中发现Lock还没有被线程1释放,于是线程2进入阻塞,被夺走执行权限,有可能线程1拿到GIL,然后正常执行到释放Lock。。。这就导致了串行运行的效果 锁的使用: import threading R=threading.Lock() R.acquire() ‘‘‘ 对公共数据的操作 ‘‘‘ R.release() #不加锁:并发执行,速度快,数据不安全 from threading import current_thread,Thread,Lock import os,time def task(): global n print(‘%s is running‘ %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == ‘__main__‘: n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print(‘主:%s n:%s‘ %(stop_time-start_time,n)) ‘‘‘ Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 ‘‘‘ #加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全 from threading import current_thread,time def task(): #未加锁的代码并发运行 time.sleep(3) print(‘%s start to run‘ %current_thread().getName()) global n #加锁的代码串行运行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == ‘__main__‘: n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print(‘主:%s n:%s‘ %(stop_time-start_time,n)) ‘‘‘ Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 ‘‘‘ #有的同学可能有疑问:既然加锁会让运行变成串行,那么我在start之后立即使用join,就不用加锁了啊,也是串行的效果啊 #没错:在start之后立刻使用jion,肯定会将100个任务的执行变成串行,毫无疑问,最终n的结果也肯定是0,是安全的,但问题是 #start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的 #单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高. from threading import current_thread,time def task(): time.sleep(3) print(‘%s start to run‘ %current_thread().getName()) global n temp=n time.sleep(0.5) n=temp-1 if __name__ == ‘__main__‘: n=100 lock=Lock() start_time=time.time() for i in range(100): t=Thread(target=task) t.start() t.join() stop_time=time.time() print(‘主:%s n:%s‘ %(stop_time-start_time,n)) ‘‘‘ Thread-1 start to run Thread-2 start to run ...... Thread-100 start to run 主:350.6937336921692 n:0 #耗时是多么的恐怖 ‘‘‘ 7. 死锁与递归锁所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。 解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁: mutexA=mutexB=threading.RLock() #一个线程拿到锁,counter加1,该线程内又碰到加锁的情况,则counter继续加1,这期间所有其他线程都只能等待,等待该线程释放所有锁,即counter递减到0为止 8. 信号量Semaphore管理一个内置的计数器, 实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5): from threading import Thread,Semaphore import threading import time # def func(): # if sm.acquire(): # print (threading.currentThread().getName() + ‘ get semaphore‘) # time.sleep(2) # sm.release() def func(): sm.acquire() print(‘%s get sm‘ %threading.current_thread().getName()) time.sleep(3) sm.release() if __name__ == ‘__main__‘: sm=Semaphore(5) for i in range(23): t=Thread(target=func) t.start() 9. Event线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行。 使用方法: event.isSet():返回event的状态值; 10. 线程queue使用import queue, 用法与进程queue一样
import queue # 线程队列 q = queue.Queue(3) # 默认FIFO先进先出,3表示最多只能存3个数据 q.put(12) q.put(‘hello‘) q.put({‘name‘: ‘alex‘,‘age‘: 18}) while True: data = q.get() print(data) 运行结果: 12 hello {‘name‘: ‘alex‘,‘age‘: 18}
import queue q = queue.LifoQueue(3) q.put(12) q.put(‘hello‘) q.put({‘name‘: ‘alex‘,‘age‘: 18}) while True: data = q.get() print(data) 运行结果: {‘name‘: ‘alex‘,‘age‘: 18} hello 12
import queue q = queue.PriorityQueue(3) q.put([2,12]) q.put([1,‘hello‘]) q.put([3,{‘name‘: ‘alex‘,‘age‘: 18}]) while True: data = q.get() print(data) 运行结果: [1,‘hello‘] [2,12] [3,‘age‘: 18}] 三、协程?1. 什么是协程协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的 优点: 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级 单线程内就可以实现并发的效果,最大限度地利用cpu 缺点: 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程 2. Greenlet安装 pip3 install greenlet #顺序执行 import time def f1(): res=1 for i in range(100000000): res+=i def f2(): res=1 for i in range(100000000): res*=i start=time.time() f1() f2() stop=time.time() print(‘run time is %s‘ %(stop-start)) #10.985628366470337 #切换 from greenlet import greenlet import time def f1(): res=1 for i in range(100000000): res+=i g2.switch() def f2(): res=1 for i in range(100000000): res*=i g1.switch() start=time.time() g1=greenlet(f1) g2=greenlet(f2) g1.switch() stop=time.time() print(‘run time is %s‘ %(stop-start)) # 52.763017892837524 greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。 3. GeventGevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。 安装: pip3 install gevent 用法: g1=gevent.spawn(func,1,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的 g2=gevent.spawn(func2) g1.join() #等待g1结束 g2.join() #等待g2结束 #或者上述两步合作一步:gevent.joinall([g1,g2]) g1.value#拿到func1的返回值 1 from gevent import monkey;monkey.patch_all() 2 from socket import * 3 import gevent 4 5 #如果不想用money.patch_all()打补丁,可以用gevent自带的socket 6 # from gevent import socket 7 # s=socket.socket() 8 9 def server(server_ip,port): 10 s=socket(AF_INET,SOCK_STREAM) 11 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 12 s.bind((server_ip,port)) 13 s.listen(5) 14 while True: 15 conn,addr=s.accept() 16 gevent.spawn(talk,conn,addr) 17 18 def talk(conn,addr): 19 try: 20 while True: 21 res=conn.recv(1024) 22 print(‘client %s:%s msg: %s‘ %(addr[0],addr[1],res)) 23 conn.send(res.upper()) 24 except Exception as e: 25 print(e) 26 finally: 27 conn.close() 28 29 if __name__ == ‘__main__‘: 30 server(‘127.0.0.1‘,8080) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |