Python实现大文件排序的方法
发布时间:2020-12-16 22:09:59 所属栏目:Python 来源:网络整理
导读:本篇章节讲解Python实现大文件排序的方法。供大家参考研究。具体实现方法如下: import gzipimport osfrom multiprocessing import Process,Queue,Pipe,current_process,freeze_supportfrom datetime import datetimedef sort_worker(input,output):
本篇章节讲解Python实现大文件排序的方法。分享给大家供大家参考。具体实现方法如下: import gzip import os from multiprocessing import Process,Queue,Pipe,current_process,freeze_support from datetime import datetime def sort_worker(input,output): while True: lines = input.get().splitlines() element_set = {} for line in lines: if line.strip() == 'STOP': return try: element = line.split(' ')[0] if not element_set.get(element): element_set[element] = '' except: pass sorted_element = sorted(element_set) #print sorted_element output.put('n'.join(sorted_element)) def write_worker(input,pre): os.system('mkdir %s'%pre) i = 0 while True: content = input.get() if content.strip() == 'STOP': return write_sorted_bulk(content,'%s/%s'%(pre,i)) i += 1 def write_sorted_bulk(content,filename): f = file(filename,'w') f.write(content) f.close() def split_sort_file(filename,num_sort = 3,buf_size = 65536*64*4): t = datetime.now() pre,ext = os.path.splitext(filename) if ext == '.gz': file_file = gzip.open(filename,'rb') else: file_file = open(filename) bulk_queue = Queue(10) sorted_queue = Queue(10) NUM_SORT = num_sort sort_worker_pool = [] for i in range(NUM_SORT): sort_worker_pool.append( Process(target=sort_worker,args=(bulk_queue,sorted_queue)) ) sort_worker_pool[i].start() NUM_WRITE = 1 write_worker_pool = [] for i in range(NUM_WRITE): write_worker_pool.append( Process(target=write_worker,args=(sorted_queue,pre)) ) write_worker_pool[i].start() buf = file_file.read(buf_size) sorted_count = 0 while len(buf): end_line = buf.rfind('n') #print buf[:end_line+1] bulk_queue.put(buf[:end_line+1]) sorted_count += 1 if end_line != -1: buf = buf[end_line+1:] + file_file.read(buf_size) else: buf = file_file.read(buf_size) for i in range(NUM_SORT): bulk_queue.put('STOP') for i in range(NUM_SORT): sort_worker_pool[i].join() for i in range(NUM_WRITE): sorted_queue.put('STOP') for i in range(NUM_WRITE): write_worker_pool[i].join() print 'elasped ',datetime.now() - t return sorted_count from heapq import heappush,heappop from datetime import datetime from multiprocessing import Process,freeze_support import os class file_heap: def __init__(self,dir,idx = 0,count = 1): files = os.listdir(dir) self.heap = [] self.files = {} self.bulks = {} self.pre_element = None for i in range(len(files)): file = files[i] if hash(file) % count != idx: continue input = open(os.path.join(dir,file)) self.files[i] = input self.bulks[i] = '' heappush(self.heap,(self.get_next_element_buffered(i),i)) def get_next_element_buffered(self,i): if len(self.bulks[i]) < 256: if self.files[i] is not None: buf = self.files[i].read(65536) if buf: self.bulks[i] += buf else: self.files[i].close() self.files[i] = None end_line = self.bulks[i].find('n') if end_line == -1: end_line = len(self.bulks[i]) element = self.bulks[i][:end_line] self.bulks[i] = self.bulks[i][end_line+1:] return element def poppush_uniq(self): while True: element = self.poppush() if element is None: return None if element != self.pre_element: self.pre_element = element return element def poppush(self): try: element,index = heappop(self.heap) except IndexError: return None new_element = self.get_next_element_buffered(index) if new_element: heappush(self.heap,(new_element,index)) return element def heappoppush(dir,queue,count = 1): heap = file_heap(dir,idx,count) while True: d = heap.poppush_uniq() queue.put(d) if d is None: return def heappoppush2(dir,count = 1): heap = [] procs = [] queues = [] pre_element = None for i in range(count): q = Queue(1024) q_buf = queue_buffer(q) queues.append(q_buf) p = Process(target=heappoppush,args=(dir,q_buf,i,count)) procs.append(p) p.start() queues = tuple(queues) for i in range(count): heappush(heap,(queues[i].get(),i)) while True: try: d,i= heappop(heap) except IndexError: queue.put(None) for p in procs: p.join() return else: if d is not None: heappush(heap,i)) if d != pre_element: pre_element = d queue.put(d) def merge_file(dir): heap = file_heap( dir ) os.system('rm -f '+dir+'.merge') fmerge = open(dir+'.merge','a') element = heap.poppush_uniq() fmerge.write(element+'n') while element is not None: element = heap.poppush_uniq() fmerge.write(element+'n') class queue_buffer: def __init__(self,queue): self.q = queue self.rbuf = [] self.wbuf = [] def get(self): if len(self.rbuf) == 0: self.rbuf = self.q.get() r = self.rbuf[0] del self.rbuf[0] return r def put(self,d): self.wbuf.append(d) if d is None or len(self.wbuf) > 1024: self.q.put(self.wbuf) self.wbuf = [] def diff_file(file_old,file_new,file_diff,buf = 268435456): print 'buffer size',buf from file_split import split_sort_file os.system('rm -rf '+ os.path.splitext(file_old)[0] ) os.system('rm -rf '+ os.path.splitext(file_new)[0] ) t = datetime.now() split_sort_file(file_old,5,buf) split_sort_file(file_new,buf) print 'split elasped ',datetime.now() - t os.system('cat %s/* | wc -l'%os.path.splitext(file_old)[0]) os.system('cat %s/* | wc -l'%os.path.splitext(file_new)[0]) os.system('rm -f '+file_diff) t = datetime.now() zdiff = open(file_diff,'a') old_q = Queue(1024) new_q = Queue(1024) old_queue = queue_buffer(old_q) new_queue = queue_buffer(new_q) h1 = Process(target=heappoppush2,args=(os.path.splitext(file_old)[0],old_queue,3)) h2 = Process(target=heappoppush2,args=(os.path.splitext(file_new)[0],new_queue,3)) h1.start(),h2.start() old = old_queue.get() new = new_queue.get() old_count,new_count = 0,0 while old is not None or new is not None: if old > new or old is None: zdiff.write('< '+new+'n') new = new_queue.get() new_count +=1 elif old < new or new is None: zdiff.write('> '+old+'n') old = old_queue.get() old_count +=1 else: old = old_queue.get() new = new_queue.get() print 'new_count:',new_count print 'old_count:',old_count print 'diff elasped ',datetime.now() - t h1.join(),h2.join() 希望本文所述对大家的Python程序设计有所帮助。 (编辑:李大同) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |