python – 无法使用Pool 更改共享内存对象
发布时间:2020-12-20 13:14:46 所属栏目:Python 来源:网络整理
导读:参见英文答案 How to combine Pool.map with Array (shared memory) in Python multiprocessing?????????????????????????????????????4个 我正在玩python的 multiprocessing模块和共享内存.我能够使用Process的共享内存对象,但不能使用Pool.我为我的Pool添
参见英文答案 >
How to combine Pool.map with Array (shared memory) in Python multiprocessing?????????????????????????????????????4个
我正在玩python的 multiprocessing模块和共享内存.我能够使用Process的共享内存对象,但不能使用Pool.我为我的Pool添加了一个回调,并且似乎也没有调用回调. from multiprocessing import Array,Pool,Process def flip(x,a): a[x] = 0 if a[x] else 1 return (x,a[x]) def cb(result): print(result) if __name__ == '__main__': # size of array N = 10 # shared array - N bytes - unsynchronized - initialized to zeros a = Array('B',N,lock=False) # flip values to ones using Process processes = [Process(target=flip,args=(x,a)) for x in range(N)] for p in processes: p.start() for p in processes: p.join() print([a[i] for i in range(N)]) # flip values back to zeros using Pool pool = Pool(processes=4) for x in range(N): pool.apply_async(flip,a),callback=cb) pool.close() pool.join() print([a[i] for i in range(N)]) 我希望我的共享阵列能够以1的全部打印一次,然后是回调打印的单行和全部为0的数组,但是请改为使用它. [1,1,1] [1,1] 为什么Pool不运行任务? 取出共享内存,为了一个最小的例子; def f(x): return x def cb(result): print('cb',result) if __name__ == '__main__': pool = Pool(processes=4) pool.apply_async(f,range(10),callback=cb) pool.close() pool.join() 我希望这可以在单独的行上打印数字0到9,但它不会输出任何内容. 如果我用这个替换上面的apply_sync调用; pool.apply_async(f,args=[10],callback=cb) 我得到了输出 cb 10 用范围(10),[1,2,3],[(1),(2),(3)]或([1],[2],[3])替换[10]不产生输出. 解决方法
考虑使用多处理,数据通常非常大.为每个数据分配一个进程是没有意义的,就像对N个数组的N个进程一样.
考虑这两种方法: 1)每个进程将处理一个数组块.请参阅flip_many()和partition() 2)每个数据都映射到池工作者.见flip_one() 其余代码非常接近原始代码. from multiprocessing import Array,Process def flip_many(start_idx,end_idx): for idx in range(start_idx,end_idx + 1): a[idx] = not(a[idx]) def flip_one(idx): a[idx] = not(a[idx]) return idx,a[idx] def cb(result): print(result) def partition(range_,n): start,end = range_ size = (end - start) // n ranges = [] for _ in range(n): ranges.append((start,start+size-1)) start += size if ranges[-1][1] != end-1: ranges[-1] = (ranges[-1][0],end-1) return ranges if __name__ == '__main__': # size of array N = 10 N_procs = 2 ranges = partition( (0,N),N_procs ) # shared array - N bytes - unsynchronized - initialized to zeros a = Array('B',lock=False) print([a[i] for i in range(N)],"elements of array initialized to 0") # flip values to ones using Process processes = [] for i in range(N_procs): p = Process(target=flip_many,args=(*ranges[i],)) processes.append(p) p.start() for p in processes: p.join() print([a[i] for i in range(N)],"First flip by N processes,should be 1") # flip values back to zeros using Pool pool = Pool() indices = range(N) pool.map(flip_one,indices) print([a[i] for i in range(N)],"Second flip by the pool.map ... 0") pool.map(flip_one,indices,chunksize=N // N_procs) print([a[i] for i in range(N)],"Third flip by the pool.map ... 1") pool.map_async(flip_one,callback=cb) print([a[i] for i in range(N)],"Fourth flip by the pool.map_async ... 0") print(" Due to the async nature,flip not reflected until .join()") print(" But the callback returns the correct results:") pool.close() pool.join() print([a[i] for i in range(N)],"Content after the join... 0") (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |