Minimizing the overhead of Python multiprocessing.Pool with numpy/scipy
I've spent several hours trying to parallelize my number-crunching code, but it only gets slower when I do. Unfortunately, the problem disappears when I try to reduce it to the example below, and I really don't want to post the whole program here. So the question is: what pitfalls should I avoid in this kind of program?
(Note: a follow-up to unutbu's answer is at the bottom.)

Here is the situation: it's about a module that defines a class BigData with a lot of internal data. In the example there is one list ff of interpolation functions; in the actual program there are more, e.g. ffA[k], ffB[k], ffC[k].

```python
#!/usr/bin/python2.7
import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm = 0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm == 0:
        _tm = tm
        return
    print("%s: %.2f seconds" % (msg, tm - _tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n, n, n))
        self.ff = []
        for i in range(n):
            f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = Pool(numproc)
        stopwatch('\nPool setup')
        for k in range(self.n):
            p = pool.apply_async(_do_chunk_wrapper, (self, k, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        sum = 0.0
        for k in range(self.n):
            # Edit/bugfix: replaced p.get by procs[k].get
            sum += np.sum(procs[k].get(timeout=30))  # timeout allows ctrl-C interrupt
            if k == 0:
                stopwatch("\nFirst get() done")
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return sum

    def do_single(self, xi, yi):
        sum = 0.0
        for k in range(self.n):
            sum += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return sum

def _do_chunk_wrapper(bd, k, xi, yi):  # must be outside class for apply_async to work
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2, m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)
```

Output:

```
Initialized: 0.06 seconds

Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds

Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds
```

The timings are from an Intel Core i3-3227 CPU with 2 cores and 4 threads, running 64-bit Linux. For the actual program, the multiprocessing version (the pool mechanism, even when using just one core) is a factor 10 slower than the single-process version.

Follow-up

Unutbu's answer got me on the right track. In the actual program, self gets pickled into a 37 to 140 MB object that needs to be passed to the worker processes. Worse, Python pickling is very slow; the pickling itself took a couple of seconds, and it happened for each chunk of work passed to the worker processes. Other than pickling and passing big data objects, the overhead of apply_async in Linux is very small; for a small function (adding a few integer arguments), it takes only 0.2 ms per apply_async/get pair. So, dividing the work into very small chunks is not a problem by itself. Therefore, I now pass all big array arguments as indices into global variables. I keep the chunk size small for the purpose of CPU cache optimization.

The global variables are stored in a global dict; the entries are deleted in the parent process immediately after the worker pool is set up. Only the dict keys are transmitted to the workers. The only big data involved in pickling/IPC is the new data that is created by the workers.

```python
#!/usr/bin/python2.7
import numpy as np, sys
from multiprocessing import Pool

_mproc_data = {}  # global storage for objects during multiprocessing.

class BigData:
    def __init__(self, size):
        self.blah = np.random.uniform(0, 1, size=size)

    def do_chunk(self, k, xi, yi):
        # do the work and return an array of the same shape as xi, yi
        zi = k * np.ones_like(xi)
        return zi

    def do_all_work(self, xi, yi, num_proc):
        global _mproc_data
        mp_key = str(id(self))
        _mproc_data['bd' + mp_key] = self  # BigData
        _mproc_data['xi' + mp_key] = xi
        _mproc_data['yi' + mp_key] = yi
        pool = Pool(processes=num_proc)
        # processes have now inherited the global variables; clean up in the parent process
        for v in ['bd', 'xi', 'yi']:
            del _mproc_data[v + mp_key]

        # setup indices for the worker processes (placeholder)
        n_chunks = 45
        n = len(xi)
        chunk_len = n // n_chunks
        i1list = np.arange(0, n, chunk_len)
        i2list = i1list + chunk_len
        i2list[-1] = n
        klist = range(n_chunks)  # placeholder

        procs = []
        for i in range(n_chunks):
            p = pool.apply_async(_do_chunk_wrapper,
                                 (mp_key, i1list[i], i2list[i], klist[i]))
            sys.stderr.write(".")
            procs.append(p)
        sys.stderr.write("\n")

        # allocate space for combined results
        zi = np.zeros_like(xi)

        # get data from workers and finish
        for i, p in enumerate(procs):
            zi[i1list[i]:i2list[i]] = p.get(timeout=30)  # timeout allows ctrl-C handling
        pool.close()
        pool.join()
        return zi

def _do_chunk_wrapper(key, i1, i2, k):
    """All arguments are small objects."""
    global _mproc_data
    bd = _mproc_data['bd' + key]
    xi = _mproc_data['xi' + key][i1:i2]
    yi = _mproc_data['yi' + key][i1:i2]
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi, yi, 4)
```

Here are the results of a speed test (again, 2 cores, 4 threads), varying the number of worker processes and the amount of memory in the chunks (total bytes of the xi, yi, zi array slices). The numbers are in "million result values per second", but that doesn't matter so much for the comparison. The row for "1 process" is a direct call of do_chunk with the full input data, without any subprocesses.

```
#Proc   125K    250K    500K   1000K   unlimited
1                                      0.82
2       4.28    1.96    1.3     1.31
3       2.69    1.06    1.06    1.07
4       2.17    1.27    1.23    1.28
```

The impact of the data size in memory is quite significant. The CPU has 3 MB shared L3 cache, plus 256 KB L2 cache per core. Note that the computation also needs access to several MB of internal data of the BigData object. Hence, what we learn from this is that it is useful to do this kind of speed test. For this program, 2 processes is fastest, followed by 4, with 3 being the slowest.

Solution
Try to reduce inter-process communication.
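Every argument tuple handed to apply_async makes such a round trip, so the serialization cost can be measured directly. A minimal sketch (the array size here is an arbitrary assumption, not a figure from the program above):

```python
import pickle
import time

import numpy as np

# An array of 5 million float64 values, roughly 40 MB --
# comparable in spirit to a large BigData-style object.
big = np.random.uniform(size=5 * 1000 * 1000)

t0 = time.time()
blob = pickle.dumps(big, protocol=pickle.HIGHEST_PROTOCOL)
elapsed = time.time() - t0

print("pickled %.1f MB in %.4f seconds" % (len(blob) / 1e6, elapsed))
```

If a payload of this size is pickled for every chunk of work, the serialization time alone can dominate the computation, which is exactly the kind of slowdown described in the question.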
In the multiprocessing module, all (single-machine) inter-process communication is done through Queues. Objects passed through a Queue are pickled. So try to send fewer and/or smaller objects through the Queues.

Do not send self, the instance of BigData, through the Queue. It is rather big, and gets bigger as the amount of data in self grows:

```python
In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187
```

Every call to pool.apply_async that includes self in its argument tuple pickles the whole BigData instance in the main process and unpickles it in the worker process.

So you can avoid passing instances of BigData through the Queue by calling

```python
p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
```

and accessing bd as a global instead, with the necessary accompanying change to the call signature of _do_chunk_wrapper.

Below, I modified _do_chunk_wrapper to accept k_start and k_end arguments, so that each call to pool.apply_async computes the sum over many values of k before returning a result.

```python
import math
import numpy as np
import time
import sys
import multiprocessing as mp
import scipy.interpolate as interpolate

_tm = 0
def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm == 0:
        _tm = tm
        return
    print("%s: %.2f seconds" % (msg, tm - _tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n, n, n))
        self.ff = []
        for i in range(n):
            f = interpolate.RectBivariateSpline(
                np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_chunk_of_chunks(self, k_start, k_end, xi, yi):
        s = sum(np.sum(np.exp(self.ff[k].ev(xi, yi)))
                for k in range(k_start, k_end))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = mp.Pool(numproc)
        stopwatch('\nPool setup')
        ks = list(map(int, np.linspace(0, self.n, numproc + 1)))
        for i in range(len(ks) - 1):
            k_start, k_end = ks[i:i+2]
            p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        total = 0.0
        for k, p in enumerate(procs):
            total += np.sum(p.get(timeout=30))  # timeout allows ctrl-C interrupt
            if k == 0:
                stopwatch("\nFirst get() done")
        print(total)
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return total

    def do_single(self, xi, yi):
        total = 0.0
        for k in range(self.n):
            total += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return total

def _do_chunk_wrapper(k_start, k_end, xi, yi):
    return bd.do_chunk_of_chunks(k_start, k_end, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2, m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)
```

which yields:

```
Initialized: 0.15 seconds

Pool setup: 0.06 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 6.56 seconds
83963796.0404
Jobs done: 0.55 seconds
..
Pool setup: 0.08 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 5.19 seconds
83963796.0404
Jobs done: 1.57 seconds
...
```
```
All in single process: 12.13 seconds
```

compared to the original code:

```
Initialized: 0.10 seconds

Pool setup: 0.03 seconds
Jobs queued (2 processes): 0.00 seconds

First get() done: 10.47 seconds
Jobs done: 0.00 seconds
..................................................
Pool setup: 0.12 seconds
Jobs queued (3 processes): 0.00 seconds

First get() done: 9.21 seconds
Jobs done: 0.00 seconds
..................................................
All in single process: 12.12 seconds
```