Python内置库:threading(多线程)
Python的线程操作在旧版本中使用的是thread模块,在Python27和Python3中引入了threading模块,同时thread模块在Python3中改名为_thread模块,threading模块相较于thread模块,对于线程的操作更加的丰富,而且threading模块本身也是相当于对thread模块的进一步封装而成,thread模块有的功能threading模块也都有,所以涉及到多线程的操作,推荐使用threading模块。 threading模块中包含了关于线程操作的丰富功能,包括:常用线程函数,线程对象,锁对象,递归锁对象,事件对象,条件变量对象,信号量对象,定时器对象,栅栏对象。 注:本文使用的Python版本是Python3.6 ? ?一、with语法 这个模块中所有带有acquire()和release()方法的对象,都可以使用with语句。当进入with语句块时,acquire()方法被自动调用,当离开with语句块时,release()语句块被自动调用。包括Lock、RLock、Condition、Semaphore。 以下语句: with some_lock: # do something pass ? 相当于: some_lock.acquire() try: pass finally: some_lock.release() ? ? 二、threading函数 在Python3中方法名和函数名统一成了以字母小写加下划线的命令方式,但是Python2.x中threading模块的某些以驼峰命名的方法和函数仍然可用,如threading.active_count()和threading.activeCount()是一样的。 通常情况下,Python程序启动时,Python解释器会启动一个继承自threading.Thread的threading._MainThread线程对象作为主线程,所以涉及到threading.Thread的方法和函数时通常都算上了这个主线程的,比如在启动程序时打印threading.active_count()的结果就已经是1了。
? 三、threading常量 threading.TIMEOUT_MAX:指定阻塞函数(如Lock.acquire()、RLock.acquire()、Condition.wait()等)中参数timeout的最大值,在给这些阻塞函数传参时如果超过了这个指定的最大值会抛出OverflowError错误。 ? 四、线程对象:threading.Thread threading.Thread目前还没有优先级和线程组的功能,而且创建的线程也不能被销毁、停止、暂定、恢复或中断。 守护线程:只有所有守护线程都结束,整个Python程序才会退出,但并不是说Python程序会等待守护线程运行完毕,相反,当程序退出时,如果还有守护线程在运行,程序会去强制终结所有守护线程,当守所有护线程都终结后,程序才会真正退出。可以通过修改daemon属性或者初始化线程时指定daemon参数来指定某个线程为守护线程。 非守护线程:一般创建的线程默认就是非守护线程,包括主线程也是,即在Python程序退出时,如果还有非守护线程在运行,程序会等待直到所有非守护线程都结束后才会退出。 注:守护线程会在程序关闭时突然关闭(如果守护线程在程序关闭时还在运行),它们占用的资源可能没有被正确释放,比如正在修改文档内容等,需要谨慎使用。 threading.Thread(group=None,target=None,name=None,args=(),kwargs={},*,daemon=None) 如果这个类的初始化方法被重写,请确保在重写的初始化方法中做任何事之前先调用threading.Thread类的__init__方法。
使用threading.Thread类创建线程简单示例: """ 通过实例化threading.Thread类创建线程 """ import time threading def test_thread(para='hi',sleep=3): 线程运行函数""" time.sleep(sleep) print(para) def main(): 创建线程 thread_hi = threading.Thread(target=test_thread) thread_hello = threading.Thread(target=test_thread,args=(hello)) 启动线程 thread_hi.start() thread_hello.start() print(Main thread has ended!') if __name__ == __main__: main() Main thread has ended! hello hi 使用threading.Thread类的子类创建线程简单示例: 通过继承threading.Thread的子类创建线程 class TestThread(threading.Thread): def __init__(self,para=): 重写threading.Thread的__init__方法时,确保在所有操作之前先调用threading.Thread.__init__方法 super().__init__() self.para = para self.sleep = sleep run(self): 线程内容 time.sleep(self.sleep) (self.para) 创建线程 thread_hi = TestThread() thread_hello = TestThread() : main() Main thread has ended! hello hi join方法简单示例: 使用join方法阻塞主线程 thread_hi.start() thread_hello.start() time.sleep(2马上执行join方法了 执行join方法会阻塞调用线程(主线程),直到调用join方法的线程(thread_hi)结束 thread_hi.join() 线程thread_hi已结束 这里不会阻塞主线程,因为运行到这里的时候,线程thread_hello已经运行结束了 thread_hello.join() ) 以上代码只是为了展示join方法的效果 如果想要等所有线程都运行完成后再做其他操作,可以使用for循环 for thd in (thread_hi,thread_hello): thd.join() # print('所有线程执行结束后的其他操作') : main() hello 马上执行join方法了 hi 线程thread_hi已结束 Main thread has ended! ? 五、锁对象:threading.Lock threading.Lock是直接通过_thread模块扩展实现的。 当锁在被锁定时,它并不属于某一个特定的线程。 锁只有“锁定”和“非锁定”两种状态,当锁被创建时,是处于“非锁定”状态的。当锁已经被锁定时,再次调用acquire()方法会被阻塞执行,直到锁被调用release()方法释放掉锁并将其状态改为“非锁定”。 同一个线程获取锁后,如果在释放锁之前再次获取锁会导致当前线程阻塞,除非有另外的线程来释放锁,如果只有一个线程,并且发生了这种情况,会导致这个线程一直阻塞下去,即形成了死锁。所以在获取锁时需要保证锁已经被释放掉了,或者使用递归锁来解决这种情况。
使用锁实现线程同步的简单示例: 使用锁实现线程同步 threading 创建锁 lock = threading.Lock() 全局变量 global_resource = [None] * 5 change_resource(para,sleep): 请求锁 lock.acquire() 这段代码如果不加锁,第一个线程运行结束后global_resource中是乱的,输出为:修改全局变量为: ['hello','hi','hello','hello'] 第二个线程运行结束后,global_resource中还是乱的,输出为:修改全局变量为: ['hello','hi'] global global_resource for i in range(len(global_resource)): global_resource[i] = para time.sleep(sleep) "修改全局变量为:",global_resource) 释放锁 lock.release() main(): thread_hi = threading.Thread(target=change_resource,2)) thread_hello = threading.Thread(target=change_resource,1)">)) thread_hi.start() thread_hello.start() : main() 修改全局变量为: ['hi','hi'] 修改全局变量为: ['hello','hello'] ? 六、递归锁对象:threading.RLock 递归锁和普通锁的差别在于加入了“所属线程”和“递归等级”的概念,释放锁必须有获取锁的线程来进行释放,同时,同一个线程在释放锁之前再次获取锁将不会阻塞当前线程,只是在锁的递归等级上加了1(获得锁时的初始递归等级为1)。 使用普通锁时,对于一些可能造成死锁的情况,可以考虑使用递归锁来解决。
递归锁使用简单示例: 在普通锁中可能造成死锁的情况,可以考虑使用递归锁解决 如果是使用的两个普通锁,那么就会造成死锁的情况,程序一直阻塞而不会退出 # rlock_hi = threading.Lock() rlock_hello = threading.Lock() 使用成一个递归锁就可以解决当前这种死锁情况 rlock_hi = rlock_hello = threading.RLock() test_thread_hi(): 初始时锁内部的递归等级为1 rlock_hi.acquire() 线程test_thread_hi获得了锁rlock_hi) time.sleep(2 如果再次获取同样一把锁,则不会阻塞,只是内部的递归等级加1 rlock_hello.acquire() 线程test_thread_hi获得了锁rlock_hello 释放一次锁,内部递归等级减1 rlock_hello.release() 这里再次减,当递归等级为0时,其他线程才可获取到此锁 rlock_hi.release() test_thread_hello(): rlock_hello.acquire() 线程test_thread_hello获得了锁rlock_hello) rlock_hi.acquire() 线程test_thread_hello获得了锁rlock_hi) rlock_hi.release() rlock_hello.release() main(): thread_hi = threading.Thread(target=test_thread_hi) thread_hello = threading.Thread(target=test_thread_hello) thread_hi.start() thread_hello.start() : main() 线程test_thread_hi获得了锁rlock_hi 线程test_thread_hi获得了锁rlock_hello 线程test_thread_hello获得了锁rlock_hello 线程test_thread_hello获得了锁rlock_hi ? ? 七、条件变量对象:threading.Condition 它的wait()方法释放锁,并阻塞程序直到其他线程调用notify()或者notify_all()方法唤醒,然后wait()方法重新获取锁,这个方法也可以指定timeout超时时间。 threading.Condition(lock=None):一个条件变量对象允许一个或多个线程等待,直到被另一个线程通知。lock参数必须是一个Lock对象或者RLock对象,并且会作为底层锁使用,默认使用RLock。
条件变量简单示例: 让一个线程等待,直到另一个线程通知 创建条件变量对象 condition_lock = threading.Condition() PRE = 0 predicate可调用函数 pre(): (PRE) return PRE 在使用wait/wait_for之前必须先获得锁 condition_lock.acquire() 等待线程test_thread_hello的通知 先执行一次pre,返回False后释放掉锁,等另一个线程释放掉锁后再次执行pre,返回True后再次获取锁 wait_for的返回值不是True和False,而是predicate参数的返回值 condition_lock.wait_for(pre) condition_lock.wait() 继续执行 不要忘记使用wait/wait_for之后要释放锁 condition_lock.release() test_thread_hello(): time.sleep(1) condition_lock.acquire() PRE PRE = 1 修改PRE值为1通知线程test_thread_hi可以准备获取锁了) condition_lock.notify() 先notify/notify_all之后在释放锁 condition_lock.release() 你获取锁吧: main() 等待线程test_thread_hello的通知 0 修改PRE值为1 通知线程test_thread_hi可以准备获取锁了 你获取锁吧 1 继续执行 ? ? 八、信号量对象:threading.Semaphore 一个信号量管理一个内部计数器,acquire()方法会减少计数器,release()方法则增加计数器,计数器的值永远不会小于零,当调用acquire()时,如果发现该计数器为零,则阻塞线程,直到调用release()方法使计数器增加。 threading.Semaphore(value=1):value参数默认值为1,如果指定的值小于0,则会报ValueError错误。一个信号量对象管理一个原子性的计数器,代表release()方法调用的次数减去acquire()方法的调用次数,再加上一个初始值。
信号量对象简单示例: 通过信号量对象管理一次性运行的线程数量 创建信号量对象,初始化计数器值为3 semaphore3 = threading.Semaphore(3 thread_semaphore(index): 信号量计数器减1 semaphore3.acquire() time.sleep(2thread_%s is running...' % index) 信号量计数器加1 semaphore3.release() 虽然会有9个线程运行,但是通过信号量控制同时只能有3个线程运行 第4个线程启动时,调用acquire发现计数器为0了,所以就会阻塞等待计数器大于0的时候 for index in range(9): threading.Thread(target=thread_semaphore,args=(index,)).start() : main() ? ? 九、事件对象:threading.Event 一个事件对象管理一个内部标志,初始状态默认为False,set()方法可将它设置为True,clear()方法可将它设置为False,wait()方法将线程阻塞直到内部标志的值为True。 如果一个或多个线程需要知道另一个线程的某个状态才能进行下一步的操作,就可以使用线程的event事件对象来处理。
事件对象简单示例: 事件对象使用实例 创建事件对象,内部标志默认为False event = threading.Event() student_exam(student_id): 学生%s等监考老师发卷。。。 student_id) event.wait() 开始考试了! invigilate_teacher(): time.sleep(5考试时间到,学生们可以开始考试了! 设置内部标志为True,并唤醒所有等待的线程 event.set() for student_id in range(3): threading.Thread(target=student_exam,1)">(student_id,)).start() threading.Thread(target=invigilate_teacher).start() : main() 学生0等监考老师发卷。。。 学生1等监考老师发卷。。。 学生2等监考老师发卷。。。 考试时间到,学生们可以开始考试了! 开始考试了! 开始考试了! 开始考试了! ? ? 十、定时器对象:threading.Timer 表示一个操作需要在等待一定时间之后执行,相当于一个定时器。Timer类是threading.Thread的子类,所以它可以像一个自定义线程一样工作。 threading.Timer(interval,function,args=None,kwargs=None)
cancel():停止计时器,并取消对应函数的执行,这个方法只有在计时器还没有计时结束之前才会生效,如果已经开始执行函数,则不会生效。 ? 十一、栅栏对象:threading.Barrier 栅栏对象用于一个固定数量的线程,而这些线程需要等待彼此的情况。这些线程中的每个线程都会尝试调用wait()方法,然后阻塞,直到所有线程都调用了wait()方法,然后所有线程会被同时释放。 threading.Barrier(parties,action=None,timeout=None)
栅栏对象简单示例: 栅栏对象使用示例 test_action(): 所有栅栏线程释放前调用此函数! 创建线程数为3的栅栏对象,当“拦住”3个线程的wait后放行,然后又继续“拦”(如果有的话) barrier = threading.Barrier(3 barrier_thread(sleep): time.sleep(sleep) barrier thread-%s wait... sleep) 阻塞线程,直到阻塞线程数达到栅栏指定数量 barrier.wait() barrier thread-%s end! sleep) 这里开启了6个线程,则一次会拦截3个 for sleep in range(6): threading.Thread(target=barrier_thread,1)">(sleep,1)">: main() ? barrier thread-0 wait... barrier thread-1 wait... barrier thread-2 wait... 所有栅栏线程释放前调用此函数! barrier thread-2 end! barrier thread-0 end! barrier thread-1 end! barrier thread-3 wait... barrier thread-4 wait... barrier thread-5 wait... 所有栅栏线程释放前调用此函数! barrier thread-5 end! barrier thread-3 end! barrier thread-4 end! ? ? 十二、多线程与多进程的理解 在网上查多线程资料的时候,很多文章讲到了对Python的多线程与多进程的理解,包括相同之处和它们之间的区别,我就整理了一些点放在这里: 进程ID:多线程的主进程和它的子线程的进程ID,即os.getpid(),都是相同的,都是主进程的进程ID。多进程则是主进程和它的子进程都有各自的进程ID,都不相同。 共享数据:多线程可以共享主进程内的数据,但是多进程用的都是各自的数据,无法共享。 主线程:由Python解释器运行主py时,也就是开启了一个Python进程,而这个py是这个进程内的一个线程,不过不同于其他线程,它是主线程,同时这个进程内还有其他的比如垃圾回收等解释器级别的线程,所以进程就等于主线程这种理解是有误的。 CPU多核利用:Python解释器的线程只能在CPU单核上运行,开销小,但是这也是缺点,因为没有利用CPU多核的特点。Python的多进程是可以利用多个CPU核心的,但也有其他语言的多线程是可以利用多核的。 单核与多核:一个CPU的主要作用是用来做计算的,多个CPU核心如果都用来做计算,那么效率肯定会提高很多,但是对于IO来说,多个CPU核心也没有太大用处,因为没有输入,后面的动作也无法执行。所以如果一个程序是计算密集型的,那么就该利用多核的优势(比如使用Python的多进程),如果是IO密集型的,那么使用单核的多线程就完全够了。 线程或进程间的切换:线程间的切换是要快于进程间的切换的。 死锁:指的是两个或两个以上的线程或进程在请求锁的时候形成了互相等待阻塞的情况,导致这些线程或进程无法继续执行下去,这时候称系统处于死锁状态或者系统产生了死锁,这些线程或进程就称为死锁线程或死锁进程。解决死锁的办法可以使用递归锁,即threading.RLock,然后线程或进程就可以随意请求和释放锁了,而不用担心别的线程或进程也在请求锁而产生死锁的情况。 信号量与进程池:进程池Pool(n)只能是“池”中的n个进程运行,不能有新的进程,信号量只要保证最大线程数就行,而不是只有这几个线程,旧的线程运行结束,就可以继续来新的线程。 ? (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |