Python 使用期物處理併發

  抨擊線程的每每是系統程序員,他們考慮的使用場景對通常的應用程序員來講,也許一輩子都不會遇到……應用程序員遇到的使用場景,99% 的狀況下只需知道如何派生一堆獨立的線程,而後用隊列收集結果。html

 

示例:網絡下載的三種風格 python

  爲了高效處理網絡 I/O,須要使用併發,由於網絡有很高的延遲,因此爲了避免浪費 CPU 週期去等待,最好在收到網絡響應以前作些其餘的事。git

  爲了經過代碼說明這一點,我寫了三個示例程序,從網上下載 20 個國家的國旗圖像。第一個示例程序 flags.py 是依序下載的:下載完一個圖像,並將其保存在硬盤中以後,才請求下一個圖像。另外兩個腳本是併發下載的:幾乎同時請求全部圖像,每下載完一個文件就保存一個文件。flags_threadpool.py 腳本使用 concurrent.futures 模塊,而flags_asyncio.py 腳本使用 asyncio 包。程序員

運行 flags.py、flags_threadpool.py 和 flags_asyncio.py 腳本獲得的結果github

$ python3 flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN ➊ 每次運行腳本後,首先顯示下載過程當中下載完畢的國家代碼,最後顯示一個消息,說明耗時
20 flags downloaded in 7.26s                    ➋ flags.py 腳本下載 20 個圖像平均用時 7.18 秒
$ python3 flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags downloaded in 7.20s
$ python3 flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags downloaded in 7.09s
$ python3 flags_threadpool.py
DE BD CN JP ID EG NG BR RU CD IR MX US PH FR PK VN IN ET TR
20 flags downloaded in 1.37s                    ➌ flags_threadpool.py 腳本平均用時 1.40 秒
$ python3 flags_threadpool.py
EG BR FR IN BD JP DE RU PK PH CD MX ID US NG TR CN VN ET IR
20 flags downloaded in 1.60s
$ python3 flags_threadpool.py
BD DE EG CN ID RU IN VN ET MX FR CD NG US JP TR PK BR IR PH
20 flags downloaded in 1.22s
$ python3 flags_asyncio.py                     ➍ flags_asyncio.py 腳本平均用時 1.35 秒
BD BR IN ID TR DE CN US IR PK PH FR RU NG VN ET MX EG JP CD
20 flags downloaded in 1.36s
$ python3 flags_asyncio.py
RU CN BR IN FR BD TR EG VN IR PH CD ET ID NG DE JP PK MX US
20 flags downloaded in 1.27s
$ python3 flags_asyncio.py
RU IN ID DE BR VN PK MX US IR ET EG NG BD FR CN JP PH CD TR ➎ 注意國家代碼的順序:對併發下載的腳原本說,每次下載的順序都不一樣
20 flags downloaded in 1.42s

   兩個併發下載的腳本之間性能差別不大,不過都比依序下載的腳本快 5倍多。這只是一個特別小的任務,若是把下載的文件數量增長到幾百個,併發下載的腳本能比依序下載的腳本快 20 倍或更多。安全

依序下載的腳本網絡

🌰  flags.py:依序下載的腳本;另外兩個腳本會重用其中幾個函數多線程

 1 import os
 2 import sys
 3 import time
 4 
 5 import requests
 6 
 7 POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
 8             'MX PH VN ET EG DE IR TR CD FR').split()
 9 
10 BASE_URL = 'http://flupy.org/data/flags'            # 下載國旗網站的入口
11 
12 DEST_DIR = 'downloads/'
13 
14 os.mkdir(DEST_DIR) if not os.path.exists(DEST_DIR) else 'ok'    #判斷目錄是否存在,不存在在就建立
15 
16 
17 def save_flag(img, filename):                       # 保存圖片的函數,傳遞img的二進制流和還有國旗的名稱
18     path = os.path.join(DEST_DIR, filename)
19     with open(path, 'wb') as fp:
20         fp.write(img)
21 
22 
23 def get_flag(cc):                                   # 獲取國旗的下載地址,經過requests中的content獲取二進制流
24     url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
25     resp = requests.get(url)
26     return resp.content
27 
28 
29 def show(text):                                     # 顯示一個字符串,而後刷新sys.stdout
30     print(text, end=' ')
31     sys.stdout.flush()
32 
33 
34 def download_many(cc_list):                         # download_many 是與併發實現比較的關鍵函數
35     for cc in sorted(cc_list):
36         image = get_flag(cc)
37         show(cc)
38         save_flag(image, cc.lower() + '.gif')
39 
40     return len(cc_list)
41 
42 
43 def main(download_many):                            # main 函數記錄並報告運行 download_many 函數以後的耗時
44     t0 = time.time()
45     count = download_many(POP20_CC)
46     elapsed = time.time() - t0
47     msg = '\n{} flags downloaded in {:.2f}s'
48     print(msg.format(count, elapsed))
49 
50 
51 if __name__ == "__main__":
52     main(download_many)

