加入收藏 | 设为首页 | 会员中心 | 我要投稿 李大同 (https://www.lidatong.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 编程开发 > Python > 正文

Python实现大文件排序的方法

发布时间:2020-12-16 22:09:59 所属栏目:Python 来源:网络整理
导读:本篇章节讲解Python实现大文件排序的方法。供大家参考研究。具体实现方法如下: import gzipimport osfrom multiprocessing import Process,Queue,Pipe,current_process,freeze_supportfrom datetime import datetimedef sort_worker(input,output):

本篇章节讲解Python实现大文件排序的方法。分享给大家供大家参考。具体实现方法如下:

import gzip
import os
from multiprocessing import Process,Queue,Pipe,current_process,freeze_support
from datetime import datetime
def sort_worker(input,output):
 while True:
  lines = input.get().splitlines()
  element_set = {}
  for line in lines:
    if line.strip() == 'STOP':
      return
    try:
      element = line.split(' ')[0]
      if not element_set.get(element): element_set[element] = ''
    except:
      pass
  sorted_element = sorted(element_set)
  #print sorted_element
  output.put('n'.join(sorted_element))
def write_worker(input,pre):
  os.system('mkdir %s'%pre)
  i = 0
  while True:
    content = input.get()
    if content.strip() == 'STOP':
      return
    write_sorted_bulk(content,'%s/%s'%(pre,i))
    i += 1
def write_sorted_bulk(content,filename):
  f = file(filename,'w')
  f.write(content)
  f.close()
def split_sort_file(filename,num_sort = 3,buf_size = 65536*64*4):
  t = datetime.now()
  pre,ext = os.path.splitext(filename)
  if ext == '.gz':
    file_file = gzip.open(filename,'rb')
  else:
    file_file = open(filename)
  bulk_queue = Queue(10)
  sorted_queue = Queue(10)
  NUM_SORT = num_sort
  sort_worker_pool = []
  for i in range(NUM_SORT):
    sort_worker_pool.append( Process(target=sort_worker,args=(bulk_queue,sorted_queue)) )
    sort_worker_pool[i].start()
  NUM_WRITE = 1
  write_worker_pool = []
  for i in range(NUM_WRITE):
    write_worker_pool.append( Process(target=write_worker,args=(sorted_queue,pre)) )
    write_worker_pool[i].start()
  buf = file_file.read(buf_size)
  sorted_count = 0
  while len(buf):
    end_line = buf.rfind('n')
    #print buf[:end_line+1]
    bulk_queue.put(buf[:end_line+1])
    sorted_count += 1
    if end_line != -1:
      buf = buf[end_line+1:] + file_file.read(buf_size)
    else:
      buf = file_file.read(buf_size)
  for i in range(NUM_SORT):
    bulk_queue.put('STOP')
  for i in range(NUM_SORT):
    sort_worker_pool[i].join()
   
  for i in range(NUM_WRITE):
    sorted_queue.put('STOP')
  for i in range(NUM_WRITE):
    write_worker_pool[i].join()
  print 'elasped ',datetime.now() - t
  return sorted_count
from heapq import heappush,heappop
from datetime import datetime
from multiprocessing import Process,freeze_support
import os
class file_heap:
  def __init__(self,dir,idx = 0,count = 1):
    files = os.listdir(dir)
    self.heap = []
    self.files = {}
    self.bulks = {}
    self.pre_element = None
    for i in range(len(files)):
      file = files[i]
      if hash(file) % count != idx: continue
      input = open(os.path.join(dir,file))
      self.files[i] = input
      self.bulks[i] = ''
      heappush(self.heap,(self.get_next_element_buffered(i),i))
  def get_next_element_buffered(self,i):
    if len(self.bulks[i]) < 256:
      if self.files[i] is not None:
        buf = self.files[i].read(65536)
        if buf:
          self.bulks[i] += buf
        else:
          self.files[i].close()
          self.files[i] = None
    end_line = self.bulks[i].find('n')
    if end_line == -1:
      end_line = len(self.bulks[i])
    element = self.bulks[i][:end_line]
    self.bulks[i] = self.bulks[i][end_line+1:]
    return element
  def poppush_uniq(self):
    while True:
      element = self.poppush()
      if element is None:
        return None
      if element != self.pre_element:
        self.pre_element = element
        return element
  def poppush(self):
    try:
      element,index = heappop(self.heap)
    except IndexError:
      return None
    new_element = self.get_next_element_buffered(index)
    if new_element:
      heappush(self.heap,(new_element,index))
    return element
def heappoppush(dir,queue,count = 1):
  heap = file_heap(dir,idx,count)
  while True:
    d = heap.poppush_uniq()
    queue.put(d)
    if d is None: return
def heappoppush2(dir,count = 1):
  heap = []
  procs = []
  queues = []
  pre_element = None
  for i in range(count):
    q = Queue(1024)
    q_buf = queue_buffer(q)
    queues.append(q_buf)
    p = Process(target=heappoppush,args=(dir,q_buf,i,count))
    procs.append(p)
    p.start()
  queues = tuple(queues)
  for i in range(count):
    heappush(heap,(queues[i].get(),i))
  while True:
    try:
      d,i= heappop(heap)
    except IndexError:
      queue.put(None)
      for p in procs:
        p.join()
      return
    else:
      if d is not None:
        heappush(heap,i))
        if d != pre_element:
          pre_element = d
          queue.put(d)
