python的进程与线程
? 进程、线程的含义?? 1.什么是进程?进程是指运行中的应用程序,每个进程都有自己独立的地址空间(内存空间)。比如用户点击桌面的IE浏览器,就启动了一个进程,操作系统就会为该进程分配独立的地址空间。当用户再次点击IE浏览器,又启动了一个进程,操作系统将为新的进程分配新的独立的地址空间。多进程就是“多任务”,就像使用电脑时同时打开浏览器上网、打开播放器听歌、后台还默默运行着杀毒软件一样。现代操作系统如Mac OS X,UNIX,Linux,Windows等都支持多进程,每启动一个进程,操作系统便为该进程分配一个独立的内存空间。 2.什么是线程? 线程是进程中的一个实体,是被系统独立调度和分派的基本单位。一个进程可以有一个线程,也可以有多个线程。 为什么要有多进程和多线程? 每个进程至少要干一件事,比如一个编辑器既要打字输入同时又要检测打错的拼写有时候还要区分一些关键字高亮显示,它们同属于编辑器这个进程,我们把编辑器作为一个进程,而以上这些工作就是它的子任务,如何实现他们同时工作呢?就是让每个子任务即线程短暂运行交替执行,由于它们彼此之间交替太快了,看起来就像同时运行一样。(真正的多线程需要多核CPU才能实现) 实现多进程和多线程1.多进程 linux下可使用os模块的fork()。 import os print(‘Process (%s) start...‘ % os.getpid()) # Only works on Unix/Linux/Mac: pid = os.fork() if pid == 0: print(‘I am child process (%s) and my parent is %s.‘ % (os.getpid(),os.getppid())) else: print(‘I (%s) just created a child process (%s).‘ % (os.getpid(),pid)) ? windows下可以使用multiprocessing模块 from multiprocessing import Process import os # 子进程要执行的代码 def run_proc(name): print(‘Run child process %s (%s)...‘ % (name,os.getpid())) if __name__==‘__main__‘: print(‘Parent process %s.‘ % os.getpid()) p = Process(target=run_proc,args=(‘test‘,)) print(‘Child process will start.‘) p.start() p.join() print(‘Child process end.‘) ? 创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动。 ? Pool from multiprocessing import Pool import os,time,random def long_time_task(name): print(‘Run task %s (%s)...‘ % (name,os.getpid())) start = time.time() time.sleep(random.random() * 3) end = time.time() print(‘Task %s runs %0.2f seconds.‘ % (name,(end - start))) if __name__==‘__main__‘: print(‘Parent process %s.‘ % os.getpid()) p = Pool(4) for i in range(5): p.apply_async(long_time_task,args=(i,)) print(‘Waiting for all subprocesses done...‘) p.close() p.join() print(‘All subprocesses done.‘) ? 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。 ? 子进程 subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。 下面的例子演示了如何在Python代码中运行命令nslookup www.python.org,这和命令行直接运行的效果是一样的: import subprocess print(‘$ nslookup www.python.org‘) r = subprocess.call([‘nslookup‘,‘www.python.org‘]) print(‘Exit code:‘,r) ? 2.多线程使用threading模块实现多线程,Python的线程是真正的Posix Thread,而不是模拟出来的线程。 import time,threading def loop(): print(‘线程 %s 在运行‘ % threading.current_thread().name) n = 0 while n < 5: n = n + 1 print(‘线程 %s >>> %s‘ % (threading.current_thread().name,n)) time.sleep(1) print(‘线程 %s 结束.‘ % threading.current_thread().name) print(‘线程 %s 在运行‘ % threading.current_thread().name) t = threading.Thread(target=loop,name=‘子线程1‘) t2 = threading.Thread(target=loop,name=‘子线程2‘) t.start() t2.start() t.join() t2.join() print(‘线程 %s 结束.‘ % threading.current_thread().name) ? 或者 import time,threading ? 进程之间和线程之间的相互协调1.进程间的通信:Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。 以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据: from multiprocessing import Process,Queue import os,random # 写数据进程执行的代码: def write(q): print(‘Process to write: %s‘ % os.getpid()) for value in [‘A‘,‘B‘,‘C‘]: print(‘Put %s to queue...‘ % value) q.put(value) time.sleep(random.random()) # 读数据进程执行的代码: def read(q): print(‘Process to read: %s‘ % os.getpid()) while True: value = q.get(True) print(‘Get %s from queue.‘ % value) if __name__==‘__main__‘: # 父进程创建Queue,并传给各个子进程: q = Queue() pw = Process(target=write,args=(q,)) pr = Process(target=read,)) # 启动子进程pw,写入: pw.start() # 启动子进程pr,读取: pr.start() # 等待pw结束: pw.join() # pr进程里是死循环,无法等待其结束,只能强行终止: pr.terminate() ? 在Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所有,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了。 2.线程间通信1.Queue使用线程队列有一个要注意的问题是,向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果你担心对象的共享状态,那你最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝。 Queue 对象提供一些在当前上下文很有用的附加特性。比如在创建 Queue 对象时提供可选的 size 参数来限制可以添加到队列中的元素数量。对于“生产者”与“消费者”速度有差异的情况,为队列中的元素数量添加上限是有意义的。比如,一个“生产者”产生项目的速度比“消费者”“消费”的速度快,那么使用固定大小的队列就可以在队列已满的时候阻塞队列,以免未预期的连锁效应扩散整个程序造成死锁或者程序运行失常。在通信的线程之间进行“流量控制”是一个看起来容易实现起来困难的问题。如果你发现自己曾经试图通过摆弄队列大小来解决一个问题,这也许就标志着你的程序可能存在脆弱设计或者固有的可伸缩问题。 get() 和 put() 方法都支持非阻塞方式和设定超时。 import queue ? 最后,有 q.qsize() , q.full() , q.empty() 等实用方法可以获取一个队列的当前大小和状态。但要注意,这些方法都不是线程安全的。可能你对一个队列使用empty() 判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。所以,你最好不要在你的代码中使用这些方法。 为了避免出现死锁的情况,使用锁机制的程序应该设定为每个线程一次只允许获取一个锁。如果不能这样做的话,你就需要更高级的死锁避免机制。在 threading 库中还提供了其他的同步原语,比如 RLock 和 Semaphore 对象。 ? Queue提供的方法: task_done() 意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。 如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。 join() 阻塞调用线程,直到队列中的所有任务被处理掉。 只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。 put(item[,block[,timeout]]) 将item放入队列中。 其非阻塞版本为put_nowait等同于put(item,False) get([block[,timeout]]) 从队列中移除并返回一个数据。block跟timeout参数同put方法 其非阻塞方法为get_nowait()相当与get(False) empty() 如果队列为空,返回True,反之返回False ? 2.同步机制Event线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用 threading 库中的 Event 对象。 Event 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在初始情况下,event 对象中的信号标志被设置假。如果有线程等待一个 event 对象,而这个 event 对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个 event 对象的信号标志设置为真,它将唤醒所有等待个 event 对象的线程。如果一个线程等待一个已经被设置为真的 event 对象,那么它将忽略这个事件,继续执行。 from threading import Thread,Event import time def countdown(n,start_evt): print(‘countdown is starting...‘) start_evt.set() while n > 0: print(‘T-minus‘,n) n -= 1 time.sleep(5) start_evt = Event() # 可通过Event 判断线程的是否已运行 t = Thread(target=countdown,args=(10,start_evt)) t.start() print(‘launching countdown...‘) start_evt.wait() # 等待countdown执行 # event 对象的一个重要特点是当它被设置为真时会唤醒所有等待它的线程 print(‘countdown is running...‘) ? Semaphore(信号量) from threading import Semaphore,Lock,RLock,Condition,Event,Thread import time # 信号量 sema = Semaphore(3) #限制同时能访问资源的数量为3 def foo(tid): with sema: print(‘{} acquire sema‘.format(tid)) time.sleep(1) print(‘{} release sema‘.format(tid)) threads = [] for i in range(5): t = Thread(target=foo,args=(i,)) threads.append(t) t.start() for t in threads: t.join() ? Lock(锁) 互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。 #创建锁 mutex = threading.Lock() #锁定 mutex.acquire([timeout]) #释放 mutex.release() RLock(可重入锁) import threading import time class MyThread(threading.Thread): def run(self): global num time.sleep(1) if mutex.acquire(1): num = num+1 msg = self.name+‘ set num to ‘+str(num) print msg mutex.acquire() mutex.release() mutex.release() ? Condition(条件变量) 可以认为Condition对象维护了一个锁(Lock/RLock)和一个waiting池。线程通过acquire获得Condition对象,当调用wait方法时,线程会释放Condition内部的锁并进入blocked状态,同时在waiting池中记录这个线程。当调用notify方法时,Condition对象会从waiting池中挑选一个线程,通知其调用acquire方法尝试取到锁。 Condition对象的构造函数可以接受一个Lock/RLock对象作为参数,如果没有指定,则Condition对象会在内部自行创建一个RLock。 除了notify方法外,Condition对象还提供了notifyAll方法,可以通知waiting池中的所有线程尝试acquire内部锁。由于上述机制,处于waiting状态的线程只能通过notify方法唤醒,所以notifyAll的作用在于防止有线程永远处于沉默状态。 import threading import time class Producer: def run(self): global count while True: if con.acquire(): if count > 1000: con.wait() else: count += 100 msg = threading.current_thread().name + ‘ produce 100,count=‘ + str(count) print(msg) con.notify() # 通知 waiting线程池中的线程 con.release() time.sleep(1) count = 0 con = threading.Condition() class Consumer: def run(self): global count while True: if con.acquire(): if count < 100: con.wait() else: count -= 3 msg = threading.current_thread().name + ‘ consumer 3,count=‘ + str(count) print(msg) con.notify() con.release() time.sleep(3) producer = Producer() ? 进程和线程的比较1.稳定性 多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了它拥有自己独立的内存空间,不会影响主进程和其他子进程(主进程崩掉,子进程也难逃厄运)。多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下,用fork调用还行,在Windows下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题。 2.切换开销 首先上下文切换就是从当前执行任务切换到另一个任务执行的过程。但是,为了确保下次能从正确的位置继续执行,在切换之前,会保存上一个任务的状态。 3.计算密集型和IO密集型下的选择 我们可以把任务分为计算密集型和IO密集型。 ?
? (1)需要频繁创建销毁的优先用线程 原因请看上面的对比。 这种原则最常见的应用就是Web服务器了,来一个连接建立一个线程,断了就销毁线程,要是用进程,创建和销毁的代价是很难承受的 (2)需要进行大量计算的优先使用线程 所谓大量计算,当然就是要耗费很多CPU,切换频繁了,这种情况下线程是最合适的。 这种原则最常见的是图像处理、算法处理。 (3)强相关的处理用线程,弱相关的处理用进程 什么叫强相关、弱相关?理论上很难定义,给个简单的例子就明白了。 一般的Server需要完成如下任务:消息收发、消息处理。“消息收发”和“消息处理”就是弱相关的任务,而“消息处理”里面可能又分为“消息解码”、“业务处理”,这两个任务相对来说相关性就要强多了。因此“消息收发”和“消息处理”可以分进程设计,“消息解码”、“业务处理”可以分线程设计。 当然这种划分方式不是一成不变的,也可以根据实际情况进行调整。 (4)可能要扩展到多机分布的用进程,多核分布的用线程 原因请看上面对比。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |