python – 无法使用Pool 更改共享内存对象
发布时间:2020-12-20 13:14:46 所属栏目:Python 来源:网络整理
导读:参见英文答案 How to combine Pool.map with Array (shared memory) in Python multiprocessing?????????????????????????????????????4个 我正在玩python的 multiprocessing模块和共享内存.我能够使用Process的共享内存对象,但不能使用Pool.我为我的Pool添
|
参见英文答案 >
How to combine Pool.map with Array (shared memory) in Python multiprocessing?????????????????????????????????????4个
我正在玩python的 multiprocessing模块和共享内存.我能够使用Process的共享内存对象,但不能使用Pool.我为我的Pool添加了一个回调,并且似乎也没有调用回调. from multiprocessing import Array,Pool,Process
def flip(x,a):
a[x] = 0 if a[x] else 1
return (x,a[x])
def cb(result):
print(result)
if __name__ == '__main__':
# size of array
N = 10
# shared array - N bytes - unsynchronized - initialized to zeros
a = Array('B',N,lock=False)
# flip values to ones using Process
processes = [Process(target=flip,args=(x,a)) for x in range(N)]
for p in processes: p.start()
for p in processes: p.join()
print([a[i] for i in range(N)])
# flip values back to zeros using Pool
pool = Pool(processes=4)
for x in range(N):
pool.apply_async(flip,a),callback=cb)
pool.close()
pool.join()
print([a[i] for i in range(N)])
我希望我的共享阵列能够以1的全部打印一次,然后是回调打印的单行和全部为0的数组,但是请改为使用它. [1,1,1] [1,1] 为什么Pool不运行任务? 取出共享内存,为了一个最小的例子; def f(x):
return x
def cb(result):
print('cb',result)
if __name__ == '__main__':
pool = Pool(processes=4)
pool.apply_async(f,range(10),callback=cb)
pool.close()
pool.join()
我希望这可以在单独的行上打印数字0到9,但它不会输出任何内容. 如果我用这个替换上面的apply_sync调用; pool.apply_async(f,args=[10],callback=cb) 我得到了输出 cb 10 用范围(10),[1,2,3],[(1),(2),(3)]或([1],[2],[3])替换[10]不产生输出. 解决方法
考虑使用多处理,数据通常非常大.为每个数据分配一个进程是没有意义的,就像对N个数组的N个进程一样.
考虑这两种方法: 1)每个进程将处理一个数组块.请参阅flip_many()和partition() 2)每个数据都映射到池工作者.见flip_one() 其余代码非常接近原始代码. from multiprocessing import Array,Process
def flip_many(start_idx,end_idx):
for idx in range(start_idx,end_idx + 1):
a[idx] = not(a[idx])
def flip_one(idx):
a[idx] = not(a[idx])
return idx,a[idx]
def cb(result):
print(result)
def partition(range_,n):
start,end = range_
size = (end - start) // n
ranges = []
for _ in range(n):
ranges.append((start,start+size-1))
start += size
if ranges[-1][1] != end-1:
ranges[-1] = (ranges[-1][0],end-1)
return ranges
if __name__ == '__main__':
# size of array
N = 10
N_procs = 2
ranges = partition( (0,N),N_procs )
# shared array - N bytes - unsynchronized - initialized to zeros
a = Array('B',lock=False)
print([a[i] for i in range(N)],"elements of array initialized to 0")
# flip values to ones using Process
processes = []
for i in range(N_procs):
p = Process(target=flip_many,args=(*ranges[i],))
processes.append(p)
p.start()
for p in processes:
p.join()
print([a[i] for i in range(N)],"First flip by N processes,should be 1")
# flip values back to zeros using Pool
pool = Pool()
indices = range(N)
pool.map(flip_one,indices)
print([a[i] for i in range(N)],"Second flip by the pool.map ... 0")
pool.map(flip_one,indices,chunksize=N // N_procs)
print([a[i] for i in range(N)],"Third flip by the pool.map ... 1")
pool.map_async(flip_one,callback=cb)
print([a[i] for i in range(N)],"Fourth flip by the pool.map_async ... 0")
print(" Due to the async nature,flip not reflected until .join()")
print(" But the callback returns the correct results:")
pool.close()
pool.join()
print([a[i] for i in range(N)],"Content after the join... 0")
(编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
