筆記:High Performance Python第9章

這一章講的是並行計算,佔了60多頁。看完後得把這一大坨東西理理順消化一下。python

1.用蒙特卡洛模擬估算pi。

邏輯很簡單:在座標系的單位正方形裏投針,計算落在1/4單位圓中(x^2+y^2<=1)的針的比例,而後乘以4。順序執行,投擲100,000,000次花費的時間大約是120秒。app

1.1用多進程加速

每次投擲都是彼此獨立的,因此直接把工做量差成多份(好比8份),交給多個進程執行便可。能夠想見,這裏運行的函數不須要與程序中其餘的部分共享狀態,並且進程間只須要傳遞一小部分數據就能完成大量運算。
函數estimate_nbr_points_in_quarter_circle反覆投針而後計算落在1/4單位圓內的針的個數:dom

def estimate_points(nbr_estimates):
    in_unit_circle = 0
    for step in range(int(nbr_estimates)):
        x = random.uniform(0, 1)
        y = random.uniform(0, 1)
        is_in_unit_circle = x * x + y * y <= 1.0
        in_unit_circle += is_in_unit_circle
    return in_unit_circle

並行部分:socket

from multiprocessing import Pool
...
samples_total = 1e8
N = 8
pool = Pool(processes=N)
samples = samples_total / N
trials = [samples] * N

t1 = time.time()
nbr_in_unit_circles = pool.map(estimate_points, trials)
pi_estimate = sum(nbr_in_unit_circles) * 4 / samples_total

print("Estimate pi", pi_estimate)
print("Delta:", time.time() - t1)

這是一個寫起來至關簡單的並行程序。肯定要使用的進程個數(通常是cpu的個數),在trials列表裏設置好每一個進程的參數,而後往pool.map裏一丟,就好像在使用普通的map函數同樣。運行它大約花了19秒。函數

(插一句,書中這段代碼讀起來十分辛苦,由於變量名實在過長,處處都是nbr_trials_in_quarter_unit_circle,nbr_trials_per_process,nbr_samples_in_total這種三四個單詞拼起來的變量名,把很簡單的東西搞得大段大段的,繞得人頭暈。這個故事告訴咱們代碼要簡練,信息密度要高。)翻譯

Effective Python第41條指出,multiprocessing的開銷是比較高的,由於主進程和子進程之間必須進行序列化和反序列化操做。具體而言,multiprocessing作了這些事:code

  1. trials列表中的每一項數據都傳給map
  2. pickle模塊對數據進行序列化,將其變成二進制形式。
  3. 經過local socket將序列化以後的數據從主解釋器所在的進程發送到子解釋器所在的進程。
  4. 在子進程中用pickle對二進制數據進行反序列化操做,將其還原爲python對象。
  5. 引入包含estimate_points函數的python模塊。
  6. 各條子進程平行地針對各自的輸入數據運行estimate_points函數。
  7. 對運行結果進行序列化操做,將其轉變爲字節。
  8. 將這些字節經過socket複製到主進程中。
  9. 主進程對這些字節執行反序列化操做,還原爲python對象。
  10. 把每條子進程求出的結算結果合併到一份列表中,返回給調用者。

1.2並行系統中的隨機數

在並行計算中得好好想一想會不會獲得重複的或者相關的隨機序列。若是是使用python自帶的random模塊,multiprocessing會本身在每次fork過程當中從新設置隨機數生成器的種子。但若是是用numpy,就得親自從新設置,否則random就會返回相同的序列。orm

使用numpy:對象

import numpy as np

def estimate_points(samples):
    np.random.seed()
    xs = np.random.uniform(0, 1, samples)
    ys = np.random.uniform(0, 1, samples)
    is_in_quc = (xs * xs + ys * ys) <= 1.0 
    in_quc = np.sum(is_in_quc)
    return in_quc

使用numpy使得運行時間縮減到了1.13秒。numpy可太強了(或者說CPython可太菜了)。隊列

2.查找素數

