加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python多线程,多进程

发布时间:2020-12-20 10:29:11 所属栏目:Python 来源:网络整理
导读:threading. active_count ( ) 返回当前存活的线程类? Thread ?对象。返回的计数等于? enumerate() ?返回的列表长度。 threading. current_thread ( ) 返回当前对应调用者的控制线程的? Thread ?对象。如果调用者的控制线程不是利用? threading ?创建,会返回

threading.active_count()

返回当前存活的线程类?Thread?对象。返回的计数等于?enumerate()?返回的列表长度。

threading.current_thread()

返回当前对应调用者的控制线程的?Thread?对象。如果调用者的控制线程不是利用?threading?创建,会返回一个功能受限的虚拟线程对象。

threading.get_ident()

?

返回当前线程的 “线程标识符”。它是一个非零的整数。它的值没有直接含义,主要是用作 magic cookie,比如作为含有线程相关数据的字典的索引。线程标识符可能会在线程退出,新线程创建时被复用。

threading.enumerate()

以列表形式返回当前所有存活的?Thread?对象。 该列表包含守护线程,current_thread()?创建的虚拟线程对象和主线程。它不包含已终结的线程和尚未开始的线程。

threading.main_thread()

?

返回主?Thread?对象。一般情况下,主线程是Python解释器开始时创建的线程。

threading.setprofile(func)

为所有?threading?模块开始的线程设置性能测试函数。在每个线程的?run()?方法被调用前,func?会被传递给?sys.setprofile()?。

threading.stack_size([size])

返回创建线程时用的堆栈大小。可选参数?size?指定之后新建的线程的堆栈大小,而且一定要是0(根据平台或者默认配置)或者最小是32,768(32KiB)的一个正整数。如果?size?没有指定,默认是0。如果不支持改变线程堆栈大小,会抛出?RuntimeError?错误。如果指定的堆栈大小不合法,会抛出?ValueError?错误并且不会修改堆栈大小。32KiB是当前最小的能保证解释器有足够堆栈空间的堆栈大小。需要注意的是部分平台对于堆栈大小会有特定的限制,例如要求大于32KiB的堆栈大小或者需要根据系统内存页面的整数倍进行分配 - 应当查阅平台文档有关详细信息(4KiB页面比较普遍,在没有更具体信息的情况下,建议的方法是使用4096的倍数作为堆栈大小)。

threading.TIMEOUT_MAX

阻塞函数(?Lock.acquire(),?RLock.acquire(),?Condition.wait(),...)中形参?timeout?允许的最大值。传入超过这个值的 timeout 会抛出?OverflowError?异常。

有个 "主线程" 对象;这对应Python程序里面初始的控制线程。它不是一个守护线程。

"虚拟线程对象" 是可以被创建的。这些是对应于“外部线程”的线程对象,它们是在线程模块外部启动的控制线程,例如直接来自C代码。虚拟线程对象功能受限;他们总是被认为是存活的和守护模式,不能被?join()?。因为无法检测外来线程的终结,它们永远不会被删除。

class? threading. Thread (group=None,?target=None,?name=None,?args=(),?kwargs={},?*,?daemon=None)

调用这个构造函数时,必需带有关键字参数。参数如下:

group?应该为?None;为了日后扩展?ThreadGroup?类实现而保留。

target?是用于?run()?方法调用的可调用对象。默认是?None,表示不需要调用任何方法。

name?是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中?N?是小的十进制数。

args?是用于调用目标函数的参数元组。默认是?()

kwargs?是用于调用目标函数的关键字参数字典。默认是?{}

如果?daemon?不是?None,线程将被显式的设置为?守护模式,不管该线程是否是守护模式。如果是?None?(默认值),线程将继承当前线程的守护模式属性。

如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器(Thread.__init__())。

在 3.3 版更改:?加入?daemon?参数。

start ()

开始线程活动。

它在一个线程里最多只能被调用一次。它安排对象的?run()?方法在一个独立的控制进程中调用。

如果同一个线程对象中调用这个方法的次数大于一次,会抛出?RuntimeError?。

run ()

代表线程活动的方法。

你可以在子类型里重载这个方法。 标准的?run()?方法会对作为?target?参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从?args?和?kwargs?参数分别获取的位置和关键字参数。

join (timeout=None)

等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用?join()?的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。

当?timeout?参数存在而且不是?None?时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为?join()?总是返回?None?,所以你一定要在?join()?后调用?is_alive()?才能判断是否发生超时 -- 如果线程仍然存货,则?join()?超时。

当?timeout?参数不存在或者是?None?,这个操作会阻塞直到线程终结。

