加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

使用python的多处理池和地图功能进行测量

发布时间:2020-12-20 12:08:55 所属栏目:Python 来源:网络整理
导读:以下代码我用于并行csv处理: #!/usr/bin/env pythonimport csvfrom time import sleepfrom multiprocessing import Poolfrom multiprocessing import cpu_countfrom multiprocessing import current_processfrom pprint import pprint as ppdef init_worker
以下代码我用于并行csv处理:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp

def init_worker(x):
  sleep(.5)
  print "(%s,%s)" % (x[0],x[1])
  x.append(int(x[0])**2)
  return x

def parallel_csv_processing(inputFile,outputFile,header=["Default","header","please","change"],separator=",",skipRows = 0,cpuCount = 1):
  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,"rt")
  csvReader = csv.reader(inputFH,delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    results = p.map(init_worker,csvReader,chunksize = 10)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()
    p.terminate()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile,"wt")
  csvWriter = csv.writer(outputFH,lineterminator='n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # WRITE RESULTS TO OUTPUT FILE
  [csvWriter.writerow(row) for row in results]

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(results)
  # print len(results)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile,cpuCount = cpu_count())

if __name__ == '__main__':
  main()

我想以某种方式衡量脚本的进度(只是纯文本而不是任何奇特的ASCII艺术).我想到的一个选项是将init_worker成功处理的行与input.csv中的所有行进行比较,并打印实际状态,例如:每一秒,你能指点我正确的解决方案吗?我发现有几篇文章有类似的问题,但我无法根据我的需要调整它,因为它们都没有使用Pool类和map方法.我还想问一下p.close(),p.join(),p.terminate()方法,我已经看到它们主要是使用Process not Pool类,它们是否需要Pool类并且我正确使用它们?使用p.terminate()意图用ctrl c来杀死进程,但这是different故事还没有结束.谢谢.

PS:我的input.csv看起来像这样,如果重要的话:

0,0
1,3
2,6
3,9
...
...
48,144
49,147

PPS:正如我所说,我是多处理的新手,而且我把它放在一起的代码才有效.我可以看到的一个缺点是整个csv存储在内存中,所以如果你们有更好的想法,请不要犹豫,分享它.

编辑

回复@ J.F.Sebastian

以下是基于您的建议的实际代码:

#!/usr/bin/env python

import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp
from tqdm import tqdm

def do_job(x):
  sleep(.5)
  # print "(%s,cpuCount = 1):

  # OPEN FH FOR READING INPUT FILE
  inputFH   = open(inputFile,"rb")
  csvReader = csv.reader(inputFH,delimiter=separator)

  # SKIP HEADERS
  for skip in xrange(skipRows):
    csvReader.next()

  # OPEN FH FOR WRITING OUTPUT FILE
  outputFH  = open(outputFile,lineterminator='n')

  # WRITE HEADER TO OUTPUT FILE
  csvWriter.writerow(header)

  # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
  try:
    p = Pool(processes = cpuCount)
    # results = p.map(do_job,chunksize = 10)
    for result in tqdm(p.imap_unordered(do_job,chunksize=10)):
      csvWriter.writerow(result)
    p.close()
    p.join()
  except KeyboardInterrupt:
    p.close()
    p.join()

  # CLOSE FH FOR READING INPUT
  inputFH.close()

  # CLOSE FH FOR WRITING OUTPUT
  outputFH.close()

  print pp(result)
  # print len(result)

def main():
  inputFile  = "input.csv"
  outputFile = "output.csv"
  parallel_csv_processing(inputFile,cpuCount = cpu_count())

if __name__ == '__main__':
  main()

这是tqdm的输出:

1 [elapsed: 00:05,0.20 iters/sec]

这个输出是什么意思?在您引用的页面上,tqdm以循环方式使用:

>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(100)):
...     time.sleep(1)
... 
|###-------| 35/100  35% [elapsed: 00:35 left: 01:05,1.00 iters/sec]

这个输出很有意义,但我的输出是什么意思?此外它似乎没有修复ctrl c问题:点击ctrl c脚本后抛出一些Traceback,如果我再次点击ctrl c然后我得到新的Traceback等等.杀死它的唯一方法是将其发送到后台(ctr z)然后杀死它(杀死%1)

解决方法

要显示进度,请将pool.map替换为pool.imap_unordered:

from tqdm import tqdm # $pip install tqdm

for result in tqdm(pool.imap_unordered(init_worker,chunksize=10)):
    csvWriter.writerow(result)

tqdm部分是可选的,见Text Progress Bar in the Console

无意中,它修复了“整个csv存储在内存中”和“KeyboardInterrupt未引发”问题.

这是一个完整的代码示例:

#!/usr/bin/env python
import itertools
import logging
import multiprocessing
import time

def compute(i):
    time.sleep(.5)
    return i**2

if __name__ == "__main__":
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s",datefmt="%F %T",level=logging.DEBUG)
    pool = multiprocessing.Pool()
    try:
        for square in pool.imap_unordered(compute,itertools.count(),chunksize=10):
            logging.debug(square) # report progress by printing the result
    except KeyboardInterrupt:
        logging.warning("got Ctrl+C")
    finally:
        pool.terminate()
        pool.join()

您应该每隔.5 * chunksize秒批量查看输出.如果按Ctrl C;你应该看到在子进程和主进程中引发的KeyboardInterrupt.在Python 3中,主进程立即退出.在Python 2中,KeyboardInterrupt被延迟,直到应该打印下一个批处理(Python中的错误).

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读