First, an example. The code below was written to evaluate a prediction model for the Kaggle "How Much Did It Rain" competition; the full evaluation description is at
https://www.kaggle.com/c/how-much-did-it-rain/details/evaluation
Its core is computing the evaluation score (the formula below is reconstructed from the computation in the code):

C = (1 / (70 * N)) * sum over n = 1..N, m = 0..69 of (P_n(y <= m) - H(m - z_n))^2

where P_n is the predicted cumulative probability for record n, z_n is the actual rainfall, and H is the Heaviside step function (1 when its argument is >= 0, else 0).
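To make the per-record term concrete, here is a minimal sketch of one record's contribution to the sum (the helper name `row_crps_sum` is mine, not from the original script; the 70 thresholds and the Heaviside comparison mirror the evaluation loop in the code below):

```python
import numpy as np

def row_crps_sum(pred_cdf, actual):
    """Sum over thresholds n = 0..69 of (P(y <= n) - H(n - actual))**2,
    where the Heaviside term is 1 when n >= actual and 0 otherwise."""
    n = np.arange(70)                          # thresholds 0..69 mm
    heaviside = (n >= actual).astype(float)
    return float(np.sum((pred_cdf - heaviside) ** 2))

# a perfect prediction (CDF jumps from 0 to 1 exactly at the actual value) scores 0
perfect = (np.arange(70) >= 3).astype(float)
print(row_crps_sum(perfect, 3.0))  # 0.0
```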
In practice N is large (1,126,694 records), so computing this in a single process takes a very long time (14 min 10 s).
So, borrowing the idea behind MapReduce, I tried computing it with multiple processes: each process handles one slice of the N records, the partial sums are added together, and C is computed from the total.
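The split-and-sum pattern can be illustrated with a toy partial sum before looking at the real script (names here are illustrative, not from the original code):

```python
import multiprocessing

def partial_sum(start, end):
    # each worker handles the half-open range [start, end)
    return sum(i * i for i in range(start, end))

if __name__ == "__main__":
    n = 1000
    blocks = multiprocessing.cpu_count()
    step = n // blocks
    ranges = [(i * step, (i + 1) * step) for i in range(blocks - 1)]
    ranges.append(((blocks - 1) * step, n))    # last block takes the remainder
    with multiprocessing.Pool(blocks) as pool:
        parts = [pool.apply_async(partial_sum, r) for r in ranges]
        total = sum(p.get() for p in parts)
    print(total == sum(i * i for i in range(n)))  # True
```

The partial results are independent, so adding them gives exactly the single-process answer.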
The code is as follows:
import csv
import sys
import logging
import argparse
import multiprocessing
import numpy as np

# configure logging
logger = logging.getLogger("example")
handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s %(name)s: %(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

def H(n, z):
    """Heaviside step: 1 when n >= z, else 0."""
    return (n - z) >= 0

def evaluate(args, start, end):
    '''handle range [start, end)'''
    logger.info("Started %d to %d" % (start, end))
    expReader = open('train_exp.csv', 'r')
    expReader.readline()                 # skip header
    for i in range(start):
        _ = expReader.readline()
    predFile = open(args.predict)
    for i in range(start + 1):           # header plus `start` data lines
        _ = predFile.readline()
    predReader = csv.reader(predFile, delimiter=',')
    squareErrorSum = 0
    totalLines = end - start
    for i, row in enumerate(predReader):
        if i == totalLines:
            logger.info("Completed %d to %d" % (start, end))
            break
        expId, exp = expReader.readline().strip().split(',')
        exp = float(exp)
        predId = row[0]
        row = np.array(row, dtype='float')
        #assert expId == predId
        for j in range(1, 71):           # columns 1..70 hold P(y <= n), n = 0..69
            n = j - 1
            squareErrorSum += (row[j] - (n >= exp)) ** 2
            #squareErrorSum += (row[j] - H(n, exp)) ** 2
    logger.info('SquareErrorSum %d to %d: %f' % (start, end, squareErrorSum))
    return squareErrorSum

def fileCmp(args):
    '''check that the two files have the same number of data lines'''
    for count, line in enumerate(open('train_exp.csv')):
        pass
    expLines = count + 1 - 1             # discard header
    for count, line in enumerate(open(args.predict)):
        pass
    predictLines = count + 1 - 1
    print('Lines(exp, predict):', expLines, predictLines)
    assert expLines == predictLines
    evaluate.Lines = expLines

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--predict',
                        help="path to a predicted-probability file, "
                             "e.g. predict_changeTimePeriod.csv")
    args = parser.parse_args()
    fileCmp(args)

    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    result = []
    blocks = multiprocessing.cpu_count()
    linesABlock = evaluate.Lines // blocks
    for i in range(blocks - 1):
        result.append(pool.apply_async(
            evaluate, (args, i * linesABlock, (i + 1) * linesABlock)))
    # the last block takes the remainder of the lines
    result.append(pool.apply_async(
        evaluate, (args, (blocks - 1) * linesABlock, evaluate.Lines)))
    pool.close()
    pool.join()
    result = [res.get() for res in result]
    print(result)
    print('evaluate.Lines', evaluate.Lines)
    score = sum(result) / (70 * evaluate.Lines)
    print("score:", score)
The work is split into as many processes as there are CPU cores, in the hope of squeezing out as much of the CPU's compute as possible. During the run, CPU utilization did indeed stay at 100%.
Testing confirms the result matches the single-process version, and the run time drops to 6 min 27 s, only about twice as fast.
The improvement is smaller than I expected.
I also tried having each process load its own copy of the input file into memory with StringIO before processing; that brought no further speed-up. Combined with the constant 100% CPU utilization, it looks like the bottleneck is compute, not I/O.
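For reference, the in-memory variant might look roughly like this (a sketch under the assumption that each worker receives the whole file contents as a string; the function name and summing logic are illustrative, not the original code):

```python
import io

def evaluate_from_memory(pred_text, start, end):
    """Process data lines [start, end) of an in-memory CSV copy
    (header skipped), instead of re-reading a file on disk."""
    buf = io.StringIO(pred_text)   # per-process in-memory "file"
    buf.readline()                 # skip header
    for _ in range(start):
        buf.readline()
    total = 0.0
    for _ in range(start, end):
        row = buf.readline().strip().split(',')
        total += sum(float(x) for x in row[1:])
    return total

text = "id,p\n1,0.5\n2,0.25\n3,0.25\n"
print(evaluate_from_memory(text, 0, 3))  # 1.0
```

Since each worker still executes the same Python-level arithmetic, removing disk reads does not help when the computation itself is the bottleneck.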
It seems compute-intensive work really should be written in C :)
The C implementation is far faster than the Python one: a single thread finishes in just 50 seconds. Details:
http://www.cnblogs.com/instant7/p/4313649.html