Measuring progress with Python's multiprocessing Pool and map function
I use the following code for parallel CSV processing:
    #!/usr/bin/env python
    import csv
    from time import sleep
    from multiprocessing import Pool
    from multiprocessing import cpu_count
    from multiprocessing import current_process
    from pprint import pprint as pp

    def init_worker(x):
        sleep(.5)
        print "(%s,%s)" % (x[0],x[1])
        x.append(int(x[0])**2)
        return x

    def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
        # OPEN FH FOR READING INPUT FILE
        inputFH = open(inputFile, "rt")
        csvReader = csv.reader(inputFH, delimiter=separator)

        # SKIP HEADERS
        for skip in xrange(skipRows):
            csvReader.next()

        # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
        try:
            p = Pool(processes = cpuCount)
            results = p.map(init_worker, csvReader, chunksize = 10)
            p.close()
            p.join()
        except KeyboardInterrupt:
            p.close()
            p.join()
            p.terminate()

        # CLOSE FH FOR READING INPUT
        inputFH.close()

        # OPEN FH FOR WRITING OUTPUT FILE
        outputFH = open(outputFile, "wt")
        csvWriter = csv.writer(outputFH, lineterminator='\n')

        # WRITE HEADER TO OUTPUT FILE
        csvWriter.writerow(header)

        # WRITE RESULTS TO OUTPUT FILE
        [csvWriter.writerow(row) for row in results]

        # CLOSE FH FOR WRITING OUTPUT
        outputFH.close()

        pp(results)
        # print len(results)

    def main():
        inputFile = "input.csv"
        outputFile = "output.csv"
        parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

    if __name__ == '__main__':
        main()

I would like to measure the script's progress somehow (just plain text, nothing fancy like ASCII art). One option I thought of is to compare the number of rows successfully processed by init_worker with the total number of rows in input.csv, and print the current status, e.g. every second. Can you point me to the right solution? I found several articles about similar problems, but I could not adapt them to my needs because none of them used the Pool class and the map method.

I would also like to ask about the p.close(), p.join() and p.terminate() methods. I have seen them used mostly with the Process class, not with Pool. Are they needed with the Pool class, and am I using them correctly? p.terminate() was meant to let me kill the processes with Ctrl+C, but that is another story, and it is not resolved yet. Thanks.

PS: My input.csv looks like this, if it matters:

    0,0
    1,3
    2,6
    3,9
    ...
    ...
    48,144
    49,147

PPS: As I said, I am new to multiprocessing, and the code I put together just works. One drawback I can see is that the whole CSV is stored in memory, so if you have a better idea, don't hesitate to share it.
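The idea of comparing processed rows against the total and printing a status line about once a second can be sketched with the standard library alone. This is a minimal sketch assuming Python 3 and a total that is already known (for example, counted in a first pass over input.csv); `with_status`, `_status` and their parameters are hypothetical names, not part of `multiprocessing`. It wraps whatever iterator yields the results, such as the one returned by `Pool.imap_unordered()`, so rows still flow through one at a time instead of being collected into a list. Because it can only report when a result arrives, "once a second" is approximate.

```python
import time


def _status(done, total):
    """Format a plain-text status line; skip the percentage if total is 0."""
    if total:
        return "%d/%d rows (%d%%)" % (done, total, 100 * done // total)
    return "%d rows" % done


def with_status(iterable, total, interval=1.0, report=print):
    """Yield every item of `iterable` unchanged while reporting progress.

    A status line is passed to `report` for the first item that arrives at
    least `interval` seconds after the previous report (or after iteration
    starts), and once more at the end unless that would repeat the last line.
    """
    done = 0
    last_done = None                # row count at the previous report
    last_time = time.monotonic()
    for item in iterable:
        done += 1
        now = time.monotonic()
        if now - last_time >= interval:
            report(_status(done, total))
            last_done, last_time = done, now
        yield item
    if last_done != done:           # final line, e.g. "50/50 rows (100%)"
        report(_status(done, total))
```

For the 50-row input above, usage would look like `for row in with_status(p.imap_unordered(init_worker, csvReader, chunksize=10), total=50): csvWriter.writerow(row)`. With `chunksize=10` the results come back a chunk at a time, so the reported count advances in bursts rather than smoothly.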
EDIT, in reply to @J.F.Sebastian: here is the current code based on your suggestions:

    #!/usr/bin/env python
    import csv
    from time import sleep
    from multiprocessing import Pool
    from multiprocessing import cpu_count
    from multiprocessing import current_process
    from pprint import pprint as pp
    from tqdm import tqdm

    def do_job(x):
        sleep(.5)
        # print "(%s,%s)" % (x[0],x[1])
        x.append(int(x[0])**2)
        return x

    def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
        # OPEN FH FOR READING INPUT FILE
        inputFH = open(inputFile, "rb")
        csvReader = csv.reader(inputFH, delimiter=separator)

        # SKIP HEADERS
        for skip in xrange(skipRows):
            csvReader.next()

        # OPEN FH FOR WRITING OUTPUT FILE
        outputFH = open(outputFile, "wt")
        csvWriter = csv.writer(outputFH, lineterminator='\n')

        # WRITE HEADER TO OUTPUT FILE
        csvWriter.writerow(header)

        # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
        try:
            p = Pool(processes = cpuCount)
            # results = p.map(do_job, csvReader, chunksize = 10)
            for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)):
                csvWriter.writerow(result)
            p.close()
            p.join()
        except KeyboardInterrupt:
            p.close()
            p.join()

        # CLOSE FH FOR READING INPUT
        inputFH.close()

        # CLOSE FH FOR WRITING OUTPUT
        outputFH.close()

        pp(result)
        # print len(result)

    def main():
        inputFile = "input.csv"
        outputFile = "output.csv"
        parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())

    if __name__ == '__main__':
        main()

This is the output from tqdm:

    1 [elapsed: 00:05, 0.20 iters/sec]

What does this output mean? On the page you referred to, tqdm is used in a loop:

    >>> import time
    >>> from tqdm import tqdm
    >>> for i in tqdm(range(100)):
    ...     time.sleep(1)
    ...
    |###-------| 35/100  35% [elapsed: 00:35 left: 01:05, 1.00 iters/sec]

That output makes sense, but what does mine mean? Also, it does not seem to fix the Ctrl+C problem: after I hit Ctrl+C the script prints a traceback, and if I hit Ctrl+C again I get a new traceback, and so on. The only way to kill it is to send it to the background (Ctrl+Z) and then kill it (kill %1).

Solution
To display progress, replace pool.map with pool.imap_unordered:
    from tqdm import tqdm  # $ pip install tqdm

    for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10)):
        csvWriter.writerow(result)
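Used like this, tqdm wraps an iterator that has no `len()`, so it cannot draw a bar or compute a percentage; it shows only the number of items seen so far, the elapsed time and the rate. Read that way, "1 [elapsed: 00:05, 0.20 iters/sec]" is one item after five seconds. tqdm accepts an explicit `total=` argument for this case. A minimal sketch of obtaining that total, assuming Python 3: `count_rows` is a hypothetical helper that counts records with `csv.reader` in a separate pass over the file, so memory use stays flat at the cost of reading the input twice.

```python
import csv


def count_rows(path, delimiter=",", skip_rows=0):
    """Count the data records in a CSV file, excluding `skip_rows` header rows.

    csv.reader is used rather than counting lines, so a quoted field that
    contains a newline still counts as a single record.
    """
    with open(path, newline="") as fh:
        total = sum(1 for _ in csv.reader(fh, delimiter=delimiter))
    return max(total - skip_rows, 0)
```

With the names from the question's edit, the loop would become `for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10), total=count_rows(inputFile, separator, skipRows)):`.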
Switching to imap_unordered also, incidentally, fixes the "whole CSV is stored in memory" and "KeyboardInterrupt is not raised" issues.

Here is a complete code example:

    #!/usr/bin/env python
    import itertools
    import logging
    import multiprocessing
    import time

    def compute(i):
        time.sleep(.5)
        return i**2

    if __name__ == "__main__":
        logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s",
                            datefmt="%F %T", level=logging.DEBUG)
        pool = multiprocessing.Pool()
        try:
            for square in pool.imap_unordered(compute, itertools.count(), chunksize=10):
                logging.debug(square)  # report progress by printing the result
        except KeyboardInterrupt:
            logging.warning("got Ctrl+C")
        finally:
            pool.terminate()
            pool.join()

You should see the output in batches every .5 * chunksize seconds. If you press Ctrl+C, you should see KeyboardInterrupt raised in the child processes and in the main process. In Python 3, the main process exits immediately. In Python 2, the KeyboardInterrupt is delayed until the next batch would have been printed (a bug in Python).
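The question also asked whether close(), join() and terminate() belong with Pool. Per the multiprocessing documentation, close() stops new submissions and lets the workers exit once the queued tasks are done, join() waits for the workers to exit and may only be called after close() or terminate(), and terminate() stops the workers immediately. So in the question's `except KeyboardInterrupt` block, calling close() and join() before terminate() would wait for the remaining queued tasks. The following is a minimal Python 3 sketch of the question's function rewritten along the lines of the answer, not the answerer's code: results are written as they arrive, so no list of all results is built (pool.map() first turns its input into a list and returns a list of every result), and the pool is shut down with close()/join() normally and terminate()/join() on an error or Ctrl+C. `square_first_column`, `pool_factory` and `processes` are hypothetical names; `pool_factory` defaults to `multiprocessing.Pool` and can be swapped for `multiprocessing.pool.ThreadPool`, which has the same API, for a quick test.

```python
import csv
import multiprocessing


def square_first_column(row):
    """Hypothetical worker mirroring init_worker: append int(row[0]) ** 2."""
    return row + [int(row[0]) ** 2]


def parallel_csv_processing(input_file, output_file, func, header,
                            separator=",", skip_rows=0, processes=None,
                            pool_factory=multiprocessing.Pool, chunksize=10):
    """Run func over every CSV row in a pool, writing each result on arrival.

    Returns the number of result rows written after the header.
    """
    written = 0
    with open(input_file, newline="") as in_fh, \
            open(output_file, "w", newline="") as out_fh:
        reader = csv.reader(in_fh, delimiter=separator)
        writer = csv.writer(out_fh, lineterminator="\n")
        for _ in range(skip_rows):
            next(reader, None)          # skip header rows of the input
        writer.writerow(header)
        pool = pool_factory(processes)
        try:
            # imap_unordered yields results in completion order; use
            # pool.imap instead if output rows must keep the input order.
            for result in pool.imap_unordered(func, reader, chunksize):
                writer.writerow(result)
                written += 1
        except BaseException:           # error or Ctrl+C: stop workers now
            pool.terminate()
            raise
        else:
            pool.close()                # no new tasks; workers exit when done
        finally:
            pool.join()                 # valid after close() or terminate()
    return written
```

With real processes, call it from inside an `if __name__ == "__main__":` guard, e.g. `parallel_csv_processing("input.csv", "output.csv", square_first_column, header=["x", "y", "x_squared"])`. In Python 3, `with Pool() as pool:` is also available, but leaving that block calls terminate(), not close() and join().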