Multithreaded Python FS Crawler
Published: 2020-12-15 02:23:28 Category: Java Source: compiled from the web
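I've written a Python function that searches a filesystem using a provided directory pattern, with optional "actions" to take at each level. I then tried to multithread it, because some of the volumes are on network shares and I want to minimize IO blocking. I started with the multiprocessing Pool class since that was the most convenient... (seriously, no Pool class for threading?) My function unravels the provided FS pattern as far as it can and submits the newly returned paths to the pool until no new paths come back. This worked great when I used the function and class directly, but now I'm trying to use the function from another class and my program appears to hang. To simplify, I rewrote the function using Threads instead of Processes, and even wrote a simple ThreadPool class... same problem. Here is a very simplified version of the code that still exhibits the problem: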
file test1.py:
------------------------------------------------
    import os
    import glob
    from multiprocessing import Pool

    def mapGlob(pool, paths, pattern):
        # Expand the pattern under every path in parallel, then flatten the results.
        results = []
        paths = [os.path.join(p, pattern) for p in paths]
        for result in pool.map(glob.glob, paths):
            results += result
        return results

    def findAllMyPaths():
        pool = Pool(10)
        paths = ['/Volumes']
        follow = ['**', 'ptid_*', 'expid_*', 'slkid_*']
        # Each pattern is applied to the paths found at the previous level.
        for pattern in follow:
            paths = mapGlob(pool, paths, pattern)
        return paths

file test2.py:
------------------------------------------------
    from test1 import findAllMyPaths

    # Runs at import time, while test2 is still being imported.
    allmypaths = findAllMyPaths()

Now if I call

    >>> from test1 import findAllMyPaths
    >>> findAllMyPaths()
    >>> ...long list of all the paths

this works fine, but if I try:

    >>> from test2 import allmypaths

python hangs forever. The action functions (in this example, glob) get called, but they never seem to return...

If I change the map function to a non-parallel version:

    def mapGlob(pool, paths, pattern):
        results = []
        paths = [os.path.join(p, pattern) for p in paths]
        for path in paths:
            results += glob.glob(path)
        return results

everything runs fine.

EDIT:

I turned on multiprocessing debugging to see if that would help. When it works, I get:

    [DEBUG/MainProcess] created semlock with handle 5
    [DEBUG/MainProcess] created semlock with handle 6
    [DEBUG/MainProcess] created semlock with handle 9
    [DEBUG/MainProcess] created semlock with handle 10
    [INFO/PoolWorker-1] child process calling self.run()
    [INFO/PoolWorker-2] child process calling self.run()
    [INFO/PoolWorker-3] child process calling self.run()
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-4] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [INFO/PoolWorker-7] child process calling self.run()
    [INFO/PoolWorker-9] child process calling self.run()
    [INFO/PoolWorker-8] child process calling self.run()
    [INFO/PoolWorker-10] child process calling self.run()
    [DEBUG/MainProcess] closing pool
    [SUBDEBUG/MainProcess] finalizer calling <bound method type._terminate_pool of <class 'multiprocessing.pool.Pool'>> with args (<Queue.Queue instance at 0x34af918>,<multiprocessing.queues.SimpleQueue object at 0x3494950>,<multiprocessing.queues.SimpleQueue object at 0x34a61b0>,[<Process(PoolWorker-1,started daemon)>,<Process(PoolWorker-2,<Process(PoolWorker-3,<Process(PoolWorker-4,<Process(PoolWorker-5,<Process(PoolWorker-6,<Process(PoolWorker-7,<Process(PoolWorker-8,<Process(PoolWorker-9,<Process(PoolWorker-10,started daemon)>],<Thread(Thread-1,started daemon -1341648896)>,<Thread(Thread-2,started daemon -1341116416)>,{}) and kwargs {}
    [DEBUG/MainProcess] finalizing pool
    [DEBUG/MainProcess] helping task handler/workers to finish
    [DEBUG/MainProcess] removing tasks from inqueue until task handler finished
    [DEBUG/MainProcess] task handler got sentinel
    [DEBUG/MainProcess] task handler sending sentinel to result handler
    [DEBUG/MainProcess] task handler sending sentinel to workers
    [DEBUG/MainProcess] task handler exiting
    [DEBUG/MainProcess] result handler got sentinel
    [DEBUG/MainProcess] ensuring that outqueue is not full
    [DEBUG/MainProcess] result handler exiting: len(cache)=0,thread._state=0
    [DEBUG/PoolWorker-2] worker got sentinel -- exiting
    [DEBUG/PoolWorker-1] worker got sentinel -- exiting
    [INFO/PoolWorker-2] process shutting down
    [DEBUG/PoolWorker-7] worker got sentinel -- exiting
    [INFO/PoolWorker-1] process shutting down
    [INFO/PoolWorker-7] process shutting down
    [DEBUG/PoolWorker-7] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-1] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-7] running the remaining "atexit" finalizers
    [INFO/PoolWorker-7] process exiting with exitcode 0
    [DEBUG/PoolWorker-1] running the remaining "atexit" finalizers
    [INFO/PoolWorker-1] process exiting with exitcode 0
    [DEBUG/PoolWorker-5] worker got sentinel -- exiting
    [DEBUG/PoolWorker-2] running all "atexit" finalizers with priority >= 0
    [INFO/PoolWorker-5] process shutting down
    [DEBUG/PoolWorker-5] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-2] running the remaining "atexit" finalizers
    [DEBUG/PoolWorker-5] running the remaining "atexit" finalizers
    [INFO/PoolWorker-2] process exiting with exitcode 0
    [INFO/PoolWorker-5] process exiting with exitcode 0
    [DEBUG/PoolWorker-6] worker got sentinel -- exiting
    [INFO/PoolWorker-6] process shutting down
    [DEBUG/PoolWorker-6] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-6] running the remaining "atexit" finalizers
    [INFO/PoolWorker-6] process exiting with exitcode 0
    [DEBUG/PoolWorker-4] worker got sentinel -- exiting
    [DEBUG/PoolWorker-9] worker got sentinel -- exiting
    [INFO/PoolWorker-9] process shutting down
    [DEBUG/PoolWorker-9] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-9] running the remaining "atexit" finalizers
    [INFO/PoolWorker-9] process exiting with exitcode 0
    [INFO/PoolWorker-4] process shutting down
    [DEBUG/PoolWorker-4] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-4] running the remaining "atexit" finalizers
    [INFO/PoolWorker-4] process exiting with exitcode 0
    [DEBUG/PoolWorker-10] worker got sentinel -- exiting
    [INFO/PoolWorker-10] process shutting down
    [DEBUG/PoolWorker-10] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-10] running the remaining "atexit" finalizers
    [INFO/PoolWorker-10] process exiting with exitcode 0
    [DEBUG/PoolWorker-8] worker got sentinel -- exiting
    [INFO/PoolWorker-8] process shutting down
    [DEBUG/PoolWorker-8] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-8] running the remaining "atexit" finalizers
    [INFO/PoolWorker-8] process exiting with exitcode 0
    [DEBUG/PoolWorker-3] worker got sentinel -- exiting
    [INFO/PoolWorker-3] process shutting down
    [DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
    [DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
    [INFO/PoolWorker-3] process exiting with exitcode 0
    [DEBUG/MainProcess] terminating workers
    [DEBUG/MainProcess] joining task handler
    [DEBUG/MainProcess] joining result handler
    [DEBUG/MainProcess] joining pool workers

When it doesn't, all I get is:

    [DEBUG/MainProcess] created semlock with handle 6
    [DEBUG/MainProcess] created semlock with handle 7
    [DEBUG/MainProcess] created semlock with handle 10
    [DEBUG/MainProcess] created semlock with handle 11
    [INFO/PoolWorker-1] child process calling self.run()
    [INFO/PoolWorker-2] child process calling self.run()
    [INFO/PoolWorker-3] child process calling self.run()
    [INFO/PoolWorker-8] child process calling self.run()
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-4] child process calling self.run()
    [INFO/PoolWorker-9] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [INFO/PoolWorker-7] child process calling self.run()
    [INFO/PoolWorker-10] child process calling self.run()

Solution
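This is not a complete solution, but I found a way to make the code work either way: from the interpreter or as code in a running script. I think the problem has to do with the following note in the multiprocessing docs: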
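    Functionality within this package requires that the __main__ module be importable by the children. This is covered in Programming guidelines however it is worth pointing out here. This means that some examples, such as the multiprocessing.Pool examples will not work in the interactive interpreter.

I'm not sure why this limitation exists, or why I can sometimes use a pool from the interactive interpreter and sometimes not, but oh well...

To get around it, I do the following in any module that might use multiprocessing:

    import __main__

    # An interactive session has no __main__.__file__; a script being run does.
    __SHOULD_MULTITHREAD__ = False
    if hasattr(__main__, '__file__'):
        __SHOULD_MULTITHREAD__ = True

The rest of the code in that module can then check this flag to decide whether to use a pool or to just execute without parallelization. This way I can still use and test the parallelized functions in these modules from the interactive interpreter; they just run much slower.

(Editor: 李大同)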
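As a rough illustration of how the rest of a module might consult such a flag, here is a minimal sketch. It is not the answerer's actual code: the names SHOULD_PARALLELIZE, map_glob and find_paths are made up for this example, and it swaps in multiprocessing.pool.ThreadPool (a thread-backed pool in the standard library with the same map interface as Pool) in place of the process pool from the question. Whether that substitution avoids the hang in the original import scenario has not been verified here.

```python
import glob
import os
import tempfile
from multiprocessing.pool import ThreadPool  # thread-backed, same map() API as Pool

import __main__

# Same check as in the answer: an interactive session has no __main__.__file__.
# (The flag name is hypothetical.)
SHOULD_PARALLELIZE = hasattr(__main__, "__file__")


def map_glob(paths, pattern, pool=None):
    """Expand `pattern` under each path; use the pool only if one is given."""
    targets = [os.path.join(p, pattern) for p in paths]
    if pool is not None:
        chunks = pool.map(glob.glob, targets)
    else:
        chunks = [glob.glob(t) for t in targets]
    # Flatten the per-directory result lists into one list of paths.
    return [path for chunk in chunks for path in chunk]


def find_paths(roots, patterns, workers=4):
    """Apply the patterns level by level, like findAllMyPaths in the question."""
    pool = ThreadPool(workers) if SHOULD_PARALLELIZE else None
    try:
        paths = list(roots)
        for pattern in patterns:
            paths = map_glob(paths, pattern, pool)
        return paths
    finally:
        # Shut the pool down whether or not the search succeeded.
        if pool is not None:
            pool.close()
            pool.join()
```

Callers get the same list of paths from find_paths(['/Volumes'], ['**', 'ptid_*']) whether or not the flag is set; only the execution strategy differs, which is the point of the flag.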