使用concurrent.futures模塊下載併發

  concurrent.futures 模塊的主要特點是 ThreadPoolExecutor 和ProcessPoolExecutor 類,這兩個類實現的接口能分別在不一樣的線程或進程中執行可調用的對象。這兩個類在內部維護着一個工做線程或進程池,以及要執行的任務隊列。不過,這個接口抽象的層級很高,像下載國旗這種簡單的案例,無需關心任何實現細節。app

🌰 flags_threadpool.py:使用futures.ThreadPoolExecutor 類實現多線程下載的腳本 

 1 from concurrent import futures
 2 
 3 from flags import save_flag, get_flag, show, main
 4 
 5 
 6 MAX_WORKERS = 20                                    # 設定 ThreadPoolExecutor 類最多使用幾個線程
 7 
 8 
 9 def download_one(cc):                               # 下載一個圖像的函數;這是在各個線程中執行的函數
10     image = get_flag(cc)
11     show(cc)
12     save_flag(image, cc.lower() + '.gif')
13     return cc
14 
15 
16 def download_many(cc_list):
17     workers = min(MAX_WORKERS, len(cc_list))        # 設定工做的線程數量:使用容許的最大值(MAX_WORKERS)與要
18                                                     # 處理的數量之間較小的那個值,以避免建立多餘的線程。
19     with futures.ThreadPoolExecutor(workers) as executor:  # 使用工做的線程數實例化 ThreadPoolExecutor類
20         res = executor.map(download_one, sorted(cc_list))  # map 方法的做用與內置的 map 函數相似,不過 download_one 函數會在
21                                                            # 多個線程中併發調用;map 方法返回一個生成器,所以能夠迭代,獲取各個函數返回的值。
22     return len(list(res))                           #返回獲取的結果數量;若是有線程拋出異常,異常會在這裏拋出
23 
24 if __name__ == "__main__":
25     main(download_many)

期物在哪裏

  期物是 concurrent.futures 模塊和 asyncio 包的重要組件,從 Python 3.4 起,標準庫中有兩個名爲 Future 的類:concurrent.futures.Future 和 asyncio.Future。這兩個類的做用相同:兩個 Future 類的實例都表示可能已經完成或者還沒有完成的延遲計算。這與 Twisted 引擎中的 Deferred 類、Tornado 框架中的Future 類,以及多個 JavaScript 庫中的 Promise 對象相似。

  期物封裝待完成的操做,能夠放入隊列,完成的狀態能夠查詢,獲得結果(或拋出異常)後能夠獲取結果(或異常)。

  咱們要記住一件事:一般狀況下本身不該該建立期物,而只能由併發框架(concurrent.futures 或 asyncio)實例化。緣由很簡單:期物表示終將發生的事情,而肯定某件事會發生的惟一方式是執行的時間已經排定。所以,只有排定把某件事交給concurrent.futures.Executor 子類處理時,纔會建立concurrent.futures.Future 實例。例如,Executor.submit() 方法的參數是一個可調用的對象,調用這個方法後會爲傳入的可調用對象排期,並返回一個期物。

  這兩種期物都有 .done() 方法,這個方法不阻塞,返回值是布爾值,指明期物連接的可調用對象是否已經執行。客戶端代碼一般不會詢問期物是否運行結束,而是會等待通知。所以,兩個 Future 類都有.add_done_callback() 方法:這個方法只有一個參數,類型是可調用的對象,期物運行結束後會調用指定的可調用對象。

  此外,還有 .result() 方法。在期物運行結束後調用的話,這個方法在兩個 Future 類中的做用相同:返回可調用對象的結果,或者從新拋出執行可調用的對象時拋出的異常。但是,若是期物沒有運行結束,result 方法在兩個 Future 類中的行爲相差很大。對concurrency.futures.Future 實例來講,調用 f.result() 方法會阻塞調用方所在的線程,直到有結果可返回。此時,result 方法能夠接收可選的 timeout 參數,若是在指定的時間內期物沒有運行完畢,會拋出 TimeoutError 異常。

  爲了使用 futures.as_completed 函數,只需修改 download_many 函數,把較抽象的 executor.map 調用換成兩個 for 循環:一個用於建立並排按期物,另外一個用於獲取期物的結果。同時,咱們會添加幾個print 調用,顯示運行結束先後的期物。修改後的 download_many 函數如示例。