在一個很大的範圍內查找素數和估算pi是不一樣的,由於工做量的大小和查找範圍的上下限的大小有關(檢查[10, 100]和[10000, 100000]的工做量確定是不一樣的),檢查每一個數字的複雜程度也不同(誰知道檢查到第幾個素因數的時候它就被整除了呢?偶數檢查起來是最簡單的,素數是最難的)。這個問題是embarassingly parallel的(不會翻譯...),即沒有須要共享的狀態。關鍵在於如何平衡進程之間的工做量(load balancing),將複雜度各異的任務分配給有限的計算資源。
當咱們把計算任務分配給進程池的時候,咱們能夠控制給每一個進程分配多少工做量,把工做量分紅塊(chunk),一旦有cpu空閒下來了就給它分配工做。塊越大,通訊開銷越小;塊越小,控制越精細。(塊大小爲10就是指一個進程一次檢查10個數字)。做者們給出了一張」塊數-運行時間「的關係圖,用來講明「塊數是cpu個數的倍數時運行時間最短」這個簡單的道理(否則在執行最後一輪計算時會有cpu空着)。

咱們可使用隊列來向一組工做進程提供任務並收集結果:

ALL_DONE = b"ALL_DONE"
WORKER_FINISHED = b"WORKER_FINISHED"

def check_prime(possibles, definites):
    while True:
        n = possibles.get()
        if n == ALL_DONE:
            definites.put(WORKER_FINISHED)
            break
        else:
            if n % 2 == 0:
                continue
            for i in range(3, int(math.sqrt(n)) + 1 , 2):
                if n % i == 0:
                    break
            else:
                definites.put(n)

possiblesdefinites爲兩個隊列,用於結果的輸入和輸出。咱們設置了兩個標誌位(flag),ALL_DONE做爲終止循環的sentinel由父進程在將數字塞進possibles後提供,用於告訴子進程已經已經沒有別的工做了。子進程收到ALL_DONE後,向definites輸出WORKER_FINISHED,告訴父進程本身已經收到sentinel,而後終止從possibles隊列獲取輸入。

建立輸入輸出隊列和8個進程,向possibles隊列中添加數字,並在最後加入8個ALL_DONEsentinel:

if __name__ == '__main__':
    primes = []
    possibles = Queue()
    definites = Queue()

    N = 8
    pool = Pool(processes=N)
    processes = []
    for _ in range(N):
        p = Process(target=check_prime,args=(possibles, definites))
        processes.append(p)
        p.start()
    
    t1 = time.time()
    
    number_range = range(10000000000, 10000100000)
    for possible in number_range:
        possibles.put(possible)
    print("ALL JOBS ADDED TO THE QUEUE")

    # add poison pills to stop the remote workers
    for n in range(N):
        possibles.put(ALL_DONE)
    print("NOW WAITING FOR RESULTS...")
    ...

循環地從definites隊列中獲取結果(固然,結果不是順序的),獲得8個WORKER_FINISHED後中止循環:

...
    processors_finished = 0
    while True:
        new_result = definites.get()
        if new_result == WORKER_FINISHED:
            processors_finished += 1
            print("{} WORKER(S) FINISHED".format(processors_finished))
            if processors_finished == N:
                break
        else:
            primes.append(new_result)
    assert processors_finished == N

    print("Took:", time.time() - t1)
    print(len(primes), primes[:10], primes[-10:])

程序執行花了7秒多,而順序執行須要20秒左右。但因爲建立隊列須要序列化和同步,多進程的執行速度並不必定會比順序執行更快。在原書中就是如此,甚至看成者把全部偶數從輸入隊列中剔除後多進程執行仍是比順序執行慢,說明多進程執行的程序有很大一部分時間是花費在通訊開銷上的。

3.驗證素數

與第2節的「尋找一個範圍內全部的素數」不一樣,如今咱們來解決如何快速判斷一個特別大的數(好比一個18位數)是否爲素數的問題——由多個cpu合做完成。這是一個須要進程間通訊或共享狀態的問題。

3.1簡單的進程池

與前兩個例子類似,咱們把要檢查的數字的可能的因子分爲多組,傳遞給多個子進程進行檢查。當某個子進程中的因子整除了這個數,子進程就返回False——但這不會讓別的子進程停下來(因此是一個簡單的版本)。這或許會讓別的子進程作無用功,但也省去了檢查共享狀態的通訊開銷。
把因子分組:

def create_range(from_i, to_i, N):
    piece_length = int((to_i - from_i) / N)
    lrs = [from_i] 
          + [(i + 1) if (i % 2 == 0) else i 
             for i in range(from_i, to_i, piece_length)[1:]]
    if len(lrs) > N:
        lrs.pop()
    assert len(lrs) == N
    ranges = list(zip(lrs, lrs[1:])) + [(lrs[-1], to_i)]
    return ranges

e.g. create_range(1000, 100000, 4)的返回值是[(1000, 25751), (25751, 50501), (50501, 75251), (75251, 100000)]。

import time
import math
from multiprocessing import Pool


