Sorting large files in Python

This article walks through an external-sort implementation for files too large to sort in memory. The input is read in large buffers, the chunks are sorted and deduplicated by a pool of worker processes and written to disk, and a heap-based k-way merge (spread across several processes) recombines them; a sorted-merge diff of two such files is built on top. The implementation follows:

import gzip
import os
from datetime import datetime
from multiprocessing import Process, Queue

def sort_worker(input, output):
    # Pull raw text chunks off the input queue, keep the first
    # space-delimited field of each line, deduplicate, sort, and
    # push the sorted block downstream.
    while True:
        lines = input.get().splitlines()
        element_set = {}
        for line in lines:
            if line.strip() == 'stop':
                return
            try:
                element = line.split(' ')[0]
                if not element_set.get(element):
                    element_set[element] = ''
            except Exception:
                pass
        sorted_element = sorted(element_set)
        output.put('\n'.join(sorted_element))

def write_worker(input, pre):
    # Persist each sorted block to a numbered file under a directory
    # named after the input file (sans extension).
    os.system('mkdir %s' % pre)
    i = 0
    while True:
        content = input.get()
        if content.strip() == 'stop':
            return
        write_sorted_bulk(content, '%s/%s' % (pre, i))
        i += 1

def write_sorted_bulk(content, filename):
    with open(filename, 'w') as f:
        f.write(content)

def split_sort_file(filename, num_sort=3, buf_size=65536*64*4):
    # Stream the (possibly gzipped) input in large buffers, cut each
    # buffer at its last newline so no line is split, and fan the
    # chunks out to a pool of sorting processes; one writer process
    # persists the results.
    t = datetime.now()
    pre, ext = os.path.splitext(filename)
    if ext == '.gz':
        file_file = gzip.open(filename, 'rt')
    else:
        file_file = open(filename)
    bulk_queue = Queue(10)
    sorted_queue = Queue(10)
    sort_worker_pool = []
    for i in range(num_sort):
        sort_worker_pool.append(Process(target=sort_worker, args=(bulk_queue, sorted_queue)))
        sort_worker_pool[i].start()
    num_write = 1
    write_worker_pool = []
    for i in range(num_write):
        write_worker_pool.append(Process(target=write_worker, args=(sorted_queue, pre)))
        write_worker_pool[i].start()
    buf = file_file.read(buf_size)
    sorted_count = 0
    while len(buf):
        end_line = buf.rfind('\n')
        bulk_queue.put(buf[:end_line+1])
        sorted_count += 1
        if end_line != -1:
            buf = buf[end_line+1:] + file_file.read(buf_size)
        else:
            buf = file_file.read(buf_size)
    # Shut the pipeline down: one 'stop' sentinel per worker.
    for i in range(num_sort):
        bulk_queue.put('stop')
    for i in range(num_sort):
        sort_worker_pool[i].join()
    for i in range(num_write):
        sorted_queue.put('stop')
    for i in range(num_write):
        write_worker_pool[i].join()
    print('elapsed', datetime.now() - t)
    return sorted_count
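The diff code further down imports this listing as a module named file_split, so it is natural to save it as file_split.py. Below is a minimal driver sketch; the input name big.txt and the worker count are illustrative assumptions, not part of the original article. After it runs, the sorted, deduplicated chunk files sit in a directory named after the input file (big/ here). The __main__ guard matters because multiprocessing re-imports the module in child processes.

# Hypothetical driver for the listing above (saved as file_split.py).
# 'big.txt' and num_sort=4 are illustrative choices.
if __name__ == '__main__':
    chunks = split_sort_file('big.txt', num_sort=4)
    print('chunks queued:', chunks)  # sorted chunk files are now in ./big/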
import os
from datetime import datetime
from heapq import heappush, heappop
from multiprocessing import Process, Queue

class file_heap:
    # k-way merge over the sorted chunk files in a directory, with a
    # small read buffer per file and suppression of consecutive
    # duplicates.
    def __init__(self, dir, idx=0, count=1):
        files = os.listdir(dir)
        self.heap = []
        self.files = {}
        self.bulks = {}
        self.pre_element = None
        for i in range(len(files)):
            file = files[i]
            # Each of `count` merger processes handles its own slice
            # of the chunk files.
            if hash(file) % count != idx:
                continue
            input = open(os.path.join(dir, file))
            self.files[i] = input
            self.bulks[i] = ''
            heappush(self.heap, (self.get_next_element_buffered(i), i))

    def get_next_element_buffered(self, i):
        # Refill file i's buffer when it runs low, then return the
        # next line from it.
        if len(self.bulks[i]) < 256:
            if self.files[i] is not None:
                buf = self.files[i].read(65536)
                if buf:
                    self.bulks[i] += buf
                else:
                    self.files[i].close()
                    self.files[i] = None
        end_line = self.bulks[i].find('\n')
        if end_line == -1:
            end_line = len(self.bulks[i])
        element = self.bulks[i][:end_line]
        self.bulks[i] = self.bulks[i][end_line+1:]
        return element

    def poppush_uniq(self):
        # Like poppush, but skip consecutive duplicate elements.
        while True:
            element = self.poppush()
            if element is None:
                return None
            if element != self.pre_element:
                self.pre_element = element
                return element

    def poppush(self):
        # Pop the globally smallest element and replace it with the
        # next element from the same source file.
        try:
            element, index = heappop(self.heap)
        except IndexError:
            return None
        new_element = self.get_next_element_buffered(index)
        if new_element:
            heappush(self.heap, (new_element, index))
        return element

def heappoppush(dir, queue, idx=0, count=1):
    # Merge this process's share of the chunk files and stream the
    # unique elements to the queue; None marks the end of the stream.
    heap = file_heap(dir, idx, count)
    while True:
        d = heap.poppush_uniq()
        queue.put(d)
        if d is None:
            return

def heappoppush2(dir, queue, count=1):
    # Second-level merge: spawn `count` first-level mergers and merge
    # their output streams with another heap. None sentinels are kept
    # out of the heap so string comparisons stay valid on Python 3.
    heap = []
    procs = []
    queues = []
    pre_element = None
    for i in range(count):
        q = Queue(1024)
        q_buf = queue_buffer(q)
        queues.append(q_buf)
        p = Process(target=heappoppush, args=(dir, q_buf, i, count))
        procs.append(p)
        p.start()
    queues = tuple(queues)
    for i in range(count):
        d0 = queues[i].get()
        if d0 is not None:
            heappush(heap, (d0, i))
    while True:
        try:
            d, i = heappop(heap)
        except IndexError:
            # All streams exhausted: forward the terminator and reap
            # the child processes.
            queue.put(None)
            for p in procs:
                p.join()
            return
        nxt = queues[i].get()
        if nxt is not None:
            heappush(heap, (nxt, i))
        if d != pre_element:
            pre_element = d
            queue.put(d)

def merge_file(dir):
    # Single-process variant: merge every chunk file in `dir` into
    # one deduplicated output file.
    heap = file_heap(dir)
    os.system('rm -f ' + dir + '.merge')
    fmerge = open(dir + '.merge', 'a')
    element = heap.poppush_uniq()
    while element is not None:
        fmerge.write(element + '\n')
        element = heap.poppush_uniq()
    fmerge.close()

class queue_buffer:
    # Batch queue traffic: accumulate up to 1024 items per put() and
    # hand them over in one list, cutting per-item IPC overhead.
    def __init__(self, queue):
        self.q = queue
        self.rbuf = []
        self.wbuf = []

    def get(self):
        if len(self.rbuf) == 0:
            self.rbuf = self.q.get()
        r = self.rbuf[0]
        del self.rbuf[0]
        return r

    def put(self, d):
        self.wbuf.append(d)
        # A None terminator flushes whatever is pending.
        if d is None or len(self.wbuf) > 1024:
            self.q.put(self.wbuf)
            self.wbuf = []
def diff_file(file_old, file_new, file_diff, buf=268435456):
    # Split and chunk-sort both inputs, then stream-merge the two
    # sorted sequences and write their differences.
    print('buffer size', buf)
    from file_split import split_sort_file
    os.system('rm -rf ' + os.path.splitext(file_old)[0])
    os.system('rm -rf ' + os.path.splitext(file_new)[0])
    t = datetime.now()
    split_sort_file(file_old, 5, buf)
    split_sort_file(file_new, 5, buf)
    print('split elapsed', datetime.now() - t)
    os.system('cat %s/* | wc -l' % os.path.splitext(file_old)[0])
    os.system('cat %s/* | wc -l' % os.path.splitext(file_new)[0])
    os.system('rm -f ' + file_diff)
    t = datetime.now()
    zdiff = open(file_diff, 'a')
    old_q = Queue(1024)
    new_q = Queue(1024)
    old_queue = queue_buffer(old_q)
    new_queue = queue_buffer(new_q)
    h1 = Process(target=heappoppush2, args=(os.path.splitext(file_old)[0], old_queue, 3))
    h2 = Process(target=heappoppush2, args=(os.path.splitext(file_new)[0], new_queue, 3))
    h1.start(), h2.start()
    old = old_queue.get()
    new = new_queue.get()
    old_count, new_count = 0, 0
    while old is not None or new is not None:
        # Classic sorted-merge diff: lines only in file_new get a '<'
        # prefix, lines only in file_old a '>' prefix. The None checks
        # come first so an exhausted stream is never compared.
        if old is None or (new is not None and old > new):
            zdiff.write('< ' + new + '\n')
            new = new_queue.get()
            new_count += 1
        elif new is None or old < new:
            zdiff.write('> ' + old + '\n')
            old = old_queue.get()
            old_count += 1
        else:
            old = old_queue.get()
            new = new_queue.get()
    print('new_count:', new_count)
    print('old_count:', old_count)
    print('diff elapsed', datetime.now() - t)
    h1.join(), h2.join()
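A hedged usage sketch for the diff stage, with placeholder file names that are not from the article. diff_file chunk-sorts both inputs via file_split.split_sort_file, then two heappoppush2 processes stream the merged, deduplicated sequences into the sorted-merge loop. On Python 3, heapq.merge offers a simpler single-process alternative for the k-way merge step, at the cost of the parallelism used here.

# Hypothetical invocation; the file names are placeholders.
if __name__ == '__main__':
    diff_file('snapshot_old.txt', 'snapshot_new.txt', 'snapshot.diff')
    # snapshot.diff now holds '<' lines present only in the new file
    # and '>' lines present only in the old file.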

I hope this article proves helpful for your Python programming.