def merge_file(dir):
  heap = file_heap( dir )
  os.system('rm -f '+dir+'.merge')
  fmerge = open(dir+'.merge','a')
  element = heap.poppush_uniq()
  fmerge.write(element+'n')
  while element is not None:
    element = heap.poppush_uniq()
    fmerge.write(element+'n')
class queue_buffer:
  def __init__(self,queue):
    self.q = queue
    self.rbuf = []
    self.wbuf = []
  def get(self):
    if len(self.rbuf) == 0:
      self.rbuf = self.q.get()
    r = self.rbuf[0]
    del self.rbuf[0]
    return r
  def put(self,d):
    self.wbuf.append(d)
    if d is None or len(self.wbuf) > 1024:
      self.q.put(self.wbuf)
      self.wbuf = []
def diff_file(file_old,file_new,file_diff,buf = 268435456):
  print 'buffer size',buf
  from file_split import split_sort_file
  os.system('rm -rf '+ os.path.splitext(file_old)[0] )
  os.system('rm -rf '+ os.path.splitext(file_new)[0] )
  t = datetime.now()
  split_sort_file(file_old,5,buf)
  split_sort_file(file_new,buf)
  print 'split elasped ',datetime.now() - t
  os.system('cat %s/* | wc -l'%os.path.splitext(file_old)[0])
  os.system('cat %s/* | wc -l'%os.path.splitext(file_new)[0])
  os.system('rm -f '+file_diff)
  t = datetime.now()
  zdiff = open(file_diff,'a')
  old_q = Queue(1024)
  new_q = Queue(1024)
  old_queue = queue_buffer(old_q)
  new_queue = queue_buffer(new_q)
  h1 = Process(target=heappoppush2,args=(os.path.splitext(file_old)[0],old_queue,3))
  h2 = Process(target=heappoppush2,args=(os.path.splitext(file_new)[0],new_queue,3))
  h1.start(),h2.start()
  old = old_queue.get()
  new = new_queue.get()
  old_count,new_count = 0,0
  while old is not None or new is not None:
    if old > new or old is None:
      zdiff.write('< '+new+'n')
      new = new_queue.get()
      new_count +=1
    elif old < new or new is None:
      zdiff.write('> '+old+'n')
      old = old_queue.get()
      old_count +=1
    else:
      old = old_queue.get()
      new = new_queue.get()
  print 'new_count:',new_count
  print 'old_count:',old_count
  print 'diff elasped ',datetime.now() - t
  h1.join(),h2.join()

希望本文所述对大家的Python程序设计有所帮助。

(编辑:李大同)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章
      热点阅读