python多处理池如果值中止
我正在开发一个脚本,我随机创建对象,但我不想重复.它们被存储起来,每次我创建一个新的,我都会检查现有的.正如我想为大量对象做的那样,我现在正在尝试并行化它,但到目前为止还没有成功.我尝试了在网上找到的一些解决方案(主要在这里),但仍然无法正常工作.
我的想法是启动一个池并将我的功能映射到它.当进程找到匹配时,它将值设置为1.此值可由所有进程读取,它们可以使用锁写入它,我需要在最后返回.因此,我创建了一个Lock和一个Value,以便所有进程都可以读取该值(因此lock = False)并检查是否在另一个进程中找到了匹配项.然后我尝试了一些与事件不同的东西,并检查它是否已设置但是仍然无法工作……然后我尝试提出一个特殊的异常,但仍未成功使代码成功. 拜托,我更喜欢编写OOP,所以我会避免直到我的最后一个资源来定义一个全局变量,因为我认为它们是不确定的(个人意见). 这是一个MWE,我用int替换我的复杂对象,用范围(10000)替换我存储的对象以帮助你理解. #!/usr/bin/env python3 import multiprocessing as muproc def ParallelCheck(me): print(" Testing {}...".format(me)) #manager = muproc.Manager() #lock = manager.Lock() lock = muproc.Lock() back = muproc.Value("i",lock=False) ParChild = ParallelChild(me,lock,back) with muproc.Pool() as pool: try: pool.map(ParChild.run,range(10000)) except AbortPool: pool.terminate() print("pool") return back.value def Computation(me,neighbour): return me == neighbour class ParallelChild(object): def __init__(self,me,back): self.abort = muproc.Event() self.lock = lock self.me = me self.back = back def run(self,neighbour): print("run") if self.abort.is_set(): print("Aborting") pass else: if Computation(self.me,neighbour): self.lock.acquire() self.abort.set() self.back.value = 1 print("GOTCHA") self.lock.release() raise AbortPool else: print("...") class AbortPool(Exception): #pass def __init__(self): ## Just to check print("AbortPool raised!") if __name__ == "__main__": values = [12000,13,7] for v in values: print("value={} match={}".format(v,ParallelCheck(v))) 现在它产生一个RunTimeError: me@stigepc4$python3 mwe.py Testing 12000... Traceback (most recent call last): File "mwe.py",line 63,in <module> print("value={} match={}".format(v,ParallelCheck(v))) File "mwe.py",line 16,in ParallelCheck pool.map(ParChild.run,range(10000)) File "/usr/lib/python3.4/multiprocessing/pool.py",line 260,in map return self._map_async(func,iterable,mapstar,chunksize).get() File "/usr/lib/python3.4/multiprocessing/pool.py",line 599,in get raise self._value File "/usr/lib/python3.4/multiprocessing/pool.py",line 383,in _handle_tasks put(task) File "/usr/lib/python3.4/multiprocessing/connection.py",line 206,in send self._send_bytes(ForkingPickler.dumps(obj)) File "/usr/lib/python3.4/multiprocessing/reduction.py",line 50,in dumps cls(buf,protocol).dump(obj) File "/usr/lib/python3.4/multiprocessing/sharedctypes.py",line 128,in reduce_ctype assert_spawning(obj) File "/usr/lib/python3.4/multiprocessing/context.py",line 347,in assert_spawning ' through inheritance' % type(obj).__name__ RuntimeError: c_int objects should only be shared between processes through inheritance 我想它与Lock有关(虽然评论经理但是这不起作用)或者与Value有关但现在想法如何摆脱它… 编辑 当我继续尝试改变我的代码以我想要的方式工作时,我意识到我没有提到我的主要问题是什么.我真正的困难是如果找到匹配项,则停止池中的所有进程.这就是我所需要的,因此并行运行优于串行运行.现在我可以让一个事件告诉孩子是否已经找到匹配,但它会不断循环数据,即使我引发异常…… 编辑2 简单地说,我有以下…… for o in objects: if too_close(o,existing_objects): return 1 return 0 …我希望在CPU之间分配…… for o in objects: if too_close(o,some_existing_objects): return 1 and abort other processes return 0 解决方法
通过寻找答案,我的脚本太复杂了.
我试着从接近原始文档的东西开始 多处理模块. 然后没有成功我寻找一种方法来修复它并添加了一些东西. 我不是python多处理的专家,但经过一段时间的尝试, 但是我做这些事情的方式可能效率不高. 我试图产生较少的进程,但每个都有较少的数据和 我只想知道是否有匹配,因此我可以进一步简化. 现在回到主要过程,最后我只看一下这个事件 缺点是我必须声明multiprocessing.Event全局… 但正如bj0已经提到的那样,并行化这个问题可能不是更好…… 在实现这两种方法之后,我将它们与串行问题进行了比较,这是我的结果 >序列号:7s 所以没有什么比这更好了…我会坚持我的串行实现,并寻找其他方法来加速事情,像其他方法,而不是完全随机… 这是我的MWE的最后一个工作版本: #!/usr/bin/env python3 import multiprocessing as muproc def ParallelCheck(me): print(" Testing {}...".format(me)) global abort abort.clear() ParChild = ParallelChild(me) jobs = [] N = 4 for i in range(N): jobs.append(muproc.Process(target = ParChild.run,args=(range(i * 2500,(i+1) * 2500),))) for p in jobs: p.start() for p in jobs: p.join() if abort.is_set(): print("MATCH FOUND") return 1 else: print(" no match...") return 0 def Computation(me,neighbour): return me == neighbour class ParallelChild(object): def __init__(self,me): self.me = me def run(self,neighbours): global abort for neighbour in neighbours: print("{} vs {} by {}".format(self.me,neighbour,self.CurProc())) if abort.is_set(): print("Aborting {}".format(self.CurProc())) return 0 else: if Computation(self.me,neighbour): abort.set() print("GOTCHA {}".format(self.CurProc())) return 1 def CurProc(self): return muproc.current_process()._identity[0] if __name__ == "__main__": abort = muproc.Event() values = [12000,130,ParallelCheck(v))) (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |