加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python - 线程

发布时间:2020-12-17 00:02:56 所属栏目:Python 来源:网络整理
导读:一,线程和python 1,理论知识 全局解释器锁GIL Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中

一,线程和python

1,理论知识

全局解释器锁GIL

  Python代码的执行由Python虚拟机(也叫解释器主循环)来控制。Python在设计之初就考虑到要在主循环中,同时只有一个线程在执行。虽然 Python 解释器中可以“运行”多个线程,但在任意时刻只有一个线程在解释器中运行。  对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同一时刻只有一个线程在运行。

  在多线程环境中,Python 虚拟机按以下方式执行:

  a、设置 GIL;

  b、切换到一个线程去运行;

  c、运行指定数量的字节码指令或者线程主动让出控制(可以调用 time.sleep(0));

  d、把线程设置为睡眠状态;

  e、解锁 GIL;

  d、再次重复以上所有步骤。  在调用外部代码(如 C/C++扩展函数)的时候,GIL将会被锁定,直到这个函数结束为止(由于在这期间没有Python的字节码被运行,所以不会做线程切换)编写扩展的程序员可以主动解锁GIL。

python线程模块的选择

  Python提供了几个用于多线程编程的模块,包括thread、threading和Queue等。thread和threading模块允许程序员创建和管理线程。thread模块提供了基本的线程和锁的支持,threading提供了更高级别、功能更强的线程管理的功能。Queue模块允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。  避免使用thread模块,因为更高级别的threading模块更为先进,对线程的支持更为完善,而且使用thread模块里的属性有可能会与threading出现冲突;其次低级别的thread模块的同步原语很少(实际上只有一个),而threading模块则有很多;再者,thread模块中当主线程结束时,所有的线程都会被强制结束掉,没有警告也不会有正常的清除工作,至少threading模块能确保重要的子线程退出后进程才退出。?

  thread模块不支持守护线程,当主线程退出时,所有的子线程不论它们是否还在工作,都会被强行退出。而threading模块支持守护线程,守护线程一般是一个等待客户请求的服务器,如果没有客户提出请求它就在那等着,如果设定一个线程为守护线程,就表示这个线程是不重要的,在进程退出的时候,不用等待这个线程退出。

2,threading模块

multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍()

线程的创建Threading.Thread类

线程的创建

threading 2( %<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
t
=Thread(target=sayhi,args=(<span style="color: #800000;">'
<span style="color: #800000;">egon
<span style="color: #800000;">'
<span style="color: #000000;">,))
t.start()
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">主线程<span style="color: #800000;">')

threading =2( %<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
t
= Sayhi(<span style="color: #800000;">'
<span style="color: #800000;">egon
<span style="color: #800000;">'
<span style="color: #000000;">)
t.start()
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">主线程
<span style="color: #800000;">'
)

多线程与多进程

threading multiprocessing <span style="color: #0000ff;">def<span style="color: #000000;"> work():
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">hello
<span style="color: #800000;">'
<span style="color: #000000;">,os.getpid())

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
<span style="color: #008000;">#<span style="color: #008000;">part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
t1=Thread(target=<span style="color: #000000;">work)
t2=Thread(target=<span style="color: #000000;">work)
t1.start()
t2.start()
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">主线程/主进程pid<span style="color: #800000;">'<span style="color: #000000;">,os.getpid())

</span><span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt;part2:开多个进程,每个进程都有不同的pid</span>
p1=Process(target=<span style="color: #000000;"&gt;work)
p2</span>=Process(target=<span style="color: #000000;"&gt;work)
p1.start()
p2.start()
</span><span style="color: #0000ff;"&gt;print</span>(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;主线程/主进程pid</span><span style="color: #800000;"&gt;'</span>,os.getpid())</pre>
threading multiprocessing <span style="color: #0000ff;">def<span style="color: #000000;"> work():
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">hello
<span style="color: #800000;">'
<span style="color: #000000;">)

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
<span style="color: #008000;">#<span style="color: #008000;">在主进程下开启线程
t=Thread(target=<span style="color: #000000;">work)
t.start()
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">主线程/主进程<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #800000;">'''<span style="color: #800000;">
打印结果:
hello
主线程/主进程
<span style="color: #800000;">'''

<span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt;在主进程下开启子进程</span>
t=Process(target=<span style="color: #000000;"&gt;work)
t.start()
</span><span style="color: #0000ff;"&gt;print</span>(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;主线程/主进程</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;)
</span><span style="color: #800000;"&gt;'''</span><span style="color: #800000;"&gt;
打印结果:
主线程/主进程
hello
</span><span style="color: #800000;"&gt;'''</span></pre>
threading multiprocessing =<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
<span style="color: #008000;">#
<span style="color: #008000;"> n=100

