深入解析Python中的线程同步方法
同步访问共享资源 在使用线程的时候,一个很重要的问题是要避免多个线程对同一变量或其它资源的访问冲突。一旦你稍不留神,重叠访问、在多个线程中修改(共享资源)等这些操作会导致各种各样的问题;更严重的是,这些问题一般只会在比较极端(比如高并发、生产服务器、甚至在性能更好的硬件设备上)的情况下才会出现。 counter = 0 def process_item(item): global counter ... do something with item ... counter += 1 如果你在多个线程中同时调用这个函数,你会发现counter的值不是那么准确。在大多数情况下它是对的,但有时它会比实际的少几个。
考虑一下这种情况:在当前线程获取到counter值后,另一个线程抢占到了CPU,然后同样也获取到了counter值,并进一步将counter值重新计算并完成回写;之后时间片重新轮到当前线程(这里仅作标识区分,并非实际当前),此时当前线程获取到counter值还是原来的,完成后续两步操作后counter的值实际只加上1。 原子操作
注意,上面提到过,对一个变量或者属性进行读操作,然后修改它,最终将其回写不是线程安全的。因为另外一个线程会在这个线程读完却没有修改或回写完成之前更改这个共享变量/属性。 锁 锁是Python的threading模块提供的最基本的同步机制。在任一时刻,一个锁对象可能被一个线程获取,或者不被任何线程获取。如果一个线程尝试去获取一个已经被另一个线程获取到的锁对象,那么这个想要获取锁对象的线程只能暂时终止执行直到锁对象被另一个线程释放掉。 lock = Lock() lock.acquire() #: will block if lock is already held ... access shared resource lock.release() 注意,即使在访问共享资源的过程中出错了也应该释放锁,可以用try-finally来达到这一目的: lock.acquire() try: ... access shared resource finally: lock.release() #: release lock,no matter what 在Python 2.5及以后的版本中,你可以使用with语句。在使用锁的时候,with语句会在进入语句块之前自动的获取到该锁对象,然后在语句块执行完成后自动释放掉锁: from __future__ import with_statement #: 2.5 only with lock: ... access shared resource acquire方法带一个可选的等待标识,它可用于设定当有其它线程占有锁时是否阻塞。如果你将其值设为False,那么acquire方法将不再阻塞,只是如果该锁被占有时它会返回False: if not lock.acquire(False): ... 锁资源失败 else: try: ... access shared resource finally: lock.release() 你可以使用locked方法来检查一个锁对象是否已被获取,注意不能用该方法来判断调用acquire方法时是否会阻塞,因为在locked方法调用完成到下一条语句(比如acquire)执行之间该锁有可能被其它线程占有。 if not lock.locked(): #: 其它线程可能在下一条语句执行之前占有了该锁 lock.acquire() #: 可能会阻塞 简单锁的缺点 lock = threading.Lock() def get_first_part(): lock.acquire() try: ... 从共享对象中获取第一部分数据 finally: lock.release() return data def get_second_part(): lock.acquire() try: ... 从共享对象中获取第二部分数据 finally: lock.release() return data 示例中,我们有一个共享资源,有两个分别取这个共享资源第一部分和第二部分的函数。两个访问函数都使用了锁来确保在获取数据时没有其它线程修改对应的共享数据。 def get_both_parts(): first = get_first_part() seconde = get_second_part() return first,second 这里的问题是,如有某个线程在两个函数调用之间修改了共享资源,那么我们最终会得到不一致的数据。最明显的解决方法是在这个函数中也使用lock: def get_both_parts(): lock.acquire() try: first = get_first_part() seconde = get_second_part() finally: lock.release() return first,second 然而,这是不可行的。里面的两个访问函数将会阻塞,因为外层语句已经占有了该锁。为了解决这个问题,你可以通过使用标记在访问函数中让外层语句释放锁,但这样容易失去控制并导致出错。幸运的是,threading模块包含了一个更加实用的锁实现:re-entrant锁。 RLock类是简单锁的另一个版本,它的特点在于,同一个锁对象只有在被其它的线程占有时尝试获取才会发生阻塞;而简单锁在同一个线程中同时只能被占有一次。如果当前线程已经占有了某个RLock锁对象,那么当前线程仍能再次获取到该RLock锁对象。 lock = threading.Lock() lock.acquire() lock.acquire() #: 这里将会阻塞 lock = threading.RLock() lock.acquire() lock.acquire() #: 这里不会发生阻塞 RLock的主要作用是解决嵌套访问共享资源的问题,就像前面描述的示例。要想解决前面示例中的问题,我们只需要将Lock换为RLock对象,这样嵌套调用也会OK. lock = threading.RLock() def get_first_part(): ... see above def get_second_part(): ... see above def get_both_parts(): ... see above 这样既可以单独访问两部分数据也可以一次访问两部分数据而不会被锁阻塞或者获得不一致的数据。 信号量是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。 semaphore = threading.BoundedSemaphore() semaphore.acquire() #: counter减小 ... 访问共享资源 semaphore.release() #: counter增大 当信号量被获取的时候,计数器减小;当信号量被释放的时候,计数器增大。当获取信号量的时候,如果计数器值为0,则该进程将阻塞。当某一信号量被释放,counter值增加为1时,被阻塞的线程(如果有的话)中会有一个得以继续运行。 max_connections = 10 semaphore = threading.BoundedSemaphore(max_connections)
如果你不传任何初始化参数,计数器的值会被初始化为1. 锁可以用在线程间的同步上。threading模块包含了一些用于线程间同步的类。 一个事件是一个简单的同步对象,事件表示为一个内部标识(internal flag),线程等待这个标识被其它线程设定,或者自己设定、清除这个标识。 event = threading.Event() #: 一个客户端线程等待flag被设定 event.wait() #: 服务端线程设置或者清除flag event.set() event.clear() 一旦标识被设定,wait方法就不做任何处理(不会阻塞),当标识被清除时,wait将被阻塞直至其被重新设定。任意数量的线程可能会等待同一个事件。 条件是事件对象的高级版本。条件表现为程序中的某种状态改变,线程可以等待给定条件或者条件发生的信号。 #: 表示一个资源的附属项 condition = threading.Condition() 生产者线程在通知消费者线程有新生成资源之前需要获得条件: #: 生产者线程 ... 生产资源项 condition.acquire() ... 将资源项添加到资源中 condition.notify() #: 发出有可用资源的信号 condition.release() 消费者必须获取条件(以及相关联的锁),然后尝试从资源中获取资源项: #: 消费者线程 condition.acquire() while True: ...从资源中获取资源项 if item: break condition.wait() #: 休眠,直至有新的资源 condition.release() ... 处理资源 wait方法释放了锁,然后将当前线程阻塞,直到有其它线程调用了同一条件对象的notify或者notifyAll方法,然后又重新拿到锁。如果同时有多个线程在等待,那么notify方法只会唤醒其中的一个线程,而notifyAll则会唤醒全部线程。 lock = threading.RLock() condition_1 = threading.Condition(lock) condition_2 = threading.Condition(lock) 互斥锁同步 #!/usr/bin/env python # -*- coding: utf-8 -*- import time,threading # 假定这是你的银行存款: balance = 0 muxlock = threading.Lock() def change_it(n): # 先存后取,结果应该为0: global balance balance = balance + n balance = balance - n def run_thread(n): # 循环次数一旦多起来,最后的数字就变成非0 for i in range(100000): change_it(n) t1 = threading.Thread(target=run_thread,args=(5,)) t2 = threading.Thread(target=run_thread,args=(8,)) t3 = threading.Thread(target=run_thread,args=(9,)) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() print balance 结果 : [/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 61 [/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 24 上面的例子引出了多线程编程的最常见问题:数据共享。当多个线程都修改某一个共享数据的时候,需要进行同步控制。 threading模块中定义了Lock类,可以方便的处理锁定: #创建锁mutex = threading.Lock() #锁定mutex.acquire([timeout]) #释放mutex.release() 其中,锁定方法acquire可以有一个超时时间的可选参数timeout。如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁,从而可以进行一些其他的处理。 balance = 0 muxlock = threading.Lock() def change_it(n): # 获取锁,确保只有一个线程操作这个数 muxlock.acquire() global balance balance = balance + n balance = balance - n # 释放锁,给其他被阻塞的线程继续操作 muxlock.release() def run_thread(n): for i in range(10000): change_it(n) 加锁后的结果,就能确保数据正确: [/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 0 [/data/web/test_python]$ python multhread_threading.py 0 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |