加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

python – 无法使用Pool 更改共享内存对象

发布时间:2020-12-20 13:14:46 所属栏目:Python 来源:网络整理
导读:参见英文答案 How to combine Pool.map with Array (shared memory) in Python multiprocessing?????????????????????????????????????4个 我正在玩python的 multiprocessing模块和共享内存.我能够使用Process的共享内存对象,但不能使用Pool.我为我的Pool添
参见英文答案 > How to combine Pool.map with Array (shared memory) in Python multiprocessing?????????????????????????????????????4个
我正在玩python的 multiprocessing模块和共享内存.我能够使用Process的共享内存对象,但不能使用Pool.我为我的Pool添加了一个回调,并且似乎也没有调用回调.

from multiprocessing import Array,Pool,Process

def flip(x,a):
    a[x] = 0 if a[x] else 1
    return (x,a[x])

def cb(result):
    print(result)

if __name__ == '__main__':

    # size of array
    N = 10

    # shared array - N bytes - unsynchronized - initialized to zeros
    a = Array('B',N,lock=False)

    # flip values to ones using Process
    processes = [Process(target=flip,args=(x,a)) for x in range(N)]
    for p in processes: p.start()
    for p in processes: p.join()
    print([a[i] for i in range(N)])    

    # flip values back to zeros using Pool
    pool = Pool(processes=4)
    for x in range(N):
        pool.apply_async(flip,a),callback=cb)
    pool.close()
    pool.join()
    print([a[i] for i in range(N)])

我希望我的共享阵列能够以1的全部打印一次,然后是回调打印的单行和全部为0的数组,但是请改为使用它.

[1,1,1]
[1,1]

为什么Pool不运行任务?

取出共享内存,为了一个最小的例子;

def f(x):
    return x

def cb(result):
    print('cb',result)

if __name__ == '__main__':

    pool = Pool(processes=4)
    pool.apply_async(f,range(10),callback=cb)
    pool.close()
    pool.join()

我希望这可以在单独的行上打印数字0到9,但它不会输出任何内容.

如果我用这个替换上面的apply_sync调用;

pool.apply_async(f,args=[10],callback=cb)

我得到了输出

cb 10

用范围(10),[1,2,3],[(1),(2),(3)]或([1],[2],[3])替换[10]不产生输出.

解决方法

考虑使用多处理,数据通常非常大.为每个数据分配一个进程是没有意义的,就像对N个数组的N个进程一样.

考虑这两种方法:

1)每个进程将处理一个数组块.请参阅flip_many()和partition()

2)每个数据都映射到池工作者.见flip_one()

其余代码非常接近原始代码.

from multiprocessing import Array,Process

def flip_many(start_idx,end_idx):
    for idx in range(start_idx,end_idx + 1):
        a[idx] = not(a[idx])

def flip_one(idx):
    a[idx] = not(a[idx])
    return idx,a[idx]

def cb(result):
    print(result)

def partition(range_,n):
    start,end = range_
    size = (end - start) // n
    ranges = []
    for _ in range(n):
        ranges.append((start,start+size-1))
        start += size
    if ranges[-1][1] != end-1:
        ranges[-1] = (ranges[-1][0],end-1)
    return ranges    

if __name__ == '__main__':

    # size of array
    N = 10
    N_procs = 2
    ranges = partition( (0,N),N_procs )

    # shared array - N bytes - unsynchronized - initialized to zeros
    a = Array('B',lock=False)
    print([a[i] for i in range(N)],"elements of array initialized to 0")    

    # flip values to ones using Process

    processes = []
    for i in range(N_procs):
        p = Process(target=flip_many,args=(*ranges[i],))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print([a[i] for i in range(N)],"First flip by N processes,should be 1")    

    # flip values back to zeros using Pool
    pool = Pool()
    indices = range(N)
    pool.map(flip_one,indices)
    print([a[i] for i in range(N)],"Second flip by the pool.map ... 0")

    pool.map(flip_one,indices,chunksize=N // N_procs)
    print([a[i] for i in range(N)],"Third flip by the pool.map ... 1")

    pool.map_async(flip_one,callback=cb)
    print([a[i] for i in range(N)],"Fourth flip by the pool.map_async ... 0")
    print("    Due to the async nature,flip not reflected until .join()")
    print("    But the callback returns the correct results:")

    pool.close()
    pool.join()
    print([a[i] for i in range(N)],"Content after the join... 0")

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读