在使用Python處理任務時,限於單線程處理能力有限,須要將任務並行化,分散到多個線程或者是多個進程去執行。python
concurrent.futures就是這樣一種庫,它可讓用戶能夠很是方便的將任務並行化。這個名字有點長,後面我直接使用詞彙concurrent來代替concurrent.futures。數組
concurrent提供了兩種併發模型,一個是多線程ThreadPoolExecutor,一個是多進程ProcessPoolExecutor。對於IO密集型任務宜使用多線程模型。對於計算密集型任務應該使用多進程模型。bash
爲何要這樣選擇呢?是由於Python GIL的存在讓Python虛擬機在進行運算時沒法有效利用多核心。對於純計算任務,它永遠最多隻能榨乾單個CPU核心。若是要突破這個瓶頸,就必須fork出多個子進程來分擔計算任務。而對於IO密集型任務,CPU使用率每每是極低的,使用多線程雖然會加倍CPU使用率,可是還遠遠到不了飽和(100%)的地步,在單核心能夠應付總體計算的前提下,天然是應該選擇資源佔用少的模式,也就是多線程模式。網絡
接下來咱們分別嘗試一下兩種模式來進行並行計算。多線程
# coding: utf8
# t.py
import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait
# 分割子任務
def each_task(index):
time.sleep(1) # 睡1s,模擬IO
print "thread %s square %d" % (threading.current_thread().ident, index)
return index * index # 返回結果
def run(thread_num, task_num):
# 實例化線程池,thread_num個線程
executor = ThreadPoolExecutor(thread_num)
start = time.time()
fs = [] # future列表
for i in range(task_num):
fs.append(executor.submit(each_task, i)) # 提交任務
wait(fs) # 等待計算結束
end = time.time()
duration = end - start
s = sum([f.result() for f in fs]) # 求和
print "total result=%s cost: %.2fs" % (s, duration)
executor.shutdown() # 銷燬線程池
if __name__ == '__main__':
fire.Fire(run)
複製代碼
運行python t.py 2 10
,也就是2個線程跑10個任務,觀察輸出併發
thread 123145422131200 square 0thread 123145426337792 square 1
thread 123145426337792 square 2
thread 123145422131200 square 3
thread 123145426337792 square 4
thread 123145422131200 square 5
thread 123145426337792 square 6
thread 123145422131200 square 7
thread 123145426337792 square 8
thread 123145422131200 square 9
total result=285 cost: 5.02s
複製代碼
咱們看到計算總共花費了大概5s,總共sleep了10s由兩個線程分擔,因此是5s。讀者也許會問,爲何輸出亂了,這是由於print操做不是原子的,它是兩個連續的write操做合成的,第一個write輸出內容,第二個write輸出換行符,write操做自己是原子的,可是在多線程環境下,這兩個write操做會交錯執行,因此輸出就不整齊了。若是將代碼稍做修改,將print改爲單個write操做,輸出就整齊了(關於write是否絕對原子性還須要進一步深刻討論)app
# 分割子任務
def each_task(index):
time.sleep(1) # 睡1s,模擬IO
import sys
sys.stdout.write("thread %s square %d\n" % (threading.current_thread().ident, index))
return index * index # 返回結果
複製代碼
咱們再跑一下python t.py 2 10
,觀察輸出框架
thread 123145438244864 square 0
thread 123145442451456 square 1
thread 123145442451456 square 2
thread 123145438244864 square 3
thread 123145438244864 square 4
thread 123145442451456 square 5
thread 123145438244864 square 6
thread 123145442451456 square 7
thread 123145442451456 square 9
thread 123145438244864 square 8
total result=285 cost: 5.02s
複製代碼
接下來,咱們改變參數,擴大到10個線程,看看全部任務總共須要多久完成socket
> python t.py 10 10
thread 123145327464448 square 0
thread 123145335877632 square 2
thread 123145331671040 square 1
thread 123145344290816 square 4
thread 123145340084224 square 3
thread 123145348497408 square 5
thread 123145352704000 square 6
thread 123145356910592 square 7
thread 123145365323776 square 9
thread 123145361117184 square 8
total result=285 cost: 1.01s
複製代碼
能夠看到1s中就完成了全部的任務。這就是多線程的魅力,能夠將多個IO操做並行化,減小總體處理時間。分佈式
相比多線程適合處理IO密集型任務,多進程適合計算密集型。接下來咱們要模擬一下計算密集型任務。個人我的電腦有2個核心,正好能夠體驗多核心計算的優點。
那這個密集型計算任務怎麼模擬呢,咱們可使用圓周率計算公式。
經過擴大級數的長度n,就能夠無限逼近圓周率。當n特別大時,計算會比較緩慢,這時候CPU就會一直處於繁忙狀態,這正是咱們所指望的。
好,下面開寫多進程並行計算代碼
# coding: utf8
# p.py
import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait
# 分割子任務
def each_task(n):
# 按公式計算圓周率
s = 0.0
for i in range(n):
s += 1.0/(i+1)/(i+1)
pi = math.sqrt(6*s)
# os.getpid能夠得到子進程號
sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))
return pi
def run(process_num, *ns): # 輸入多個n值,分紅多個子任務來計算結果
# 實例化進程池,process_num個進程
executor = ProcessPoolExecutor(process_num)
start = time.time()
fs = [] # future列表
for n in ns:
fs.append(executor.submit(each_task, int(n))) # 提交任務
wait(fs) # 等待計算結束
end = time.time()
duration = end - start
print "total cost: %.2fs" % duration
executor.shutdown() # 銷燬進程池
if __name__ == '__main__':
fire.Fire(run)
複製代碼
經過代碼能夠看出多進程模式在代碼的編寫上和多線程沒有多大差別,僅僅是換了一個類名,其它都一摸同樣。這也是concurrent庫的魅力所在,將多線程和多進程模型抽象出了同樣的使用接口。
接下來咱們運行一下python p.py 1 5000000 5001000 5002000 5003000
,總共計算4次pi,只用一個進程。觀察輸出
process 96354 n=5000000 pi=3.1415924626
process 96354 n=5001000 pi=3.14159246264
process 96354 n=5002000 pi=3.14159246268
process 96354 n=5003000 pi=3.14159246272
total cost: 9.45s
複製代碼
能夠看出來隨着n的增大,結果愈來愈逼近圓周率,由於只用了一個進程,因此任務是串行執行,總共花了大約9.5s。
接下來再增長一個進程,觀察輸出
> python p.py 2 5000000 5001000 5002000 5003000
process 96529 n=5001000 pi=3.14159246264
process 96530 n=5000000 pi=3.1415924626
process 96529 n=5002000 pi=3.14159246268
process 96530 n=5003000 pi=3.14159246272
total cost: 4.98s
複製代碼
從耗時上看縮短了接近1半,說明多進程確實起到了計算並行化的效果。此刻若是使用top命令觀察進程的CPU使用率,這兩個進程的CPU使用率都佔到了接近100%。
若是咱們再增長2個進程,是否是還能繼續壓縮計算時間呢
> python p.py 4 5000000 5001000 5002000 5003000
process 96864 n=5002000 pi=3.14159246268
process 96862 n=5000000 pi=3.1415924626
process 96863 n=5001000 pi=3.14159246264
process 96865 n=5003000 pi=3.14159246272
total cost: 4.86s
複製代碼
看來耗時不能繼續節約了,由於只有2個計算核心,2個進程已經足以榨乾它們了,即便再多加進程也只有2個計算核心可用。
concurrent用的時候很是簡單,可是內部實現並非很好理解。在深刻分析內部的結構以前,咱們須要先理解一下Future這個對象。在前面的例子中,executor提交(submit)任務後都會返回一個Future對象,它表示一個結果的坑,在任務剛剛提交時,這個坑是空的,一旦子線程運行任務結束,就會將運行的結果塞到這個坑裏,主線程就能夠經過Future對象得到這個結果。簡單一點說,Future對象是主線程和子線程通訊的媒介。
class Future(object):
def __init__(self):
self._condition = threading.Condition() # 條件變量
self._result = None
def result(self, timeout=None):
self._condition.wait(timeout)
return self._result
def set_result(self, result):
self._result = result
self._condition.notify_all()
複製代碼
主線程將任務塞進線程池後獲得了這個Future對象,它內部的_result仍是空的。若是主線程調用result()方法獲取結果,就會阻塞在條件變量上。若是子線程計算任務完成了就會當即調用set_result()方法將結果填充進future對象,並喚醒阻塞在條件變量上的線程,也就是主線程。這時主線程當即醒過來並正常返回結果。
主線程和子線程交互分爲兩部分,第一部分是主線程如何將任務傳遞給子線程,第二部分是子線程如何將結果傳遞給主線程。第二部分已經講過了是經過Future對象來完成的。那第一部分是怎麼作到的呢?
如上圖所示,祕密就在於這個隊列,主線程是經過隊列將任務傳遞給多個子線程的。一旦主線程將任務塞進任務隊列,子線程們就會開始爭搶,最終只有一個線程能搶到這個任務,並當即進行執行,執行完後將結果放進Future對象就完成了這個任務的完整執行過程。
concurrent的線程池有個重大的設計問題,那就是任務隊列是無界的。若是隊列的生產者任務生產的太快,而線程池消費太慢處理不過來,任務就會堆積。若是堆積一直持續下去,內存就會持續增加直到OOM,任務隊列裏堆積的全部任務所有完全丟失。用戶使用時必定要注意這點,並作好適當的控制。
進程池內部結構複雜,連concurent庫的做者本身也以爲特別複雜,因此在代碼裏專門畫了一張ascii圖來說解模型內部結構
我以爲做者的這張圖還不夠好懂,因此也單獨畫了一張圖,請讀者們仔細結合上面兩張圖,一塊兒來過一邊完整的任務處理過程。
這個複雜的流程中涉及到3個隊列,還有中間附加的管理線程。那爲何做者要設計的這麼複雜,這樣的設計有什麼好處?
首先,咱們看這張圖的左半邊,它和線程池的處理流程沒有太多區別,區別僅僅是管理線程只有一個,而線程池的子線程會有多個。這樣設計可使得多進程模型和多線程模型的使用方法保持一致,這就是爲何兩個模型使用起來沒有任何區別的緣由所在——經過中間的管理線程隱藏了背後的多進程交互邏輯。
而後咱們再看這張圖的右半邊,管理線程經過兩個隊列來和子進程們進行交互,這兩個隊列都是跨進程隊列(multiprocessing.Queue)。CallQueue是單生產者多消費者,ResultQueue是多生產者單消費者。
CallQueue是個有界隊列,它的上限在代碼裏寫死了爲「子進程數+1」。若是子進程們處理不過來,CallQueue就會變滿,管理線程就會中止往裏面塞數據。可是這裏也遭遇了和線程池同樣的問題,TaskQueue是無界隊列,它的內容可無論消費者是否在持續(管理線程)消費,TaskQueue會無限制的持續生長,因而最終也會會致使OOM。
進程池模型中的跨進程隊列是用multiprocessing.Queue實現的。那這個跨進程隊列內部細節是怎樣的,它又是用什麼高科技來實現的呢
筆者仔細閱讀了multiprocessing.Queue的源碼發現,它使用無名套接字sockerpair來完成的跨進程通訊,socketpair和socket的區別就在於socketpair不須要端口,不須要走網絡協議棧,經過內核的套接字讀寫緩衝區直接進行跨進程通訊。
當父進程要傳遞任務給子進程時,先使用pickle將任務對象進行序列化成字節數組,而後將字節數組經過socketpair的寫描述符寫入內核的buffer中。子進程接下來就能夠從buffer中讀取到字節數組,而後再使用pickle對字節數組進行反序列化來獲得任務對象,這樣總算能夠執行任務了。一樣子進程將結果傳遞給父進程走的也是同樣的流程,只不過這裏的socketpair是ResultQueue內部建立的無名套接字。
multiprocessing.Queue是支持雙工通訊,數據流向能夠是父到子,也能夠是子到父,只不過在concurrent的進程池實現中只用到了單工通訊。CallQueue是從父到子,ResultQueue是從子到父。
concurrent.futures框架很是好用,雖然內部實現機制異常複雜,讀者也無需徹底理解內部細節就能夠直接使用了。可是須要特別注意的是不論是線程池仍是進程池其內部的任務隊列都是無界的,必定要避免消費者處理不及時內存持續攀升的狀況發生。
今天,做者新書《深刻理解RPC》正式上線,限時優惠9.9元,感興趣的讀者點擊下面的鏈接進行閱讀
深刻理解 RPC : 基於 Python 自建分佈式高併發 RPC 服務