《流暢的Python》筆記。python
本篇主要討論concurrent.futures模塊,並用它實現一個簡單的併發操做。安全
咱們都知道,若是有大量數據要處理,或者要處理大量連接,異步操做會比順序操做快不少。Python中,concurrent
和asyncio
則是標準庫中進行了高度封裝的兩個異步操做包。它們在底層使用了Python提供的更基礎的兩個模塊,分別是multiprocessing
和threading
。微信
future
(全小寫)並不具體指某個類的實例,並且筆者查了老多資料也沒看到哪一個類叫作future
,它泛指用於異步操做的對象。concurrent.futures
和asyncio
這兩個模塊中有兩個名爲Future
的類:concurrent.futures.Future
和asyncio.Future
。這兩個類的做用相同,都表示可能已經完成或還沒有完成的延遲計算。這兩個Future
的實例並不該該由咱們手動建立,而應交由併發框架(也就是前面那兩個模塊)來實例化。多線程
本篇主要介紹concurrent.futures
模塊的簡單使用,並會將其和順序計算進行對比,其中還會涉及GIL和阻塞型I/O的概念。asyncio
將在下一篇進行介紹。併發
首先實現一個下載各國國旗的程序,隨後再將它與併發版本進行對比。如下是順序執行的版本,它下載人口前20的國家的國旗,並保存到本地:app
# 代碼2.1,flags.py
import os, time, sys # 這麼引用只是爲了節省篇幅,並不提倡
import requests # 第三方庫
POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
# 若是想測試本身的併發程序,爲了不被誤認爲是DOS攻擊,請自建http服務
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"
def save_flag(img, filename): # 保存圖片到本地
path = os.path.join(DEST_DIR, filename)
with open(path, "wb") as fp:
fp.write(img)
def get_flag(cc): # 請求圖片
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
resp = requests.get(url)
return resp.content
def show(text): # 每獲取一張圖片就給出一個提示
print(text, end=" ")
sys.stdout.flush()
def download_one(cc): # 下載一張圖片
image = get_flag(cc)
show(cc)
save_flag(image, cc.lower() + ".gif")
return cc # 這個return主要是給後面的併發程序用的,此處不要這行代碼也能夠
def download_many(cc_list): # 下載多張圖片
for cc in sorted(cc_list):
download_one(cc)
return len(cc_list)
def main(download_many): # 主程序,接收一個函數爲參數
t0 = time.time() # 開始時間
count = download_many(POP20_CC)
elapsed = time.time() - t0 # 結束時間
msg = "\n{} flags downloaded in {:.2f}s"
print(msg.format(count, elapsed))
if __name__ == "__main__":
main(download_many)
# 結果
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags downloaded in 14.83s # 耗時,只作了一次
複製代碼
如今咱們用concurrent.futures
模塊將上述代碼改寫爲線程版本,使其異步執行,其中有大部分函數延用上述代碼。框架
首先實現一個更具備細節的版本,咱們手動提交線程,而後再運行。這個版本只是爲了講述細節,因此並無所有下載,最大線程數也沒有設置得很高:異步
# 代碼3.1,flags_threadpool.py
from concurrent import futures
from flags import save_flag, get_flag, download_one, show, main
def download_many_ac(cc_list):
cc_list = cc_list[:5] # 只下載前五個用於測試
with futures.ThreadPoolExecutor(len(cc_list) / 2) as executor:
to_do = {} # 有意寫出字典,其實也能夠是列表或集合,但這是個慣用方法
for cc in sorted(cc_list):
future = executor.submit(download_one, cc)
to_do[future] = cc
msg = "Scheduled for {}: {}"
print(msg.format(cc, future))
results = []
for future in futures.as_completed(to_do):
res = future.result()
msg = "{} result: {!r}"
print(msg.format(future, res))
results.append(res)
return len(results)
if __name__ == "__main__":
main(download_many_ac)
# 結果:
Scheduled for BR: <Future at 0x1cbca5ab0f0 state=running>
Scheduled for CN: <Future at 0x1cbcb339b00 state=running>
Scheduled for ID: <Future at 0x1cbcb3490f0 state=running>
Scheduled for IN: <Future at 0x1cbcb349748 state=pending>
Scheduled for US: <Future at 0x1cbcb3497f0 state=pending>
CN <Future at 0x1cbcb339b00 state=finished returned str> result: 'CN'
BR <Future at 0x1cbca5ab0f0 state=finished returned str> result: 'BR'
IN <Future at 0x1cbcb349748 state=finished returned str> result: 'IN'
US <Future at 0x1cbcb3497f0 state=finished returned str> result: 'US'
ID <Future at 0x1cbcb3490f0 state=finished returned str> result: 'ID'
5 flags downloaded in 2.39s # 20個一塊兒下載只須要1.6s左右
複製代碼
解釋:async
在concurrent.futures
中有一個名爲Executor
的抽象基類,由它定義執行異步操做的接口。在這個模塊中有它的兩個具體類:的ThreadPoolExecutor
和ProcessPoolExecutor
,前者是線程,後者是進程。Executor
的第一個參數指定最大運行線程數。函數
Executor.submit(func, *args, **kwargs)
方法會在線程中執行func(*args, **kwargs)
,它將這個方法封裝成Future
對象並返回(假設這個實例叫作future
)。submit
方法會對future
進行排期,若是運行的線程數沒達到最大線程數,則future
會被當即運行,並將其狀態置爲running
;不然就等待,並將其狀態置爲pending
。這同時也代表,線程在submit
方法中啓動。
futures.as_completed
函數的第一個參數是一個future
序列,在內部會被轉換成set
。它返回一個迭代器,在future
運行結束後產出future
。在使用這個函數時還有一個慣用方法:將future
放到一個字典中。由於as_completed
返回的future
的順序不必定是傳入時的順序,使用字典能夠很輕鬆的作一些後續處理。
上述代碼中,從第31-35行的最開始兩個字母是由show
函數輸出的。光看上述結果,會讓人以爲線程是在as_completed
中啓動的,而之因此結果輸出得這麼整齊,是由於for
循環裏只是「提交」,實際運行是在線程中。若是在每次循環最後都執行sleep(2)
,你將會看到這樣的結果:
# 代碼3.2
Scheduled for BR: <Future at 0x13e6b30b2b0 state=running>
BR Scheduled for CN: <Future at 0x13e6b5820b8 state=running>
CN Scheduled for ID: <Future at 0x13e6c099278 state=running>
-- snip --
複製代碼
concurrent.futures.Future
有一個**result
方法,它返回future
中可調用對象運行完成後的結果,或者從新拋出可調用對象運行時的異常**。若是future
還未運行完成,調用future.result()
將阻塞調用方所在的線程,直到有結果可返回;它還能夠接受一個timeout
參數用於指定運行時間,若是在timeout
時間內future
沒有運行完畢,將拋出TimeoutError
異常。
在代碼3.1
中,咱們自行提交線程,其實,上述可改成更簡潔的版本:使用Executor.map
批量提交,只須要新建一個download_many
函數,其他不變:
# 代碼3.3
def download_many(cc_list):
with futures.ThreadPoolExecutor(len(cc_list)) as executor:
res = executor.map(download_one, sorted(cc_list))
return len(list(res))
# 結果:
JP RUBR EG CN VN BD TR FR ID NG DE IN PK ET PH IR US CD MX
20 flags downloaded in 1.69s
複製代碼
Executor.map()
方法和內置的map
函數相似,它將第一個參數(可調用對象)映射到第二個參數(可迭代對象)的每個元素上以建立Future
列表。Executor.map()
方法內部也是經過調用Future.submit
來建立Future
對象。
從上面代碼能夠看出,雖然使用Executor.map()
的代碼量比較少,但Executor.submit()
和futures.as_completed()
的組合更靈活。
Executor.map()
更適合於須要批量處理的狀況,好比同一函數(或者可調用對象)不一樣參數。而Executor.submit()
則更適合於零散的狀況,好比不一樣函數同一參數,不一樣函數不一樣參數,甚至兩個線程毫無關聯。
本文主體部分已經結束,下面是一些補充。
CPython自己並非線程安全的,所以有全局解釋器鎖(Global Interpreter Lock, GIL),一次只容許使用一個線程執行Python字節碼。
以這個爲基礎,按理說上述全部代碼將都不能並行下載,由於一次只能運行一個線程,而且線程版本的運行時間應該比順序版本的還要多才對(線程切換耗時)。但結果也代表,兩個線程版本的耗時都大大下降了。
這是由於,Python標準庫中全部執行阻塞型I/O操做的函數,在等待操做系統返回結果時都會釋放GIL。這就意味着,GIL幾乎對I/O密集型處理並無什麼影響,依然可使用多線程。
concurrent.futures
中還有一個ProcessPoolExecutor
類,它實現的是真正的並行計算。它和ThreadPoolExecutro
同樣,繼承自Executor
,二者實現了共同的接口,所以使用concurrent.futures
編寫的代碼能夠輕鬆地在線程版本與進程版本之間轉換,好比要講上述代碼改成進程版本,只需更改download_many()
中的一行代碼:
# 代碼3.4
with futures.ThreadPoolExecutor(len(cc_list)) as executor:
# 改成:
with futures.ProcessPoolExecutor() as executor:
複製代碼
也能夠指定進程數,但默認是os.cpu_count()
的返回值,即電腦的CPU核心數。
這個類很是適合於CPU密集型做業上。使用這個類實現的上述代碼雖然比線程版本慢一些,但依然比順序版本快不少。
若是你用最新版pip
下載過第三方庫,你會發如今下載時會有一個文字進度條。在Python中想要實現這種效果可使用第三方庫tqdm
,如下是它的一個簡單用法:
# 代碼3.5
import tqdm
from time import sleep
for i in tqdm.tqdm(range(1000)):
sleep(0.01)
# 結果:
40%|████ | 400/1000 [00:10<00:00, 98.11it/s]
複製代碼
迎你們關注個人微信公衆號"代碼港" & 我的網站 www.vpointer.net ~