🌰 flags_threadpool_ac.py:把download_many 函數中的executor.map 方法換成 executor.submit 方法和futures.as_completed 函數

 1 from concurrent import futures
 2 
 3 from flags_threadpool import download_one, main
 4 
 5 def download_many(cc_list):
 6     cc_list = cc_list[:5]           # 此次演示只使用人口最多的 5 個國家
 7     with futures.ThreadPoolExecutor(max_workers=3) as executor: # 把 max_workers 硬編碼爲 3,以便在輸出中觀察待完成的期物
 8         to_do = []
 9         for cc in sorted(cc_list):  # 按照字母表順序迭代國家代碼,明確代表輸出的順序與輸入一致
10             future = executor.submit(download_one, cc)  # executor.submit 方法排定可調用對象的執行時間,
11                                                         # 而後返回一個期物,表示這個待執行的操做。
12             to_do.append(future)    # 存儲各個期物,後面傳給 as_completed 函數
13             msg = 'Scheduled for {}: {}'
14             print(msg.format(cc, future))   # 顯示一個消息,包含國家代碼和對應的期物
15 
16         results = []
17         for future in futures.as_completed(to_do):  # as_completed 函數在期物運行結束後產出期物
18             res = future.result()   # 獲取該期物的結果
19             msg = '{} result: {!r}'
20             print(msg.format(future, res))  # 顯示期物及其結果
21             results.append(res)
22 
23     return len(results)
24 
25 if __name__ == "__main__":
26     main(download_many)

flags_threadpool_ac.py 腳本的輸出

$ python3 flags_threadpool_ac.py
Scheduled for BR: <Future at 0x100791518 state=running> ➊ 排定的期物按字母表排序;期物的repr()方法會顯示期物的狀態前三個running,覺得有三個線程可用
Scheduled for CN: <Future at 0x100791710 state=running>
Scheduled for ID: <Future at 0x100791a90 state=running>
Scheduled for IN: <Future at 0x101807080 state=pending> ➋ 後兩個期物的狀態是pending,等待有線程可用
Scheduled for US: <Future at 0x101807128 state=pending>
CN <Future at 0x100791710 state=finished returned str> result: 'CN' ➌ 這一行裏的第一個CN是運行在一個工做線程中的download_one函數裏輸出的,隨後的內容是download_many函數輸出的
BR ID <Future at 0x100791518 state=finished returned str> result: 'BR' ➍ 這裏有兩個線程輸出國家代碼,而後主線程中有download_many函數輸出第一個線程的結果
<Future at 0x100791a90 state=finished returned str> result: 'ID'
IN <Future at 0x101807080 state=finished returned str> result: 'IN'
US <Future at 0x101807128 state=finished returned str> result: 'US'
5 flags downloaded in 0.70s

 

阻塞型I/O和GIL

  CPython 解釋器自己就不是線程安全的,所以有全局解釋器鎖(GIL),一次只容許使用一個線程執行 Python 字節碼。所以,一個 Python 進程一般不能同時使用多個 CPU 核心。

  編寫 Python 代碼時沒法控制 GIL;不過,執行耗時的任務時,可使用一個內置的函數或一個使用 C 語言編寫的擴展釋放 GIL。其實,有個使用 C 語言編寫的 Python 庫能管理 GIL,自行啓動操做系統線程,利用所有可用的 CPU 核心。這樣作會極大地增長庫代碼的複雜度,所以大多數庫的做者都不這麼作。

  然而,標準庫中全部執行阻塞型 I/O 操做的函數,在等待操做系統返回結果時都會釋放 GIL。這意味着在 Python 語言這個層次上可使用多線程,而 I/O 密集型 Python 程序能從中受益:一個 Python 線程等待網絡響應時,阻塞型 I/O 函數會釋放 GIL,再運行一個線程。

 

