项目中碰到了这么一个需求:
有一个 record 文件,每行一个 item,整个文件大小在 2G 左右。根据要求,需要每天向其他系统提供 100000 个 item,怎么处理比较好?考虑之后觉得分片的主意不错:先根据每片的 item 数对这个大文件进行分片,然后每天投放一片即可。具体 Python 代码如下:
# -*- coding: utf-8 -*- import os import sys import shutil import time # import linecache import hashlib import zlib import binascii import urllib2 import logging datas_dir = "./datas/" items_per_page = 10000 url_prefix = "http://172.16.1.110:80/download/" check_result_dir = "./results/" logger = logging.getLogger(__name__) def initialize(): """ @summary: initialize the working directory """ if os.path.exists(datas_dir) and os.path.isdir(datas_dir): # clear datas dir print "begin to remove old datas directory" shutil.rmtree(datas_dir) print "begin to make datas directory" # to resove the conflict between rmtree and mkdir, so i will sleep 1 seconds time.sleep(1) os.mkdir(datas_dir) def read_specific_lines(file, lines_to_read): """ @summary: read specific lines from file file is any iterable; lines_to_read is an iterable containing int values """ lines = set(lines_to_read) last = max(lines) for n, line in enumerate(file): if n + 1 in lines: yield line if n + 1 > last: return def split_file(filename, lines_per_page): """ @summary: split the file into n lines a page """ if lines_per_page <=0: lines_per_page = 1 with open(filename, 'r') as fp: lines = [] for n, line in enumerate(fp): guard = n % lines_per_page if guard == 0: lines = [] lines.append(line) if guard == lines_per_page - 1: yield lines yield lines def write_to_file(lines, filename): """ @summary: write lines to specified file """ with open(filename, 'w') as fp: for line in lines: # construct content line_to_write = url_prefix + line fp.write(line_to_write) def calculate_md5_crc32(msg): """ @summary: calculate the md5 and crc32 """ m = hashlib.md5() m.update(msg) md5 = m.hexdigest().upper() crc32 = binascii.crc32(msg) crc32 = crc32 & 0xffffffff crc32_str = "%08x" % crc32 crc32_str = crc32_str.upper() return md5 + '.' 
+ crc32_str def check_file_integrity(download_url): """ @summary: download file and check it's integrity @return: True/False """ try: file_name = download_url.rsplit("/", 1)[1] response = urllib2.urlopen(download_url) md5_crc32 = calculate_md5_crc32(response.read()) print "file_name = %s, md5_crc32 = %s" %(file_name, md5_crc32) if file_name == md5_crc32: return True else: return False except Exception, ex: logger.exception(ex) return False def do_check(): if os.path.exists(check_result_dir) and os.path.isdir(check_result_dir): # clear datas dir print "begin to remove old result directory" shutil.rmtree(check_result_dir) print "begin to make result directory" # to resove the conflict between rmtree and mkdir, so i will sleep 1 seconds time.sleep(1) os.mkdir(check_result_dir) # fp = open("not_integrity.list", 'w') for n, lines in enumerate(split_file("alive_sample.log", items_per_page)): print "begin to check %d sample list" %( n+1) if n >= 1: break filename = os.path.join(check_result_dir, "notintergrity_" + str(n + 1) + ".list") fp = open(filename, 'w') for line in lines: try: download_url = url_prefix + line.strip() res = check_file_integrity(download_url) if res == False: fp.write(line) fp.flush() logger.error("check integrity error, download_url = %s", download_url) else: print "%s check OK" % line except Exception, ex: logger.exception(ex) fp.close() fp.close() if __name__ == "__main__": import myloggingconfig #do_check() #assert False print check_file_integrity("http://172.16.1.110:80/download/B4D2EF861106F6812668D5163EA9CD58.4F38C168") assert False initialize() for n, lines in enumerate(split_file("20120106.rpt", items_per_page)): print "begin construct %d sample list" %( n+1) ## if n > 4: ## break # construct file name filename = os.path.join(datas_dir, "samplelist_" + str(n + 1) + ".list") write_to_file(lines, filename)
上述代码中包含了计算 MD5 和 CRC32 的工具函数,而整个分片功能则集中在 split_file 函数中。
def split_file(filename, lines_per_page):
    """
    @summary: split the file into n lines a page

    Yields successive pages (lists of raw lines, trailing newline kept)
    of at most lines_per_page lines each.  The final page may be shorter;
    an empty file yields nothing.  A non-positive lines_per_page is
    treated as 1.

    BUGFIX: the original version ended with an unconditional
    ``yield lines``, which duplicated the last page whenever the file
    length was an exact multiple of lines_per_page, and yielded an empty
    list for an empty file.  The trailing page is now yielded only when
    it is non-empty and was not already yielded inside the loop.
    """
    if lines_per_page <= 0:
        lines_per_page = 1
    with open(filename, 'r') as fp:
        page = []
        for line in fp:
            page.append(line)
            if len(page) == lines_per_page:
                yield page
                page = []
        if page:  # trailing partial page
            yield page