A parallel summation example in Python

Let's start with an example. This code was written to score a prediction model; the evaluation is described in detail at

https://www.kaggle.com/c/how-much-did-it-rain/details/evaluation

At its core it computes

C = (1/(70·N)) · Σ_{n=1}^{N} Σ_{m=0}^{69} (P(y ≤ m) − H(m − z_n))²

where N is the number of rows, P is the predicted cumulative probability for each of the 70 rainfall thresholds m = 0..69, z_n is the observed rainfall for row n, and H(k) is the Heaviside step function (1 when k ≥ 0, else 0).
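The metric can be sketched directly in NumPy. This is a hedged illustration reconstructed from the scoring code below; the names `crps`, `pred`, and `actual` are made up, and the data is dummy:

```python
import numpy as np

def crps(pred, actual):
    '''pred: (N, 70) cumulative probabilities P(y <= m) for m = 0..69
       actual: (N,) observed rainfall amounts z_n'''
    m = np.arange(70)
    # H(m - z_n): 1 where the threshold m reaches the observed amount
    heaviside = (m[None, :] >= actual[:, None]).astype(float)
    return ((pred - heaviside) ** 2).sum() / (70.0 * len(actual))

pred = np.tile(np.linspace(0.0, 1.0, 70), (2, 1))  # two dummy CDF rows
actual = np.array([3.0, 10.0])
print(crps(pred, actual))
```

A perfect prediction (the CDF jumps from 0 to 1 exactly at the observed amount) scores 0, and every score lies in [0, 1].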

In the actual computation N is large (1,126,694), so large that a straightforward single-process run takes a long time (14 minutes 10 seconds).

So, borrowing the MapReduce idea, we try computing with multiple processes: each process handles a slice of the N rows, and the partial results are summed at the end to compute C.

The code is as follows:

import csv
import sys
import logging
import argparse
import numpy as np
import multiprocessing
import time

# configure logging
logger = logging.getLogger("example")

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter(
    '%(asctime)s %(levelname)s %(name)s: %(message)s'))

logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

def H(n, z):
    '''Heaviside step function: 1 when n >= z, else 0'''
    return (n-z) >= 0

def evaluate(args, start, end):
    '''compute the partial squared-error sum for rows [start, end)'''
    logger.info("Started %d to %d" %(start, end))
    expReader = open('train_exp.csv','r')
    expReader.readline()  # skip the header line
    for i in range(start):  # skip rows handled by other processes
        _ = expReader.readline()
    predFile = open(args.predict)
    for i in range(start+1):  # skip the header plus `start` rows
        _ = predFile.readline()
    predReader = csv.reader(predFile, delimiter=',')
    squareErrorSum = 0
    totalLines = end - start
    for i, row in enumerate(predReader):
        if i == totalLines:
            logger.info("Completed %d to %d" %(start, end))
            break
        expId, exp = expReader.readline().strip().split(',')
        exp = float(exp)
        predId = row[0]
        row = np.array(row, dtype='float')
        #assert expId == predId
        #lineSum = 0
        for j in xrange(1,71):
            n = j - 1
            squareErrorSum += (row[j]-(n>=exp))**2
            #squareErrorSum += (row[j]-H(n,exp))**2
            #lineSum += (row[j]-H(n,exp))**2
    logger.info('SquareErrorSum %d to %d: %f' %(start, end, squareErrorSum))
    return squareErrorSum

def fileCmp(args):
    '''check that the two files have the same number of lines'''
    for count, line in enumerate(open('train_exp.csv')):
        pass
    expLines = count + 1 - 1  # discard header line
    for count, line in enumerate(open(args.predict)):
        pass
    predictLines = count + 1 - 1
    print 'Lines(exp, predict):', expLines, predictLines
    assert expLines == predictLines
    evaluate.Lines = expLines
    
if __name__ == "__main__":
    # set up logger
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--predict', 
                        help=("path to a predicted-probability file, "
                              "e.g. predict_changeTimePeriod.csv"))
    args = parser.parse_args()
    fileCmp(args)
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    result = []
    blocks = multiprocessing.cpu_count()
    linesABlock = evaluate.Lines // blocks
    for i in xrange(blocks-1):
        result.append(pool.apply_async(evaluate, (args, i*linesABlock, (i+1)*linesABlock)))
    # the last block also picks up the remainder rows
    result.append(pool.apply_async(evaluate, (args, (blocks-1)*linesABlock, evaluate.Lines)))
    pool.close()
    pool.join()
    result = [res.get() for res in result]
    print result
    print 'evaluate.Lines', evaluate.Lines
    score = sum(result) / (70*evaluate.Lines)
    print "score:", score

 

One process is spawned per CPU core, in the hope of squeezing out all of the CPU's compute power. During the run, CPU utilization did indeed stay at 100%.

Testing shows the result matches the single-process version, and the run time drops to 6 minutes 27 seconds, only about twice as fast.

The improvement is smaller than expected.

I also tried loading a copy of the input file into each process's memory with StringIO before processing, but that brought no further speedup. Together with the constant 100% CPU utilization, it looks like the bottleneck is simply compute power.
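One Python-level option not explored above: the inner 70-iteration loop can be vectorized with NumPy, which often helps compute-bound per-row loops like this one. A sketch on dummy data, showing the loop and the vectorized form agree:

```python
import numpy as np

# `row` mimics one parsed CSV row: row[0] is the id, row[1:71] the 70 CDF values
row = np.concatenate(([42.0], np.linspace(0.0, 1.0, 70)))
exp = 5.0  # the observed rainfall amount for this row

# original per-row loop
loop_sum = 0.0
for j in range(1, 71):
    n = j - 1
    loop_sum += (row[j] - (n >= exp)) ** 2

# vectorized equivalent: one expression over all 70 thresholds
vec_sum = ((row[1:71] - (np.arange(70) >= exp)) ** 2).sum()

print(abs(loop_sum - vec_sum) < 1e-9)  # True
```

This replaces 70 Python-level iterations per row with a single NumPy expression, so the hot path runs in C inside NumPy rather than in the interpreter.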

It seems compute-intensive work still needs to be written in C :)

The C implementation is far faster than Python; a single thread finishes in just 50 seconds. See:

http://www.cnblogs.com/instant7/p/4313649.html
