這一章講的是並行計算,佔了60多頁。看完後得把這一大坨東西理理順消化一下。python
邏輯很簡單:在座標系的單位正方形裏投針,計算落在1/4單位圓中(x^2+y^2<=1)的針的比例,而後乘以4。順序執行,投擲100,000,000次花費的時間大約是120秒。app
每次投擲都是彼此獨立的,因此直接把工做量差成多份(好比8份),交給多個進程執行便可。能夠想見,這裏運行的函數不須要與程序中其餘的部分共享狀態,並且進程間只須要傳遞一小部分數據就能完成大量運算。
函數estimate_nbr_points_in_quarter_circle
反覆投針而後計算落在1/4單位圓內的針的個數:dom
def estimate_points(nbr_estimates): in_unit_circle = 0 for step in range(int(nbr_estimates)): x = random.uniform(0, 1) y = random.uniform(0, 1) is_in_unit_circle = x * x + y * y <= 1.0 in_unit_circle += is_in_unit_circle return in_unit_circle
並行部分:socket
from multiprocessing import Pool ... samples_total = 1e8 N = 8 pool = Pool(processes=N) samples = samples_total / N trials = [samples] * N t1 = time.time() nbr_in_unit_circles = pool.map(estimate_points, trials) pi_estimate = sum(nbr_in_unit_circles) * 4 / samples_total print("Estimate pi", pi_estimate) print("Delta:", time.time() - t1)
這是一個寫起來至關簡單的並行程序。肯定要使用的進程個數(通常是cpu的個數),在trials
列表裏設置好每一個進程的參數,而後往pool.map
裏一丟,就好像在使用普通的map
函數同樣。運行它大約花了19秒。函數
(插一句,書中這段代碼讀起來十分辛苦,由於變量名實在過長,處處都是nbr_trials_in_quarter_unit_circle,nbr_trials_per_process,nbr_samples_in_total這種三四個單詞拼起來的變量名,把很簡單的東西搞得大段大段的,繞得人頭暈。這個故事告訴咱們代碼要簡練,信息密度要高。)翻譯
Effective Python第41條指出,multiprocessing
的開銷是比較高的,由於主進程和子進程之間必須進行序列化和反序列化操做。具體而言,multiprocessing
作了這些事:code
trials
列表中的每一項數據都傳給map
。pickle
模塊對數據進行序列化,將其變成二進制形式。pickle
對二進制數據進行反序列化操做,將其還原爲python對象。estimate_points
函數的python模塊。estimate_points
函數。在並行計算中得好好想一想會不會獲得重複的或者相關的隨機序列。若是是使用python自帶的random
模塊,multiprocessing
會本身在每次fork過程當中從新設置隨機數生成器的種子。但若是是用numpy
,就得親自從新設置,否則random
就會返回相同的序列。orm
使用numpy
:對象
import numpy as np def estimate_points(samples): np.random.seed() xs = np.random.uniform(0, 1, samples) ys = np.random.uniform(0, 1, samples) is_in_quc = (xs * xs + ys * ys) <= 1.0 in_quc = np.sum(is_in_quc) return in_quc
使用numpy
使得運行時間縮減到了1.13秒。numpy
可太強了(或者說CPython可太菜了)。隊列
在一個很大的範圍內查找素數和估算pi是不一樣的,由於工做量的大小和查找範圍的上下限的大小有關(檢查[10, 100]和[10000, 100000]的工做量確定是不一樣的),檢查每一個數字的複雜程度也不同(誰知道檢查到第幾個素因數的時候它就被整除了呢?偶數檢查起來是最簡單的,素數是最難的)。這個問題是embarassingly parallel的(不會翻譯...),即沒有須要共享的狀態。關鍵在於如何平衡進程之間的工做量(load balancing),將複雜度各異的任務分配給有限的計算資源。
當咱們把計算任務分配給進程池的時候,咱們能夠控制給每一個進程分配多少工做量,把工做量分紅塊(chunk),一旦有cpu空閒下來了就給它分配工做。塊越大,通訊開銷越小;塊越小,控制越精細。(塊大小爲10就是指一個進程一次檢查10個數字)。做者們給出了一張」塊數-運行時間「的關係圖,用來講明「塊數是cpu個數的倍數時運行時間最短」這個簡單的道理(否則在執行最後一輪計算時會有cpu空着)。
咱們可使用隊列來向一組工做進程提供任務並收集結果:
ALL_DONE = b"ALL_DONE" WORKER_FINISHED = b"WORKER_FINISHED" def check_prime(possibles, definites): while True: n = possibles.get() if n == ALL_DONE: definites.put(WORKER_FINISHED) break else: if n % 2 == 0: continue for i in range(3, int(math.sqrt(n)) + 1 , 2): if n % i == 0: break else: definites.put(n)
possibles
和definites
爲兩個隊列,用於結果的輸入和輸出。咱們設置了兩個標誌位(flag),ALL_DONE
做爲終止循環的sentinel由父進程在將數字塞進possibles
後提供,用於告訴子進程已經已經沒有別的工做了。子進程收到ALL_DONE
後,向definites
輸出WORKER_FINISHED
,告訴父進程本身已經收到sentinel,而後終止從possibles
隊列獲取輸入。
建立輸入輸出隊列和8個進程,向possibles
隊列中添加數字,並在最後加入8個ALL_DONE
sentinel:
if __name__ == '__main__': primes = [] possibles = Queue() definites = Queue() N = 8 pool = Pool(processes=N) processes = [] for _ in range(N): p = Process(target=check_prime,args=(possibles, definites)) processes.append(p) p.start() t1 = time.time() number_range = range(10000000000, 10000100000) for possible in number_range: possibles.put(possible) print("ALL JOBS ADDED TO THE QUEUE") # add poison pills to stop the remote workers for n in range(N): possibles.put(ALL_DONE) print("NOW WAITING FOR RESULTS...") ...
循環地從definites
隊列中獲取結果(固然,結果不是順序的),獲得8個WORKER_FINISHED
後中止循環:
... processors_finished = 0 while True: new_result = definites.get() if new_result == WORKER_FINISHED: processors_finished += 1 print("{} WORKER(S) FINISHED".format(processors_finished)) if processors_finished == N: break else: primes.append(new_result) assert processors_finished == N print("Took:", time.time() - t1) print(len(primes), primes[:10], primes[-10:])
程序執行花了7秒多,而順序執行須要20秒左右。但因爲建立隊列須要序列化和同步,多進程的執行速度並不必定會比順序執行更快。在原書中就是如此,甚至看成者把全部偶數從輸入隊列中剔除後多進程執行仍是比順序執行慢,說明多進程執行的程序有很大一部分時間是花費在通訊開銷上的。
與第2節的「尋找一個範圍內全部的素數」不一樣,如今咱們來解決如何快速判斷一個特別大的數(好比一個18位數)是否爲素數的問題——由多個cpu合做完成。這是一個須要進程間通訊或共享狀態的問題。
與前兩個例子類似,咱們把要檢查的數字的可能的因子分爲多組,傳遞給多個子進程進行檢查。當某個子進程中的因子整除了這個數,子進程就返回False
——但這不會讓別的子進程停下來(因此是一個簡單的版本)。這或許會讓別的子進程作無用功,但也省去了檢查共享狀態的通訊開銷。
把因子分組:
def create_range(from_i, to_i, N): piece_length = int((to_i - from_i) / N) lrs = [from_i] + [(i + 1) if (i % 2 == 0) else i for i in range(from_i, to_i, piece_length)[1:]] if len(lrs) > N: lrs.pop() assert len(lrs) == N ranges = list(zip(lrs, lrs[1:])) + [(lrs[-1], to_i)] return ranges
e.g. create_range(1000, 100000, 4)
的返回值是[(1000, 25751), (25751, 50501), (50501, 75251), (75251, 100000)]。
import time import math from multiprocessing import Pool def check_prime_in_range(args): n, from_i, to_i = args #彷佛只能經過傳入元組而後再拆包的方式達到傳入多參數的效果 from_i, to_i = ranges if n % 2 == 0: return False for i in range(from_i, to_i, 2): if n % i == 0: return False return True def check_prime(n, pool, N): from_i = 3 to_i = int(math.sqrt(n)) + 1 ranges = create_range(from_i, to_i, N) args = [(n, from_i, to_i) for from_i, to_i in ranges]` results = pool.map(check_prime_in_range, args) if False in results: return False return True if __name__ == "__main__": N = 8 pool = Pool(processes=N) prime18 = 100109100129100151 t1 = time.time() print("%d: %s" %(prime18, check_prime(prime18, pool, N))) print('Took:', time.time() - t1)
大約用了10s。
因爲額外開銷的緣由,對於小一點的數字,多進程的方法可能尚未順序查找的方法好。並且,若是已經找到了一個很小的因數,程序也不會立刻停下來。固然,在因數時咱們能夠當即在進程間通訊,但這會產生大量的額外通訊開銷,由於大多數數字都會有一個較小的因數。因而咱們採用混合策略:先順序查找較小的因數,而後再將剩餘的工做分派給多個進程。這是一種避免多進程開銷的常見作法。
def check_prime(n, pool, N): from_i = 3 to_i= 21 args = (n, from_i, to_i) if not check_prime_in_range(args): return False from_i = to_i to_i = int(math.sqrt(n)) + 1 ranges = create_range(from_i, to_i, N) args = [(n, from_i, to_i) for from_i, to_i in ranges] results = pool.map(check_prime_in_range, args) if False in results: return False return True
直接上代碼吧。能夠看到這裏用Manager建立了一個符號位。讀取這個符號位並不須要本身作什麼上鎖之類的操做,就像在檢查一個全局變量同樣方便(不過仍是要做爲參數傳入函數中的)。爲了節約通訊開銷,讓每一個進程每檢查1000個數檢查一次符號位。若是進程檢查到了FLAG_SET
或者找到了因數就停下來。
import time import math from multiprocessing import Pool, Manager SERIAL_CHECK_CUTOFF = 21 CHECK_EVERY = 1000 FLAG_CLEAR = b'0' FLAG_SET = b'1' def create_range(from_i, to_i, N): piece_length = int((to_i - from_i) / N) lrs = [from_i] + [(i + 1) if (i % 2 == 0) else i for i in range(from_i, to_i, piece_length)[1:]] if len(lrs) > N: lrs.pop() assert len(lrs) == N ranges = list(zip(lrs, lrs[1:])) + [(lrs[-1], to_i)] return ranges def check_prime_in_range(args): n, from_i, to_i, value = args if n % 2 == 0: return False check_every = CHECK_EVERY for i in range(from_i, to_i, 2): check_every -= 1 if not check_every: if value.value == FLAG_SET: return False check_every = CHECK_EVERY if n % i == 0: value.value = FLAG_SET return False return True def check_prime(n, pool, N, value): from_i = 3 to_i= SERIAL_CHECK_CUTOFF value.value = FLAG_CLEAR # 要記得先設置標誌位的值 args = (n, from_i, to_i, value) if not check_prime_in_range(args): return False from_i = to_i to_i = int(math.sqrt(n)) + 1 ranges = create_range(from_i, to_i, N) args = [(n, from_i, to_i, value) for from_i, to_i in ranges] results = pool.map(check_prime_in_range, args) if False in results: return False return True if __name__ == "__main__": N = 8 manager = Manager() value = manager.Value(b'c', FLAG_CLEAR) # 建立一個一字節(一字符)大小的符號標誌位 pool = Pool(processes=N) prime18 = 100109100129100151 non_prime = 100109100129101027 t1 = time.time() print("%d: %s" %(non_prime, check_prime(non_prime, pool, N, value))) print('Took:', time.time()-t1)