def check_prime_in_range(args):
    n, from_i, to_i = args #彷佛只能經過傳入元組而後再拆包的方式達到傳入多參數的效果
    from_i, to_i = ranges
    if n % 2 == 0:
        return False
    for i in range(from_i, to_i, 2):
        if n % i == 0:
            return False
    return True


def check_prime(n, pool, N):
    from_i = 3
    to_i = int(math.sqrt(n)) + 1
    ranges = create_range(from_i, to_i, N)
    args = [(n, from_i, to_i) for from_i, to_i in ranges]`
    results = pool.map(check_prime_in_range, args)
    if False in results:
        return False
    return True


if __name__ == "__main__":
    N = 8
    pool = Pool(processes=N)
    prime18 = 100109100129100151
    t1 = time.time()
    print("%d: %s" %(prime18, check_prime(prime18, pool, N)))
    print('Took:', time.time() - t1)

大約用了10s。

3.2稍微沒那麼簡單的進程池

因爲額外開銷的緣由,對於小一點的數字,多進程的方法可能尚未順序查找的方法好。並且,若是已經找到了一個很小的因數,程序也不會立刻停下來。固然,在因數時咱們能夠當即在進程間通訊,但這會產生大量的額外通訊開銷,由於大多數數字都會有一個較小的因數。因而咱們採用混合策略:先順序查找較小的因數,而後再將剩餘的工做分派給多個進程。這是一種避免多進程開銷的常見作法。

def check_prime(n, pool, N):
    from_i = 3
    to_i= 21
    args = (n, from_i, to_i)
    if not check_prime_in_range(args):
        return False
        
    from_i = to_i
    to_i = int(math.sqrt(n)) + 1
    ranges = create_range(from_i, to_i, N)
    args = [(n, from_i, to_i) for from_i, to_i in ranges]
    results = pool.map(check_prime_in_range, args)
    if False in results:
        return False
    return True

3.3使用multiprocessing.Manager()做爲標誌位

直接上代碼吧。能夠看到這裏用Manager建立了一個符號位。讀取這個符號位並不須要本身作什麼上鎖之類的操做,就像在檢查一個全局變量同樣方便(不過仍是要做爲參數傳入函數中的)。爲了節約通訊開銷,讓每一個進程每檢查1000個數檢查一次符號位。若是進程檢查到了FLAG_SET或者找到了因數就停下來。

import time
import math
from multiprocessing import Pool, Manager


SERIAL_CHECK_CUTOFF = 21
CHECK_EVERY = 1000
FLAG_CLEAR = b'0'
FLAG_SET = b'1'


def create_range(from_i, to_i, N):
    piece_length = int((to_i - from_i) / N)
    lrs = [from_i] + [(i + 1) if (i % 2 == 0) else i for i in range(from_i, to_i, piece_length)[1:]]
    if len(lrs) > N:
        lrs.pop()
        assert len(lrs) == N
    ranges = list(zip(lrs, lrs[1:])) + [(lrs[-1], to_i)]
    return ranges

def check_prime_in_range(args):
    n, from_i, to_i, value = args
    if n % 2 == 0:
        return False
    check_every = CHECK_EVERY
    for i in range(from_i, to_i, 2):
        check_every -= 1
        if not check_every:
            if value.value == FLAG_SET:
                return False
            check_every = CHECK_EVERY

        if n % i == 0:
            value.value = FLAG_SET
            return False
    return True

def check_prime(n, pool, N, value):
    from_i = 3
    to_i= SERIAL_CHECK_CUTOFF
    value.value = FLAG_CLEAR  # 要記得先設置標誌位的值
    args = (n, from_i, to_i, value)
    if not check_prime_in_range(args):
        return False

    from_i = to_i
    to_i = int(math.sqrt(n)) + 1
    ranges = create_range(from_i, to_i, N)
    args = [(n, from_i, to_i, value) for from_i, to_i in ranges]
    results = pool.map(check_prime_in_range, args)
    if False in results:
        return False
    return True


if __name__ == "__main__":
    N = 8
    manager = Manager()
    value = manager.Value(b'c', FLAG_CLEAR) # 建立一個一字節(一字符)大小的符號標誌位
    pool = Pool(processes=N)
    prime18 = 100109100129100151
    non_prime = 100109100129101027
    t1 = time.time()
    print("%d: %s" %(non_prime, check_prime(non_prime, pool, N, value)))
    print('Took:', time.time()-t1)
相關文章
相關標籤/搜索