一个线程可以被?join()?很多次。

如果尝试加入当前线程会导致死锁,?join()?会引起?RuntimeError?异常。如果尝试?join()?一个尚未开始的线程,也会抛出相同的异常。

name

只用于识别的字符串。它没有语义。多个线程可以赋予相同的名称。 初始名称由构造函数设置。

getName ()
setName ()

旧的?name?取值/设值 API;直接当做特征属性使用它。

ident

这个线程的 ‘线程标识符‘,如果线程尚未开始则为?None?。这是个非零整数。参见?get_ident()?函数。当一个线程退出而另外一个线程被创建,线程标识符会被复用。即使线程退出后,仍可得到标识符。

is_alive ()

返回线程是否存活。

当?run()?方法刚开始直到?run()?方法刚结束,这个方法返回?True?。模块函数?enumerate()?返回包含所有存活线程的列表。

daemon

一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用?start()?前设置好,不然会抛出?RuntimeError?。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是?daemon?=?False

当没有存活的非守护线程时,整个Python程序才会退出。

isDaemon ()
setDaemon ()

旧的?name?取值/设值 API;建议直接当做特征属性使用它。

#线程是cpu最小的单元

from threading import Thread,active_count,current_thread,enumerate,main_thread,stack_size,Lock
import time
import random

# print(active_count()) #返回当前的线程数
# print(current_thread()) #返回当前的线程Thread对象
# print(enumerate()) #反回当前活动的线程
# print(main_thread()) #返回当前的主线程
# print(stack_size()) #返回创建线程时使用的栈的大小,如果size参数,则用来指定后续创建的线程使用的栈的大小,size必须是0,(表示使用系统默认的值)大于32k的正整数

#
arr = []
def task(num):
    time.sleep(3)
    print(num)
#
start = time.time()

for i in range(0,5):
    t =Thread(target=task,args=(i,))
    t.start()
    arr.append(t)
    # print(enumerate())
    # t.join()

for j in arr:
    j.join()

end = time.time()
print(时间:%s%(end-start))

‘‘‘
Thread对象常用方法:
start()运行线程
join()阻塞主线程 time阻塞时间
name 线程的名字
is_alive() 判断线程是否存活
ident 线程的标识
daemon 是否为守护进程
‘‘‘

# class MyI(Thread):
#     def __init__(self):
#         super(MyI,self).__init__()
#     def run(self):
#         time.sleep(3)
#         print(self.name,self.ident,self.daemon)
#
# t1 = MyI()
# t1.name = "任务一"
# # t1.daemon=True
# t1.start()

#
# h2=[]
# l = Lock()
# class xm(Thread):
#
#     def __init__(self):
#         super(xm,self).__init__()
#         self.h = True
#     def run(self):
#
#         if len(h2)<5:
#             l.acquire()
#             while self.h:
#                 h2.append(1)
#                 print(‘小明往锅里加了%s‘%len(h2)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘)
#                 if len(h2)==5:
#                     l.release()
#                     self.h = False
#
# class xh(Thread):
#     def __init__(self):
#         super(xh,self).__init__()
#         self.h = True
#
#     def run(self):
#         i = 0
#         if len(h2)==5:
#             l.acquire()
#             while self.h:
#                 i+=1
#                 h2.pop(0)
#                 print(‘小红吃了锅里的%s‘%(i)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘)
#                 if len(h2)==0:
#                     l.release()
#                     self.h = False
#
#
# x1 = xm()
# x1.start()
#
# x2 = xh()
# x2.start()
#
# ‘‘‘
# def m(*args):
#     print(random.randint(1,args))
# Thread(group=None,target=m(),args=(1,2,34,6)) #构造函数
# target 线程启动执行函数
# args 是元组 是线程执行函数的参数
# kwargs 是线程执行函数的参数
# ‘‘‘

multiprocessing?是一个用与?threading?模块相似API的支持产生进程的包。?multiprocessing?包同时提供本地和远程并发,使用子进程代替线程,有效避免?Global Interpreter Lock?带来的影响。因此,?multiprocessing?模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。

multiprocessing?模块还引入了在?threading?模块中没有类似物的API。这方面的一个主要例子是?Pool?对象,它提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨进程分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用?Pool?,

from multiprocessing import Pool def f(x): return x*x if __name__ == ‘__main__‘: with Pool(5) as p: print(p.map(f,[1,2,3])) 

将打印到标准输出

[1,4,9]

Process?类

在?multiprocessing?中,通过创建一个?Process?对象然后调用它的?start()?方法来生成进程。?Process?和?threading.Thread?API 相同。 一个简单的多进程程序示例是:

