Python多处理工作者/队列
发布时间:2020-12-20 12:24:11 所属栏目:Python 来源:网络整理
导读:我有一个 python函数,总共运行12次.我目前设置这个设置来使用多处理库中的池来并行运行所有这些池.通常我一次运行6,因为该功能是CPU密集型的并且并行运行12经常导致程序崩溃.当我们一次做6时,第二组6将不会开始,直到所有前6个过程完成.理想情况下,我们希望另
我有一个
python函数,总共运行12次.我目前设置这个设置来使用多处理库中的池来并行运行所有这些池.通常我一次运行6,因为该功能是CPU密集型的并且并行运行12经常导致程序崩溃.当我们一次做6时,第二组6将不会开始,直到所有前6个过程完成.理想情况下,我们希望另一个(例如第7个)一旦从最初的6个批次中的一个完成就开始 – 所以6个正在同时运行,而有更多的开始.现在代码看起来像这样(它将被调用两次,将前6个元素传递到一个列表中,然后将第二个6传递给另一个列表:
from multiprocessing import Pool def start_pool(project_list): pool = Pool(processes=6) pool.map(run_assignments_parallel,project_list[0:6]) 所以我一直在尝试实现一个worker / queue解决方案并遇到了一些问题.我有一个看起来像这样的worker函数: def worker(work_queue,done_queue): try: for proj in iter(work_queue.get,'STOP'): print proj run_assignments_parallel(proj) done_queue.put('finished ' + proj ) except Exception,e: done_queue.put("%s failed on %s with: %s" % (current_process().name,proj,e.message)) return True 调用worker函数的代码如下: workers = 6 work_queue = Queue() done_queue = Queue() processes = [] for project in project_list: print project work_queue.put(project) for w in xrange(workers): p = Process(target=worker,args=(work_queue,done_queue)) p.start() processes.append(p) work_queue.put('STOP') for p in processes: p.join() done_queue.put('STOP') for status in iter(done_queue.get,'STOP'): print status project_list只是需要在函数“run_assignments_parallel”中运行的12个项目的路径列表. 现在编写这个函数的方法是,对于同一个进程(项目),函数被多次调用,我真的不知道发生了什么.这段代码基于我找到的一个例子,我很确定循环结构搞砸了.任何帮助都会很棒,我对此事无知感到抱歉.谢谢! 解决方法
您需要更改的是传递所有12个输入参数而不是6: from multiprocessing import Pool pool = Pool(processes=6) # run no more than 6 at a time pool.map(run_assignments_parallel,project_list) # pass full list (12 items) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |