threading. active_count ()
返回当前存活的线程类?Thread ?对象。返回的计数等于?enumerate() ?返回的列表长度。
threading. current_thread ()
返回当前对应调用者的控制线程的?Thread ?对象。如果调用者的控制线程不是利用?threading ?创建,会返回一个功能受限的虚拟线程对象。
threading. get_ident ()
?
返回当前线程的 “线程标识符”。它是一个非零的整数。它的值没有直接含义,主要是用作 magic cookie,比如作为含有线程相关数据的字典的索引。线程标识符可能会在线程退出,新线程创建时被复用。
threading. enumerate ()
以列表形式返回当前所有存活的?Thread ?对象。 该列表包含守护线程,current_thread() ?创建的虚拟线程对象和主线程。它不包含已终结的线程和尚未开始的线程。
threading. main_thread ()
?
返回主?Thread ?对象。一般情况下,主线程是Python解释器开始时创建的线程。
threading. setprofile (func)
为所有?threading ?模块开始的线程设置性能测试函数。在每个线程的?run() ?方法被调用前,func?会被传递给?sys.setprofile() ?。
threading. stack_size ([size])
返回创建线程时用的堆栈大小。可选参数?size?指定之后新建的线程的堆栈大小,而且一定要是0(根据平台或者默认配置)或者最小是32,768(32KiB)的一个正整数。如果?size?没有指定,默认是0。如果不支持改变线程堆栈大小,会抛出?RuntimeError ?错误。如果指定的堆栈大小不合法,会抛出?ValueError ?错误并且不会修改堆栈大小。32KiB是当前最小的能保证解释器有足够堆栈空间的堆栈大小。需要注意的是部分平台对于堆栈大小会有特定的限制,例如要求大于32KiB的堆栈大小或者需要根据系统内存页面的整数倍进行分配 - 应当查阅平台文档有关详细信息(4KiB页面比较普遍,在没有更具体信息的情况下,建议的方法是使用4096的倍数作为堆栈大小)。
threading. TIMEOUT_MAX
阻塞函数(?Lock.acquire() ,?RLock.acquire() ,?Condition.wait() ,...)中形参?timeout?允许的最大值。传入超过这个值的 timeout 会抛出?OverflowError ?异常。
有个 "主线程" 对象;这对应Python程序里面初始的控制线程。它不是一个守护线程。
"虚拟线程对象" 是可以被创建的。这些是对应于“外部线程”的线程对象,它们是在线程模块外部启动的控制线程,例如直接来自C代码。虚拟线程对象功能受限;他们总是被认为是存活的和守护模式,不能被?join() ?。因为无法检测外来线程的终结,它们永远不会被删除。
-
class?
threading.
Thread
(group=None,?target=None,?name=None,?args=(),?kwargs={},?*,?daemon=None)
-
调用这个构造函数时,必需带有关键字参数。参数如下:
group?应该为?None ;为了日后扩展?ThreadGroup ?类实现而保留。
target?是用于?run() ?方法调用的可调用对象。默认是?None ,表示不需要调用任何方法。
name?是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其中?N?是小的十进制数。
args?是用于调用目标函数的参数元组。默认是?() 。
kwargs?是用于调用目标函数的关键字参数字典。默认是?{} 。
如果?daemon?不是?None ,线程将被显式的设置为?守护模式,不管该线程是否是守护模式。如果是?None ?(默认值),线程将继承当前线程的守护模式属性。
如果子类型重载了构造函数,它一定要确保在做任何事前,先发起调用基类构造器(Thread.__init__() )。
-
start
()
-
开始线程活动。
它在一个线程里最多只能被调用一次。它安排对象的?run() ?方法在一个独立的控制进程中调用。
如果同一个线程对象中调用这个方法的次数大于一次,会抛出?RuntimeError ?。
-
run
()
-
代表线程活动的方法。
你可以在子类型里重载这个方法。 标准的?run() ?方法会对作为?target?参数传递给该对象构造器的可调用对象(如果存在)发起调用,并附带从?args?和?kwargs?参数分别获取的位置和关键字参数。
-
join
(timeout=None)
-
等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用?join() ?的线程终结 -- 不管是正常终结还是抛出未处理异常 -- 或者直到发生超时,超时选项是可选的。
当?timeout?参数存在而且不是?None ?时,它应该是一个用于指定操作超时的以秒为单位的浮点数(或者分数)。因为?join() ?总是返回?None ?,所以你一定要在?join() ?后调用?is_alive() ?才能判断是否发生超时 -- 如果线程仍然存货,则?join() ?超时。
当?timeout?参数不存在或者是?None ?,这个操作会阻塞直到线程终结。
一个线程可以被?join() ?很多次。
如果尝试加入当前线程会导致死锁,?join() ?会引起?RuntimeError ?异常。如果尝试?join() ?一个尚未开始的线程,也会抛出相同的异常。
-
name
-
只用于识别的字符串。它没有语义。多个线程可以赋予相同的名称。 初始名称由构造函数设置。
-
getName
()
-
setName
()
-
旧的?name ?取值/设值 API;直接当做特征属性使用它。
-
ident
-
这个线程的 ‘线程标识符‘,如果线程尚未开始则为?None ?。这是个非零整数。参见?get_ident() ?函数。当一个线程退出而另外一个线程被创建,线程标识符会被复用。即使线程退出后,仍可得到标识符。
-
is_alive
()
-
返回线程是否存活。
当?run() ?方法刚开始直到?run() ?方法刚结束,这个方法返回?True ?。模块函数?enumerate() ?返回包含所有存活线程的列表。
-
daemon
-
一个表示这个线程是(True)否(False)守护线程的布尔值。一定要在调用?start() ?前设置好,不然会抛出?RuntimeError ?。初始值继承于创建线程;主线程不是守护线程,因此主线程创建的所有线程默认都是?daemon ?=?False 。
当没有存活的非守护线程时,整个Python程序才会退出。
-
isDaemon
()
-
setDaemon
()
-
旧的?name ?取值/设值 API;建议直接当做特征属性使用它。
#线程是cpu最小的单元
from threading import Thread,active_count,current_thread,enumerate,main_thread,stack_size,Lock
import time
import random
# print(active_count()) #返回当前的线程数
# print(current_thread()) #返回当前的线程Thread对象
# print(enumerate()) #反回当前活动的线程
# print(main_thread()) #返回当前的主线程
# print(stack_size()) #返回创建线程时使用的栈的大小,如果size参数,则用来指定后续创建的线程使用的栈的大小,size必须是0,(表示使用系统默认的值)大于32k的正整数
#
arr = []
def task(num):
time.sleep(3)
print(num)
#
start = time.time()
for i in range(0,5):
t =Thread(target=task,args=(i,))
t.start()
arr.append(t)
# print(enumerate())
# t.join()
for j in arr:
j.join()
end = time.time()
print(‘时间:%s‘%(end-start))
‘‘‘
Thread对象常用方法:
start()运行线程
join()阻塞主线程 time阻塞时间
name 线程的名字
is_alive() 判断线程是否存活
ident 线程的标识
daemon 是否为守护进程
‘‘‘
# class MyI(Thread):
# def __init__(self):
# super(MyI,self).__init__()
# def run(self):
# time.sleep(3)
# print(self.name,self.ident,self.daemon)
#
# t1 = MyI()
# t1.name = "任务一"
# # t1.daemon=True
# t1.start()
#
# h2=[]
# l = Lock()
# class xm(Thread):
#
# def __init__(self):
# super(xm,self).__init__()
# self.h = True
# def run(self):
#
# if len(h2)<5:
# l.acquire()
# while self.h:
# h2.append(1)
# print(‘小明往锅里加了%s‘%len(h2)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘)
# if len(h2)==5:
# l.release()
# self.h = False
#
# class xh(Thread):
# def __init__(self):
# super(xh,self).__init__()
# self.h = True
#
# def run(self):
# i = 0
# if len(h2)==5:
# l.acquire()
# while self.h:
# i+=1
# h2.pop(0)
# print(‘小红吃了锅里的%s‘%(i)+‘个丸子‘,‘锅里有%s‘%len(h2)+‘个丸子‘)
# if len(h2)==0:
# l.release()
# self.h = False
#
#
# x1 = xm()
# x1.start()
#
# x2 = xh()
# x2.start()
#
# ‘‘‘
# def m(*args):
# print(random.randint(1,args))
# Thread(group=None,target=m(),args=(1,2,34,6)) #构造函数
# target 线程启动执行函数
# args 是元组 是线程执行函数的参数
# kwargs 是线程执行函数的参数
# ‘‘‘
multiprocessing ?是一个用与?threading ?模块相似API的支持产生进程的包。?multiprocessing ?包同时提供本地和远程并发,使用子进程代替线程,有效避免?Global Interpreter Lock?带来的影响。因此,?multiprocessing ?模块允许程序员充分利用机器上的多个核心。Unix 和 Windows 上都可以运行。
multiprocessing ?模块还引入了在?threading ?模块中没有类似物的API。这方面的一个主要例子是?Pool ?对象,它提供了一种方便的方法,可以跨多个输入值并行化函数的执行,跨进程分配输入数据(数据并行)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本例子使用?Pool ?,
from multiprocessing import Pool def f(x): return x*x if __name__ == ‘__main__‘: with Pool(5) as p: print(p.map(f,[1,2,3]))
将打印到标准输出
[1,4,9]
Process ?类
在?multiprocessing ?中,通过创建一个?Process ?对象然后调用它的?start() ?方法来生成进程。?Process ?和?threading.Thread ?API 相同。 一个简单的多进程程序示例是:
from multiprocessing import Process def f(name): print(‘hello‘,name) if __name__ == ‘__main__‘: p = Process(target=f,args=(‘bob‘,)) p.start() p.join()
要显示所涉及的各个进程ID,这是一个扩展示例:
from multiprocessing import Process import os def info(title): print(title) print(‘module name:‘,__name__) print(‘parent process:‘,os.getppid()) print(‘process id:‘,os.getpid()) def f(name): info(‘function f‘) print(‘hello‘,name) if __name__ == ‘__main__‘: info(‘main line‘) p = Process(target=f,)) p.start() p.join()
POOL进程池
# from multiprocessing import Pool # def fun(): # sum = 0 # for i in range(0,100000): # sum+=i # print(sum) # # if __name__ == ‘__main__‘: # pool = Pool(5) #5个进程的进程池 # start = time.time() # for i in range(100): # # pool.apply(fun) # obj = pool.apply_async(fun) # print(obj) # # end =time.time() # print(end-start)
队列
Queue ?类是一个近似?queue.Queue ?的克隆。 例如:
from multiprocessing import Process,Queue def f(q): q.put([42,None,‘hello‘]) if __name__ == ‘__main__‘: q = Queue() p = Process(target=f,args=(q,)) p.start() print(q.get()) # prints "[42,None,‘hello‘]" p.join()
队列是线程和进程安全的。
管道
Pipe() ?函数返回一个由管道连接的连接对象,默认情况下是双工(双向)。例如:
from multiprocessing import Process,Pipe def f(conn): conn.send([42,‘hello‘]) conn.close() if __name__ == ‘__main__‘: parent_conn,child_conn = Pipe() p = Process(target=f,args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42,‘hello‘]" p.join()
返回的两个连接对象?Pipe() ?表示管道的两端。每个连接对象都有?send() ?和?recv() ?方法(相互之间的)。请注意,如果两个进程(或线程)同时尝试读取或写入管道的?同一?端,则管道中的数据可能会损坏。当然,同时使用管道的不同端的进程不存在损坏的风险。
进程之间的同步
multiprocessing ?包含来自?threading ?的所有同步基本体的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process,Lock def f(l,i): l.acquire() try: print(‘hello world‘,i) finally: l.release() if __name__ == ‘__main__‘: lock = Lock() for num in range(10): Process(target=f,args=(lock,num)).start()
不使用来自不同进程的锁输出容易产生混淆。
在进程之间共享状态
如上所述,在进行并发编程时,通常最好尽量避免使用共享状态。使用多个进程时尤其如此。
但是,如果你真的需要使用一些共享数据,那么?multiprocessing ?提供了两种方法。
共享内存
可以使用?Value ?或?Array ?将数据存储在共享内存映射中。例如,以下代码:
from multiprocessing import Process,Value,Array def f(n,a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == ‘__main__‘: num = Value(‘d‘,0.0) arr = Array(‘i‘,range(10)) p = Process(target=f,args=(num,arr)) p.start() p.join() print(num.value) print(arr[:])
将打印
3.1415927
[0,-1,-2,-3,-4,-5,-6,-7,-8,-9]
创建?num ?和?arr ?时使用的?‘d‘ ?和?‘i‘ ?参数是?array ?模块使用的类型的 typecode :?‘d‘ ?表示双精度浮点数,?‘i‘ ?表示有符号整数。这些共享对象将是进程和线程安全的。
为了更灵活地使用共享内存,可以使用?multiprocessing.sharedctypes ?模块,该模块支持创建从共享内存分配的任意ctypes对象。
服务器进程
由?Manager() ?返回的管理器对象控制一个服务器进程,该进程保存Python对象并允许其他进程使用代理操作它们。
Manager() ?返回的管理器支持类型:?list ?、?dict ?、?Namespace ?、?Lock ?、?RLock ?、?Semaphore ?、?BoundedSemaphore ?、?Condition ?、?Event ?、?Barrier ?、?Queue ?、?Value ?和?Array ?。例如
from multiprocessing import Process,Manager def f(d,l): d[1] = ‘1‘ d[‘2‘] = 2 d[0.25] = None l.reverse() if __name__ == ‘__main__‘: with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f,args=(d,l)) p.start() p.join() print(d) print(l)
将打印
{0.25: None,1: ‘1‘,‘2‘: 2} [9,8,7,6,5,3,1,0]
服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢。
服务端的创建
from multiprocessing.managers import BaseManager from queue import Queue queue = Queue() queue.put(‘lolololol‘) class QueueManager(BaseManager): pass QueueManager.register(‘get_queue‘,callable=lambda:queue) m = QueueManager(address=(‘127.0.0.1‘,5000),authkey=b‘2ab‘) s = m.get_server() s.serve_forever() #创建服务器
客户端的创建
from multiprocessing.managers import BaseManagerclass QueueManager(BaseManager): passQueueManager.register(‘get_queue‘)m = QueueManager(address=(‘127.0.0.1‘,authkey=b‘2ab‘)m.connect()queue = m.get_queue()print(queue.get())
(编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|