<span style="color: #008000;">#
<span style="color: #008000;"> p=Process(target=work)

<span style="color: #008000;">#
<span style="color: #008000;"> p.start()

<span style="color: #008000;">#<span style="color: #008000;"> p.join()
<span style="color: #008000;">#<span style="color: #008000;"> print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100
<span style="color: #000000;">

n</span>=1<span style="color: #000000;"&gt;
t</span>=Thread(target=<span style="color: #000000;"&gt;work)
t.start()
t.join()
</span><span style="color: #0000ff;"&gt;print</span>(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;主</span><span style="color: #800000;"&gt;'</span>,n) <span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt;查看结果为0,因为同一进程内的线程之间共享进程内的数据</span></pre>

练习:多线程实现socket

<span style="color: #0000ff;">import<span style="color: #000000;"> socket
s
=<span style="color: #000000;">socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.bind((
<span style="color: #800000;">'<span style="color: #800000;">127.0.0.1<span style="color: #800000;">',8080<span style="color: #000000;">))
s.listen(5<span style="color: #000000;">)

<span style="color: #0000ff;">def<span style="color: #000000;"> action(conn):
<span style="color: #0000ff;">while<span style="color: #000000;"> True:
data=conn.recv(1024<span style="color: #000000;">)
<span style="color: #0000ff;">print<span style="color: #000000;">(data)
conn.send(data.upper())

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:

</span><span style="color: #0000ff;"&gt;while</span><span style="color: #000000;"&gt; True:
    conn,addr</span>=<span style="color: #000000;"&gt;s.accept()


    p</span>=threading.Thread(target=action,args=<span style="color: #000000;"&gt;(conn,))
    p.start()</span></pre>
s=<span style="color: #000000;">socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect((
<span style="color: #800000;">'<span style="color: #800000;">127.0.0.1<span style="color: #800000;">',8080<span style="color: #000000;">))

<span style="color: #0000ff;">while<span style="color: #000000;"> True:
msg=input(<span style="color: #800000;">'<span style="color: #800000;">>>: <span style="color: #800000;">'<span style="color: #000000;">).strip()
<span style="color: #0000ff;">if <span style="color: #0000ff;">not msg:<span style="color: #0000ff;">continue<span style="color: #000000;">

s.send(msg.encode(</span><span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;utf-8</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;))
data</span>=s.recv(1024<span style="color: #000000;"&gt;)
</span><span style="color: #0000ff;"&gt;print</span>(data)</pre>

Thread类的其他方法

threading multiprocessing <span style="color: #0000ff;">def<span style="color: #000000;"> work():
<span style="color: #0000ff;">import
<span style="color: #000000;"> time
time.sleep(
3<span style="color: #000000;">)
<span style="color: #0000ff;">print
<span style="color: #000000;">(threading.current_thread().getName())

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
<span style="color: #008000;">#<span style="color: #008000;">在主进程下开启线程
t=Thread(target=<span style="color: #000000;">work)
t.start()

</span><span style="color: #0000ff;"&gt;print</span><span style="color: #000000;"&gt;(threading.current_thread().getName())
</span><span style="color: #0000ff;"&gt;print</span>(threading.current_thread()) <span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt;主线程</span>
<span style="color: #0000ff;"&gt;print</span>(threading.enumerate()) <span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt;连同主线程在内有两个运行的线程</span>
<span style="color: #0000ff;"&gt;print</span><span style="color: #000000;"&gt;(threading.active_count())
</span><span style="color: #0000ff;"&gt;print</span>(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;主线程/主进程</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;)

</span><span style="color: #800000;"&gt;'''</span><span style="color: #800000;"&gt;
打印结果:
MainThread
<_MainThread(MainThread,started 140735268892672)>
[<_MainThread(MainThread,started 140735268892672)>,<Thread(Thread-1,started 123145307557888)>]
主线程/主进程
Thread-1
</span><span style="color: #800000;"&gt;'''</span></pre>
threading 2( %<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
t
=Thread(target=sayhi,))
t.start()
t.join()
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">主线程
<span style="color: #800000;">'
<span style="color: #000000;">)
<span style="color: #0000ff;">print<span style="color: #000000;">(t.is_alive())
<span style="color: #800000;">'''<span style="color: #800000;">
egon say hello
主线程
False
<span style="color: #800000;">'''