使用concurrent.futures模塊啓動進程

  concurrent.futures 模塊的文檔(https://docs.python.org/3/library/concurrent.futures.html)副標題是「Launching parallel tasks」(執行並行任務)。這個模塊實現的是真正的並行計算,由於它使用 ProcessPoolExecutor 類把工做分配給多個Python 進程處理。所以,若是須要作 CPU 密集型處理,使用這個模塊能繞開 GIL,利用全部可用的 CPU 核心。

  ProcessPoolExecutor 和 ThreadPoolExecutor 類都實現了通用的Executor 接口,所以使用 concurrent.futures 模塊能特別輕鬆地把基於線程的方案轉成基於進程的方案。

下載國旗的示例或其餘 I/O 密集型做業使用 ProcessPoolExecutor 類得不到任何好處。這一點易於驗證,只需把下面 🌰 中這幾行: 

 def download_many(cc_list):
     workers = min(MAX_WORKERS, len(cc_list))
     with futures.ThreadPoolExecutor(workers) as executor:

改爲:

def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor: 

  對簡單的用途來講,這兩個實現 Executor 接口的類惟一值得注意的區別是,ThreadPoolExecutor.__init__ 方法須要 max_workers 參數,指定線程池中線程的數量。在 ProcessPoolExecutor 類中,那個參數是可選的,並且大多數狀況下不使用——默認值是os.cpu_count() 函數返回的 CPU 數量。這樣處理說得通,由於對CPU 密集型的處理來講,不可能要求使用超過 CPU 數量的值程。而對I/O 密集型處理來講能夠在一個 ThreadPoolExecutor 實例中使用 10個、100 個或 1000 個線程;最佳線程數取決於作的是什麼事,以及可用內存有多少,所以要仔細測試才能找到最佳的線程數。

 

實驗Executor.map方法

  若想併發運行多個可調用的對象,最簡單的方式是使用 🌰 中見過的 Executor.map 方法

🌰 demo_executor_map.py:簡單演示ThreadPoolExecutor 類的 map 方法

 1 from time import sleep, strftime
 2 from concurrent import futures
 3 
 4 
 5 def display(*args):         # 打印傳遞的參數,並打印時間戳[HH:MM:SS]格式
 6     print(strftime('[%H:%M:%S]'), end=' ')
 7     print(*args)
 8 
 9 
10 def loiter(n):              # 打印信息,休眠n秒
11     msg = '{}loiter({}): doing nothing for {}s...'
12     display(msg.format('\t' * n, n, n))
13     sleep(n)
14     msg = '{}loiter({}): done.'
15     display(msg.format('\t' * n, n))
16     return n * 10           # loiter 函數返回 n * 10,以便讓咱們瞭解收集結果的方式
17 
18 
19 def main():
20     display('Script starting.')
21     executor = futures.ThreadPoolExecutor(max_workers=3)    # 建立 ThreadPoolExecutor 實例,有 3 個線程
22     results = executor.map(loiter, range(5))  # 把5個任務交給executor(3個會提早運行,另外2個等待)
23     display('results:', results)    # 生成器,顯示results的結果
24     display('Waiting for individual results:')
25     for i, result in enumerate(results):
26         display('result {}: {}'.format(i, result))
27 
28 main()

以上代碼執行的結果爲:

[20:15:11] Script starting.
[20:15:11] loiter(0): doing nothing for 0s...
[20:15:11] loiter(0): done.
[20:15:11]     loiter(1): doing nothing for 1s...
[20:15:11]         loiter(2): doing nothing for 2s...
[20:15:11] results: <generator object Executor.map.<locals>.result_iterator at 0x102360f10>
[20:15:11] Waiting for individual results:
[20:15:11] result 0: 0
[20:15:11]             loiter(3): doing nothing for 3s...
[20:15:12]     loiter(1): done.
[20:15:12]                 loiter(4): doing nothing for 4s...
[20:15:12] result 1: 10
[20:15:13]         loiter(2): done.
[20:15:13] result 2: 20
[20:15:14]             loiter(3): done.
[20:15:14] result 3: 30
[20:15:16]                 loiter(4): done.
[20:15:16] result 4: 40

 

顯示下載進度 

  TQDM 包特別易於使用,項目的 README.md 文件(https://github.com/noamraph/tqdm/blob/master/README.md)中有個 GIF動畫,演示了最簡單的用法。安裝 tqdm 包以後, 在 Python 控制檯中輸入下述代碼,會在註釋那裏看到進度條動畫:

>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(1000)):
...     time.sleep(.01)
... 
 51%|████████████████████▎                   | 509/1000 [00:05<00:05, 83.79it/s]

  除了這個靈巧的效果以外,tqdm 函數的實現方式也頗有趣:能處理任何可迭代的對象,生成一個迭代器;使用這個迭代器時,顯示進度條和完成所有迭代預計的剩餘時間。爲了計算預計剩餘時間,tqdm 函數要獲取一個能使用 len 函數肯定大小的可迭代對象,或者在第二個參數中指定預期的元素數量。

相關文章
相關標籤/搜索