加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

Python:使用多个核心的流程文件

发布时间:2020-12-16 22:46:34 所属栏目:Python 来源:网络整理
导读:我目前正在尝试读取一个大文件(8000万行),我需要为每个条目进行计算密集型矩阵乘法.计算完之后,我想将结果插入数据库.由于此过程采用时间密集的方式,我希望将文件拆分为多个核心以加快进程. 在研究之后,我发现了这个有希望的尝试,它将文件分成n个部分. def f

我目前正在尝试读取一个大文件(8000万行),我需要为每个条目进行计算密集型矩阵乘法.计算完之后,我想将结果插入数据库.由于此过程采用时间密集的方式,我希望将文件拆分为多个核心以加快进程.

在研究之后,我发现了这个有希望的尝试,它将文件分成n个部分.

def file_block(fp,number_of_blocks,block):
    '''
    A generator that splits a file into blocks and iterates
    over the lines of one of the blocks.

    '''

    assert 0 <= block and block < number_of_blocks
    assert 0 < number_of_blocks

    fp.seek(0,2)
    file_size = fp.tell()

    ini = file_size * block / number_of_blocks
    end = file_size * (1 + block) / number_of_blocks

    if ini <= 0:
        fp.seek(0)
    else:
        fp.seek(ini-1)
        fp.readline()

    while fp.tell() < end:
        yield fp.readline()

迭代地,您可以像这样调用函数:

if __name__ == '__main__':
    fp = open(filename)
    number_of_chunks = 4
    for chunk_number in range(number_of_chunks):
        print chunk_number,100 * '='
        for line in file_block(fp,number_of_chunks,chunk_number):
            process(line)

虽然这有效,但我遇到了问题,使用多处理并行化:

fp = open(filename)
number_of_chunks = 4
li = [file_block(fp,chunk_number) for chunk_number in range(number_of_chunks)]

p = Pool(cpu_count() - 1)
p.map(processChunk,li)

由于错误,生成器无法被腌制.

虽然我理解这个错误,但是首先迭代整个文件以将所有行放入列表中是太昂贵了.

此外,我希望每次迭代使用每个核心的行块,因为一次将多行插入数据库更有效(如果使用典型的映射方法,则不是1乘1)

谢谢你的帮助.

最佳答案
不是预先创建生成器并将它们传递到每个线程,而是将其留给线程代码.

def processChunk(params):
    filename,chunk_number,number_of_chunks = params
    with open(filename,'r') as fp:
        for line in file_block(fp,chunk_number):
            process(line)

li = [(filename,i,number_of_chunks) for i in range(number_of_chunks)]
p.map(processChunk,li)

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读