守护线程

无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行

threading 2( %<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
t
=Thread(target=sayhi,))
t.setDaemon(True)
<span style="color: #008000;">#
<span style="color: #008000;">必须在t.start()之前设置

<span style="color: #000000;"> t.start()

</span><span style="color: #0000ff;"&gt;print</span>(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;主线程</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;)
</span><span style="color: #0000ff;"&gt;print</span><span style="color: #000000;"&gt;(t.is_alive())
</span><span style="color: #800000;"&gt;'''</span><span style="color: #800000;"&gt;
主线程
True</span></pre>
threading (1231(<span style="color: #0000ff;">def<span style="color: #000000;"> bar():
<span style="color: #0000ff;">print
(456<span style="color: #000000;">)
time.sleep(
3<span style="color: #000000;">)
<span style="color: #0000ff;">print
(<span style="color: #800000;">"
<span style="color: #800000;">end456
<span style="color: #800000;">"
<span style="color: #000000;">)

t1=Thread(target=<span style="color: #000000;">foo)
t2=Thread(target=<span style="color: #000000;">bar)

t1.daemon=<span style="color: #000000;">True
t1.start()
t2.start()
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;">main-------<span style="color: #800000;">")

3,锁 threading.Lock

锁与GIL

详情 :?

同步锁

threading =0.1=temp-1 == =100= i range(100=Thread(target= p </span><span style="color: #0000ff;"&gt;print</span>(n) <span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt;结果可能为99</span></pre>
=
threading =0.1=temp-1 == ==100= i range(100=Thread(target= p </span><span style="color: #0000ff;"&gt;print</span>(n) <span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt;结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全</span></pre>
threading ( %=0.5=temp-1

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
n=100<span style="color: #000000;">
lock=<span style="color: #000000;">Lock()
threads=<span style="color: #000000;">[]
start_time=<span style="color: #000000;">time.time()
<span style="color: #0000ff;">for i <span style="color: #0000ff;">in range(100<span style="color: #000000;">):
t=Thread(target=<span style="color: #000000;">task)
threads.append(t)
t.start()
<span style="color: #0000ff;">for t <span style="color: #0000ff;">in<span style="color: #000000;"> threads:
t.join()

stop_time</span>=<span style="color: #000000;"&gt;time.time()
</span><span style="color: #0000ff;"&gt;print</span>(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;主:%s n:%s</span><span style="color: #800000;"&gt;'</span> %(stop_time-<span style="color: #000000;"&gt;start_time,n))

<span style="color: #800000;">'''<span style="color: #800000;">
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:0.5216062068939209 n:99
<span style="color: #800000;">'''

<span style="color: #008000;">#<span style="color: #008000;">不加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
<span style="color: #0000ff;">from threading <span style="color: #0000ff;">import<span style="color: #000000;"> current_thread,time
<span style="color: #0000ff;">def<span style="color: #000000;"> task():
<span style="color: #008000;">#<span style="color: #008000;">未加锁的代码并发运行
time.sleep(3<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">%s start to run<span style="color: #800000;">' %<span style="color: #000000;">current_thread().getName())
<span style="color: #0000ff;">global<span style="color: #000000;"> n
<span style="color: #008000;">#<span style="color: #008000;">加锁的代码串行运行
<span style="color: #000000;"> lock.acquire()
temp=<span style="color: #000000;">n
time.sleep(0.5<span style="color: #000000;">)
n=temp-1<span style="color: #000000;">
lock.release()

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
n=100<span style="color: #000000;">
lock=<span style="color: #000000;">Lock()
threads=<span style="color: #000000;">[]
start_time=<span style="color: #000000;">time.time()
<span style="color: #0000ff;">for i <span style="color: #0000ff;">in range(100<span style="color: #000000;">):
t=Thread(target=<span style="color: #000000;">task)
threads.append(t)
t.start()
<span style="color: #0000ff;">for t <span style="color: #0000ff;">in<span style="color: #000000;"> threads:
t.join()
stop_time=<span style="color: #000000;">time.time()
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">主:%s n:%s<span style="color: #800000;">' %(stop_time-<span style="color: #000000;">start_time,n))

<span style="color: #800000;">'''<span style="color: #800000;">
Thread-1 is running
Thread-2 is running
......
Thread-100 is running
主:53.294203758239746 n:0
<span style="color: #800000;">'''

<span style="color: #008000;">#<span style="color: #008000;">有的人可能有疑问:既然加锁会让运行变成串行,那么我在start之后立即使用join,就不用加锁了啊,也是串行的效果啊<span style="color: #008000;">

<span style="color: #008000;">没错:在start之后立刻使用jion,肯定会将100个任务的执行变成串行,毫无疑问,最终n的结果也肯定是0,是安全的,但问题是<span style="color: #008000;">

<span style="color: #008000;">start后立即join:任务内的所有代码都是串行执行的,而加锁,只是加锁的部分即修改共享数据的部分是串行的<span style="color: #008000;">

<span style="color: #008000;">单从保证数据安全方面,二者都可以实现,但很明显是加锁的效率更高.

<span style="color: #0000ff;">from threading <span style="color: #0000ff;">import<span style="color: #000000;"> current_thread,time
<span style="color: #0000ff;">def<span style="color: #000000;"> task():
time.sleep(3<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">%s start to run<span style="color: #800000;">' %<span style="color: #000000;">current_thread().getName())
<span style="color: #0000ff;">global<span style="color: #000000;"> n
temp=<span style="color: #000000;">n
time.sleep(0.5<span style="color: #000000;">)
n=temp-1

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
n=100<span style="color: #000000;">
lock=<span style="color: #000000;">Lock()
start_time=<span style="color: #000000;">time.time()
<span style="color: #0000ff;">for i <span style="color: #0000ff;">in range(100<span style="color: #000000;">):
t=Thread(target=<span style="color: #000000;">task)
t.start()
t.join()
stop_time=<span style="color: #000000;">time.time()
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">主:%s n:%s<span style="color: #800000;">' %(stop_time-<span style="color: #000000;">start_time,n))

<span style="color: #800000;">'''<span style="color: #800000;">
Thread-1 start to run
Thread-2 start to run
......
Thread-100 start to run
主:350.6937336921692 n:0 #耗时是多么的恐怖
<span style="color: #800000;">'''

死锁与递归锁

进程也有死锁与递归锁,在进程那里忘记说了,放到这里一切说了额

所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁

threading =(123

解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。

这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:

threading =(123

典型问题:科学家吃面

threading ==(%(%(%<span style="color: #0000ff;">def<span style="color: #000000;"> eat2(name):
fork_lock.acquire()
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">%s 抢到了叉子
<span style="color: #800000;">'
%<span style="color: #000000;"> name)
time.sleep(
1<span style="color: #000000;">)
noodle_lock.acquire()
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">%s 抢到了面条
<span style="color: #800000;">'
%<span style="color: #000000;"> name)
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">%s 吃面
<span style="color: #800000;">'
%<span style="color: #000000;"> name)
noodle_lock.release()
fork_lock.release()

<span style="color: #0000ff;">for name <span style="color: #0000ff;">in [<span style="color: #800000;">'<span style="color: #800000;">阿尔法博士<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">鲁班<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">霍金<span style="color: #800000;">'<span style="color: #000000;">]:
t1 = Thread(target=eat1,args=<span style="color: #000000;">(name,))
t2 = Thread(target=eat2,))
t1.start()
t2.start()

threading = noodle_lock =(%(%(%<span style="color: #0000ff;">def<span style="color: #000000;"> eat2(name):
fork_lock.acquire()
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">%s 抢到了叉子
<span style="color: #800000;">'
%<span style="color: #000000;"> name)
time.sleep(
1<span style="color: #000000;">)
noodle_lock.acquire()
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">%s 抢到了面条
<span style="color: #800000;">'
%<span style="color: #000000;"> name)
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;">%s 吃面
<span style="color: #800000;">'
%<span style="color: #000000;"> name)
noodle_lock.release()
fork_lock.release()

<span style="color: #0000ff;">for name <span style="color: #0000ff;">in [<span style="color: #800000;">'<span style="color: #800000;">阿尔法博士<span style="color: #800000;">',))
t1.start()
t2.start()

4,信号量

同进程的一样

Semaphore管理一个内置的计数器,每当调用acquire()时内置计数器-1;调用release() 时内置计数器+1;计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):

threading ( %3 == =Semaphore(5 i range(23=Thread(target=
与进程池是完全不同的概念,进程池Pool(4),最大只能产生4个进程,而且从头到尾都只是这四个进程,不会产生新的,而信号量是产生一堆线程/进程

事件

同进程的一样

线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象,而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件,继续执行

==

例如,有多个工作线程尝试链接MySQL,我们想要在链接前确保MySQL服务正常才让那些工作线程去连接MySQL服务器,如果连接不成功,都会去尝试重新连接。那么我们就可以采用threading.Event机制来协调各个工作线程的连接操作

threading <span style="color: #0000ff;">def<span style="color: #000000;"> conn_mysql():
count
=1
<span style="color: #0000ff;">while
<span style="color: #0000ff;">not
<span style="color: #000000;"> event.is_set():
<span style="color: #0000ff;">if
count > 3<span style="color: #000000;">:
<span style="color: #0000ff;">raise TimeoutError(<span style="color: #800000;">'<span style="color: #800000;">链接超时<span style="color: #800000;">'<span style="color: #000000;">)
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"><%s>第%s次尝试链接<span style="color: #800000;">' %<span style="color: #000000;"> (threading.current_thread().getName(),count))
event.wait(0.5<span style="color: #000000;">)
count+=1
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"><%s>链接成功<span style="color: #800000;">' %<span style="color: #000000;">threading.current_thread().getName())

<span style="color: #0000ff;">def<span style="color: #000000;"> check_mysql():
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">33[45m[%s]正在检查mysql33[0m<span style="color: #800000;">' %<span style="color: #000000;"> threading.current_thread().getName())
time.sleep(random.randint(2,4<span style="color: #000000;">))
event.set()
<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
event=<span style="color: #000000;">Event()
conn1=Thread(target=<span style="color: #000000;">conn_mysql)
conn2=Thread(target=<span style="color: #000000;">conn_mysql)
check=Thread(target=<span style="color: #000000;">check_mysql)

conn1.start()
conn2.start()
check.start()</span></pre>

条件

使得线程等待,只有满足某条件时,才释放n个线程

Python提供的Condition对象提供了对复杂线程同步问题的支持。Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

代码说明:

<span style="color: #0000ff;">def<span style="color: #000000;"> run(n):
con.acquire()
con.wait()
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;">run the thread: %s<span style="color: #800000;">" %<span style="color: #000000;"> n)
con.release()

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:

con </span>=<span style="color: #000000;"&gt; threading.Condition()
</span><span style="color: #0000ff;"&gt;for</span> i <span style="color: #0000ff;"&gt;in</span> range(10<span style="color: #000000;"&gt;):
    t </span>= threading.Thread(target=run,args=<span style="color: #000000;"&gt;(i,))
    t.start()

</span><span style="color: #0000ff;"&gt;while</span><span style="color: #000000;"&gt; True:
    inp </span>= input(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;>>></span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;)
    </span><span style="color: #0000ff;"&gt;if</span> inp == <span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;q</span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;:
        </span><span style="color: #0000ff;"&gt;break</span><span style="color: #000000;"&gt;
    con.acquire()
    con.notify(int(inp))
    con.release()
    </span><span style="color: #0000ff;"&gt;print</span>(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;****</span><span style="color: #800000;"&gt;'</span>)</pre>

计时器

定时器,指定n秒后执行某个操作

threading <span style="color: #0000ff;">def<span style="color: #000000;"> hello():
<span style="color: #0000ff;">print(<span style="color: #800000;">"<span style="color: #800000;">hello,world<span style="color: #800000;">"<span style="color: #000000;">)

t = Timer(1<span style="color: #000000;">,hello)
t.start() <span style="color: #008000;">#<span style="color: #008000;"> after 1 seconds,"hello,world" will be printed

线程队列

queue队列 :使用import queue,用法与进程Queue一样

queue is especially useful in threaded programming when information must be exchanged safely between multiple threads.

maxsize=0
q=<span style="color: #000000;">queue.Queue()
q.put(
<span style="color: #800000;">'<span style="color: #800000;">first<span style="color: #800000;">'<span style="color: #000000;">)
q.put(<span style="color: #800000;">'<span style="color: #800000;">second<span style="color: #800000;">'<span style="color: #000000;">)
q.put(<span style="color: #800000;">'<span style="color: #800000;">third<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #0000ff;">print<span style="color: #000000;">(q.get())
<span style="color: #0000ff;">print<span style="color: #000000;">(q.get())
<span style="color: #0000ff;">print<span style="color: #000000;">(q.get())
<span style="color: #800000;">'''<span style="color: #800000;">
结果(先进先出):
first
second
third
<span style="color: #800000;">'''

maxsize=0

q=<span style="color: #000000;">queue.LifoQueue()
q.put(
<span style="color: #800000;">'<span style="color: #800000;">first<span style="color: #800000;">'<span style="color: #000000;">)
q.put(<span style="color: #800000;">'<span style="color: #800000;">second<span style="color: #800000;">'<span style="color: #000000;">)
q.put(<span style="color: #800000;">'<span style="color: #800000;">third<span style="color: #800000;">'<span style="color: #000000;">)

<span style="color: #0000ff;">print<span style="color: #000000;">(q.get())
<span style="color: #0000ff;">print<span style="color: #000000;">(q.get())
<span style="color: #0000ff;">print<span style="color: #000000;">(q.get())
<span style="color: #800000;">'''<span style="color: #800000;">
结果(后进先出):
third
second
first
<span style="color: #800000;">'''

maxsize=0

q=<span style="color: #000000;">queue.PriorityQueue()
<span style="color: #008000;">#<span style="color: #008000;">put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20,<span style="color: #800000;">'<span style="color: #800000;">a<span style="color: #800000;">'<span style="color: #000000;">))
q.put((10,<span style="color: #800000;">'<span style="color: #800000;">b<span style="color: #800000;">'<span style="color: #000000;">))
q.put((30,<span style="color: #800000;">'<span style="color: #800000;">c<span style="color: #800000;">'<span style="color: #000000;">))

<span style="color: #0000ff;">print<span style="color: #000000;">(q.get())
<span style="color: #0000ff;">print<span style="color: #000000;">(q.get())
<span style="color: #0000ff;">print<span style="color: #000000;">(q.get())
<span style="color: #800000;">'''<span style="color: #800000;">
结果(数字越小优先级越高,优先级高的优先出队):
(10,'b')
(20,'a')
(30,'c')
<span style="color: #800000;">'''

Constructor a priority queue. maxsize an integer that sets the upperbound limit on the number of items that can be placed the queue. Insertion will block once this size has been reached,until queue items are consumed. If maxsize less than equal to zero,the queue size The lowest valued entries are retrieved first (the lowest valued entry <span style="color: #0000ff;">is the one returned by sorted(list(entries))[0]). A typical pattern <span style="color: #0000ff;">for entries <span style="color: #0000ff;">is a tuple <span style="color: #0000ff;">in<span style="color: #000000;"> the form: (priority_number,data).

exception queue.Empty
Exception raised when non-blocking get() (<span style="color: #0000ff;">or get_nowait()) <span style="color: #0000ff;">is called on a Queue object which <span style="color: #0000ff;">is<span style="color: #000000;"> empty.

exception queue.Full
Exception raised when non-blocking put() (<span style="color: #0000ff;">or put_nowait()) <span style="color: #0000ff;">is called on a Queue object which <span style="color: #0000ff;">is<span style="color: #000000;"> full.

Queue.qsize()
Queue.empty() <span style="color: #008000;">#<span style="color: #008000;">return True if empty
Queue.full() <span style="color: #008000;">#<span style="color: #008000;"> return True if full
Queue.put(item,block=True,timeout=<span style="color: #000000;">None)
Put item into the queue. If optional args block <span style="color: #0000ff;">is true <span style="color: #0000ff;">and timeout <span style="color: #0000ff;">is None (the default),block <span style="color: #0000ff;">if necessary until a free slot <span style="color: #0000ff;">is available. If timeout <span style="color: #0000ff;">is a positive number,it blocks at most timeout seconds <span style="color: #0000ff;">and raises the Full exception <span style="color: #0000ff;">if no free slot was available within that time. Otherwise (block <span style="color: #0000ff;">is false),put an item on the queue <span style="color: #0000ff;">if a free slot <span style="color: #0000ff;">is immediately available,<span style="color: #0000ff;">else <span style="color: #0000ff;">raise the Full exception (timeout <span style="color: #0000ff;">is ignored <span style="color: #0000ff;">in<span style="color: #000000;"> that case).

Queue.put_nowait(item)
Equivalent to put(item,False).

Queue.get(block=True,timeout=<span style="color: #000000;">None)
Remove <span style="color: #0000ff;">and <span style="color: #0000ff;">return an item <span style="color: #0000ff;">from the queue. If optional args block <span style="color: #0000ff;">is true <span style="color: #0000ff;">and timeout <span style="color: #0000ff;">is None (the default),block <span style="color: #0000ff;">if necessary until an item <span style="color: #0000ff;">is available. If timeout <span style="color: #0000ff;">is a positive number,it blocks at most timeout seconds <span style="color: #0000ff;">and raises the Empty exception <span style="color: #0000ff;">if no item was available within that time. Otherwise (block <span style="color: #0000ff;">is false),<span style="color: #0000ff;">return an item <span style="color: #0000ff;">if one <span style="color: #0000ff;">is immediately available,<span style="color: #0000ff;">else <span style="color: #0000ff;">raise the Empty exception (timeout <span style="color: #0000ff;">is ignored <span style="color: #0000ff;">in<span style="color: #000000;"> that case).

Queue.get_nowait()
Equivalent to get(False).

Two methods are offered to support tracking whether enqueued tasks have been fully processed by daemon consumer threads.

Queue.task_done()
Indicate that a formerly enqueued task <span style="color: #0000ff;">is complete. Used by queue consumer threads. For each get() used to fetch a task,a subsequent call to task_done() tells the queue that the processing on the task <span style="color: #0000ff;">is<span style="color: #000000;"> complete.

If a join() <span style="color: #0000ff;">is currently blocking,it will resume when all items have been processed (meaning that a task_done() call was received <span style="color: #0000ff;">for<span style="color: #000000;"> every item that had been put() into the queue).

Raises a ValueError <span style="color: #0000ff;">if called more times than there were items placed <span style="color: #0000ff;">in<span style="color: #000000;"> the queue.

Queue.join() block直到queue被消费完毕

5,Python标准模块 -- concurrent.futures

官方:

defined by the abstract Executor <span style="color: #008000;">#<span style="color: #008000;">2 基本方法<span style="color: #008000;">

<span style="color: #008000;">submit(fn,*args,**kwargs)

<span style="color: #000000;">异步提交任务

<span style="color: #008000;">#<span style="color: #008000;">map(func,*iterables,timeout=None,chunksize=1)
<span style="color: #000000;">取代for循环submit的操作

<span style="color: #008000;">#<span style="color: #008000;">shutdown(wait=True)
相当于进程池的pool.close()+<span style="color: #000000;">pool.join()操作
wait=<span style="color: #000000;">True,等待池内所有任务执行完毕回收完资源后才继续
wait=<span style="color: #000000;">False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

<span style="color: #008000;">#<span style="color: #008000;">result(timeout=None)
<span style="color: #000000;">取得结果

<span style="color: #008000;">#<span style="color: #008000;">add_done_callback(fn)
回调函数

The ProcessPoolExecutor an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module,which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed <span style="color: #0000ff;">class concurrent.futures.ProcessPoolExecutor(max_workers=None,mp_context=<span style="color: #000000;">None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers
<span style="color: #0000ff;">is
None <span style="color: #0000ff;">or
<span style="color: #0000ff;">not
given,it will default to the number of processors on the machine. If max_workers <span style="color: #0000ff;">is lower <span style="color: #0000ff;">or<span style="color: #000000;"> equal to 0,then a ValueError will be raised.

<span style="color: #008000;">#<span style="color: #008000;">用法
<span style="color: #0000ff;">from concurrent.futures <span style="color: #0000ff;">import<span style="color: #000000;"> ThreadPoolExecutor,ProcessPoolExecutor

<span style="color: #0000ff;">import<span style="color: #000000;"> os,time,random
<span style="color: #0000ff;">def<span style="color: #000000;"> task(n):
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;">%s is runing<span style="color: #800000;">' %<span style="color: #000000;">os.getpid())
time.sleep(random.randint(1,3<span style="color: #000000;">))
<span style="color: #0000ff;">return n**2

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:

executor</span>=ProcessPoolExecutor(max_workers=3<span style="color: #000000;"&gt;)

futures</span>=<span style="color: #000000;"&gt;[]
</span><span style="color: #0000ff;"&gt;for</span> i <span style="color: #0000ff;"&gt;in</span> range(11<span style="color: #000000;"&gt;):
    future</span>=<span style="color: #000000;"&gt;executor.submit(task,i)
    futures.append(future)
executor.shutdown(True)
</span><span style="color: #0000ff;"&gt;print</span>(<span style="color: #800000;"&gt;'</span><span style="color: #800000;"&gt;+++></span><span style="color: #800000;"&gt;'</span><span style="color: #000000;"&gt;)
</span><span style="color: #0000ff;"&gt;for</span> future <span style="color: #0000ff;"&gt;in</span><span style="color: #000000;"&gt; futures:
    </span><span style="color: #0000ff;"&gt;print</span>(future.result())</pre>
ThreadPoolExecutor concurrent.futures.ThreadPoolExecutor(max_workers=None,thread_name_prefix=Changed <span style="color: #0000ff;">in version 3.5: If max_workers <span style="color: #0000ff;">is None <span style="color: #0000ff;">or <span style="color: #0000ff;">not given,it will default to the number of processors on the machine,multiplied by 5,assuming that ThreadPoolExecutor <span style="color: #0000ff;">is often used to overlap I/O instead of CPU work <span style="color: #0000ff;">and the number of workers should be higher than the number of workers <span style="color: #0000ff;">for<span style="color: #000000;"> ProcessPoolExecutor.

New <span style="color: #0000ff;">in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names <span style="color: #0000ff;">for worker threads created by the pool <span style="color: #0000ff;">for<span style="color: #000000;"> easier debugging.

<span style="color: #008000;">#<span style="color: #008000;">用法
与ProcessPoolExecutor相同

concurrent.futures n**2

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:

executor</span>=ThreadPoolExecutor(max_workers=3<span style="color: #000000;"&gt;)

</span><span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt; for i in range(11):</span>
<span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt;     future=executor.submit(task,i)</span>

<span style="color: #000000;">
executor.map(task,range(1,12)) <span style="color: #008000;">#<span style="color: #008000;">map取代了for+submit

concurrent.futures multiprocessing <span style="color: #0000ff;">def<span style="color: #000000;"> get_page(url):
<span style="color: #0000ff;">print
(<span style="color: #800000;">'
<span style="color: #800000;"><进程%s> get %s
<span style="color: #800000;">'
%<span style="color: #000000;">(os.getpid(),url))
respone
=<span style="color: #000000;">requests.get(url)
<span style="color: #0000ff;">if
respone.status_code == 200<span style="color: #000000;">:
<span style="color: #0000ff;">return
{<span style="color: #800000;">'<span style="color: #800000;">url<span style="color: #800000;">':url,<span style="color: #800000;">'<span style="color: #800000;">text<span style="color: #800000;">'<span style="color: #000000;">:respone.text}

<span style="color: #0000ff;">def<span style="color: #000000;"> parse_page(res):
res=<span style="color: #000000;">res.result()
<span style="color: #0000ff;">print(<span style="color: #800000;">'<span style="color: #800000;"><进程%s> parse %s<span style="color: #800000;">' %(os.getpid(),res[<span style="color: #800000;">'<span style="color: #800000;">url<span style="color: #800000;">'<span style="color: #000000;">]))
parse_res=<span style="color: #800000;">'<span style="color: #800000;">url:<%s> size:[%s]n<span style="color: #800000;">' %(res[<span style="color: #800000;">'<span style="color: #800000;">url<span style="color: #800000;">'],len(res[<span style="color: #800000;">'<span style="color: #800000;">text<span style="color: #800000;">'<span style="color: #000000;">]))
with open(<span style="color: #800000;">'<span style="color: #800000;">db.txt<span style="color: #800000;">',<span style="color: #800000;">'<span style="color: #800000;">a<span style="color: #800000;">'<span style="color: #000000;">) as f:
f.write(parse_res)

<span style="color: #0000ff;">if <span style="color: #800080;">name == <span style="color: #800000;">'<span style="color: #800000;">main<span style="color: #800000;">'<span style="color: #000000;">:
urls=<span style="color: #000000;">[
<span style="color: #800000;">'<span style="color: #800000;">https://www.baidu.com<span style="color: #800000;">'<span style="color: #000000;">,<span style="color: #800000;">'<span style="color: #800000;">https://www.python.org<span style="color: #800000;">'<span style="color: #000000;">,<span style="color: #800000;">'<span style="color: #800000;">https://www.openstack.org<span style="color: #800000;">'<span style="color: #000000;">,<span style="color: #800000;">'<span style="color: #800000;">https://help.github.com/<span style="color: #800000;">'<span style="color: #000000;">,<span style="color: #800000;">'<span style="color: #800000;">http://www.sina.com.cn/<span style="color: #800000;">'<span style="color: #000000;">
]

</span><span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt; p=Pool(3)</span>
<span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt; for url in urls:</span>
<span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt;     p.apply_async(get_page,args=(url,),callback=pasrse_page)</span>
<span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt; p.close()</span>
<span style="color: #008000;"&gt;#</span><span style="color: #008000;"&gt; p.join()</span>

<span style="color: #000000;">
p=ProcessPoolExecutor(3<span style="color: #000000;">)
<span style="color: #0000ff;">for url <span style="color: #0000ff;">in<span style="color: #000000;"> urls:
p.submit(get_page,url).add_done_callback(parse_page) <span style="color: #008000;">#<span style="color: #008000;">parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读