from multiprocessing import Process def f(name): print(‘hello‘,name) if __name__ == ‘__main__‘: p = Process(target=f,args=(‘bob‘,)) p.start() p.join() 

要显示所涉及的各个进程ID,这是一个扩展示例:

from multiprocessing import Process import os def info(title): print(title) print(‘module name:‘,__name__) print(‘parent process:‘,os.getppid()) print(‘process id:‘,os.getpid()) def f(name): info(‘function f‘) print(‘hello‘,name) if __name__ == ‘__main__‘: info(‘main line‘) p = Process(target=f,)) p.start() p.join()

POOL进程池
# from multiprocessing import Pool
# def fun():
# sum = 0
# for i in range(0,100000):
# sum+=i
# print(sum)
#
# if __name__ == ‘__main__‘:
# pool = Pool(5) #5个进程的进程池
# start = time.time()
# for i in range(100):
# # pool.apply(fun)
# obj = pool.apply_async(fun)
# print(obj)
#
# end =time.time()
# print(end-start)

 
      

队列

Queue?类是一个近似?queue.Queue?的克隆。 例如:

from multiprocessing import Process,Queue def f(q): q.put([42,None,‘hello‘]) if __name__ == ‘__main__‘: q = Queue() p = Process(target=f,args=(q,)) p.start() print(q.get()) # prints "[42,None,‘hello‘]" p.join() 

队列是线程和进程安全的。

管道

Pipe()?函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:

from multiprocessing import Process,Pipe def f(conn): conn.send([42,‘hello‘]) conn.close() if __name__ == ‘__main__‘: parent_conn,child_conn = Pipe() p = Process(target=f,args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42,‘hello‘]" p.join() 

返回的两个连接对象?Pipe()?表示管道的两端。每个连接对象都有?send()?和?recv()?方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的?同一?端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。

 
      

进程之间的同步

multiprocessing?包含来自?threading?的所有同步基本体的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process,Lock def f(l,i): l.acquire() try: print(‘hello world‘,i) finally: l.release() if __name__ == ‘__main__‘: lock = Lock() for num in range(10): Process(target=f,args=(lock,num)).start() 

不使用来自不同进程的锁输出容易产生混淆。

 
      

在进程之间共享状态

如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。

但是,如果你真的需要使用一些共享数据,那么?multiprocessing?提供了两种方法。

共享内存

可以使用?Value?或?Array?将数据存储在共享内存映射中。例如,以下代码:

from multiprocessing import Process,Value,Array def f(n,a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == ‘__main__‘: num = Value(‘d‘,0.0) arr = Array(‘i‘,range(10)) p = Process(target=f,args=(num,arr)) p.start() p.join() print(num.value) print(arr[:]) 

将打印

3.1415927
[0,-1,-2,-3,-4,-5,-6,-7,-8,-9] 

创建?num?和?arr?时使用的?‘d‘?和?‘i‘?参数是?array?模块使用的类型的 typecode :?‘d‘?表示双精度浮点数,?‘i‘?表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用?multiprocessing.sharedctypes?模块,该模块支持创建从共享内存分配的任意ctypes对象。

服务器进程

由?Manager()?返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。

Manager()?返回的管理器支持类型:?list?、?dict?、?Namespace?、?Lock?、?RLock?、?Semaphore?、?BoundedSemaphore?、?Condition?、?Event?、?Barrier?、?Queue?、?Value?和?Array?。例如

from multiprocessing import Process,Manager def f(d,l): d[1] = ‘1‘ d[‘2‘] = 2 d[0.25] = None l.reverse() if __name__ == ‘__main__‘: with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f,args=(d,l)) p.start() p.join() print(d) print(l) 

将打印

{0.25: None,1: ‘1‘,‘2‘: 2} [9,8,7,6,5,3,1,0] 

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。

服务端的创建
from multiprocessing.managers import BaseManager
from queue import Queue
queue = Queue()
queue.put(‘lolololol‘)
class QueueManager(BaseManager): pass
QueueManager.register(‘get_queue‘,callable=lambda:queue)
m = QueueManager(address=(‘127.0.0.1‘,5000),authkey=b‘2ab‘)
s = m.get_server()
s.serve_forever() #创建服务器

客户端的创建
from multiprocessing.managers import BaseManagerclass QueueManager(BaseManager): passQueueManager.register(‘get_queue‘)m = QueueManager(address=(‘127.0.0.1‘,authkey=b‘2ab‘)m.connect()queue = m.get_queue()print(queue.get())

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读