Python學習之路36-使用future處理併發

《流暢的Python》筆記。python

本篇主要討論concurrent.futures模塊,並用它實現一個簡單的併發操做。安全

1. 前言

咱們都知道,若是有大量數據要處理,或者要處理大量連接,異步操做會比順序操做快不少。Python中,concurrentasyncio則是標準庫中進行了高度封裝的兩個異步操做包。它們在底層使用了Python提供的更基礎的兩個模塊,分別是multiprocessingthreading微信

future(全小寫)並不具體指某個類的實例,並且筆者查了老多資料也沒看到哪一個類叫作future,它泛指用於異步操做的對象。concurrent.futuresasyncio這兩個模塊中有兩個名爲Future的類:concurrent.futures.Futureasyncio.Future。這兩個類的做用相同,都表示可能已經完成或還沒有完成的延遲計算。這兩個Future的實例並不該該由咱們手動建立,而應交由併發框架(也就是前面那兩個模塊)來實例化。多線程

本篇主要介紹concurrent.futures模塊的簡單使用,並會將其和順序計算進行對比,其中還會涉及GIL和阻塞型I/O的概念。asyncio將在下一篇進行介紹。併發

2. 順序執行

首先實現一個下載各國國旗的程序,隨後再將它與併發版本進行對比。如下是順序執行的版本,它下載人口前20的國家的國旗,並保存到本地:app

# 代碼2.1,flags.py
import os, time, sys  # 這麼引用只是爲了節省篇幅,並不提倡
import requests  # 第三方庫

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
# 若是想測試本身的併發程序,爲了不被誤認爲是DOS攻擊,請自建http服務
BASE_URL = "http://flupy.org/data/flags"  
DEST_DIR = "downloads/"

def save_flag(img, filename):  # 保存圖片到本地
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp:
        fp.write(img)

def get_flag(cc):   # 請求圖片
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content

def show(text):  # 每獲取一張圖片就給出一個提示
    print(text, end=" ")
    sys.stdout.flush()

def download_one(cc):   # 下載一張圖片
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc   # 這個return主要是給後面的併發程序用的,此處不要這行代碼也能夠

def download_many(cc_list):   # 下載多張圖片
    for cc in sorted(cc_list):
        download_one(cc)
    return len(cc_list)

def main(download_many):   # 主程序,接收一個函數爲參數
    t0 = time.time()   # 開始時間
    count = download_many(POP20_CC)
    elapsed = time.time() - t0   # 結束時間
    msg = "\n{} flags downloaded in {:.2f}s"
    print(msg.format(count, elapsed))

if __name__ == "__main__":
    main(download_many)

# 結果
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 14.83s   # 耗時,只作了一次
複製代碼

3. concurrent.futures

如今咱們用concurrent.futures模塊將上述代碼改寫爲線程版本,使其異步執行,其中有大部分函數延用上述代碼。框架

3.1 futures.as_completed

首先實現一個更具備細節的版本,咱們手動提交線程,而後再運行。這個版本只是爲了講述細節,因此並無所有下載,最大線程數也沒有設置得很高:異步

# 代碼3.1,flags_threadpool.py
from concurrent import futures
from flags import save_flag, get_flag, download_one, show, main

def download_many_ac(cc_list):
    cc_list = cc_list[:5] # 只下載前五個用於測試
    with futures.ThreadPoolExecutor(len(cc_list) / 2) as executor:
        to_do = {}   # 有意寫出字典,其實也能夠是列表或集合,但這是個慣用方法
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do[future] = cc
            msg = "Scheduled for {}: {}"
            print(msg.format(cc, future))
        results = []
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = "{} result: {!r}"
            print(msg.format(future, res))
            results.append(res)
        return len(results)

if __name__ == "__main__":
    main(download_many_ac)

# 結果:
Scheduled for BR: <Future at 0x1cbca5ab0f0 state=running>
Scheduled for CN: <Future at 0x1cbcb339b00 state=running>
Scheduled for ID: <Future at 0x1cbcb3490f0 state=running>
Scheduled for IN: <Future at 0x1cbcb349748 state=pending>
Scheduled for US: <Future at 0x1cbcb3497f0 state=pending>
CN <Future at 0x1cbcb339b00 state=finished returned str> result: 'CN'
BR <Future at 0x1cbca5ab0f0 state=finished returned str> result: 'BR'
IN <Future at 0x1cbcb349748 state=finished returned str> result: 'IN'
US <Future at 0x1cbcb3497f0 state=finished returned str> result: 'US'
ID <Future at 0x1cbcb3490f0 state=finished returned str> result: 'ID'

5 flags downloaded in 2.39s  # 20個一塊兒下載只須要1.6s左右
複製代碼

解釋async

  • concurrent.futures中有一個名爲Executor的抽象基類,由它定義執行異步操做的接口。在這個模塊中有它的兩個具體類:的ThreadPoolExecutorProcessPoolExecutor,前者是線程,後者是進程。Executor的第一個參數指定最大運行線程數。函數

  • Executor.submit(func, *args, **kwargs)方法會在線程中執行func(*args, **kwargs),它將這個方法封裝成Future對象並返回(假設這個實例叫作future)。submit方法會對future進行排期,若是運行的線程數沒達到最大線程數,則future會被當即運行,並將其狀態置爲running;不然就等待,並將其狀態置爲pending這同時也代表,線程在submit方法中啓動

  • futures.as_completed函數的第一個參數是一個future序列,在內部會被轉換成set。它返回一個迭代器,在future運行結束後產出future。在使用這個函數時還有一個慣用方法:將future放到一個字典中。由於as_completed返回的future的順序不必定是傳入時的順序,使用字典能夠很輕鬆的作一些後續處理。

  • 上述代碼中,從第31-35行的最開始兩個字母是由show函數輸出的。光看上述結果,會讓人以爲線程是在as_completed中啓動的,而之因此結果輸出得這麼整齊,是由於for循環裏只是「提交」,實際運行是在線程中。若是在每次循環最後都執行sleep(2),你將會看到這樣的結果:

    # 代碼3.2
    Scheduled for BR: <Future at 0x13e6b30b2b0 state=running>
    BR Scheduled for CN: <Future at 0x13e6b5820b8 state=running>
    CN Scheduled for ID: <Future at 0x13e6c099278 state=running>
    -- snip --
    複製代碼
  • concurrent.futures.Future有一個**result方法,它返回future中可調用對象運行完成後的結果,或者從新拋出可調用對象運行時的異常**。若是future還未運行完成,調用future.result()阻塞調用方所在的線程,直到有結果可返回;它還能夠接受一個timeout參數用於指定運行時間,若是在timeout時間內future沒有運行完畢,將拋出TimeoutError異常。

3.2 Executor.map

代碼3.1中,咱們自行提交線程,其實,上述可改成更簡潔的版本:使用Executor.map批量提交,只須要新建一個download_many函數,其他不變:

# 代碼3.3
def download_many(cc_list):
    with futures.ThreadPoolExecutor(len(cc_list)) as executor:
        res = executor.map(download_one, sorted(cc_list))
    return len(list(res))

# 結果:
JP RUBR  EG CN VN BD TR FR ID NG DE IN PK ET PH IR US CD MX 
20 flags downloaded in 1.69s
複製代碼

Executor.map()方法和內置的map函數相似,它將第一個參數(可調用對象)映射到第二個參數(可迭代對象)的每個元素上以建立Future列表。Executor.map()方法內部也是經過調用Future.submit來建立Future對象。

3.3 比較

從上面代碼能夠看出,雖然使用Executor.map()的代碼量比較少,但Executor.submit()futures.as_completed()的組合更靈活。

Executor.map()更適合於須要批量處理的狀況,好比同一函數(或者可調用對象)不一樣參數。而Executor.submit()則更適合於零散的狀況,好比不一樣函數同一參數,不一樣函數不一樣參數,甚至兩個線程毫無關聯。

4. 補充

本文主體部分已經結束,下面是一些補充。

4.1 I/O密集型和GIL

CPython自己並非線程安全的,所以有全局解釋器鎖(Global Interpreter Lock, GIL),一次只容許使用一個線程執行Python字節碼。

以這個爲基礎,按理說上述全部代碼將都不能並行下載,由於一次只能運行一個線程,而且線程版本的運行時間應該比順序版本的還要多才對(線程切換耗時)。但結果也代表,兩個線程版本的耗時都大大下降了。

這是由於,Python標準庫中全部執行阻塞型I/O操做的函數,在等待操做系統返回結果時都會釋放GIL。這就意味着,GIL幾乎對I/O密集型處理並無什麼影響,依然可使用多線程。

4.2 CPU密集型

concurrent.futures中還有一個ProcessPoolExecutor類,它實現的是真正的並行計算。它和ThreadPoolExecutro同樣,繼承自Executor,二者實現了共同的接口,所以使用concurrent.futures編寫的代碼能夠輕鬆地在線程版本與進程版本之間轉換,好比要講上述代碼改成進程版本,只需更改download_many()中的一行代碼:

# 代碼3.4
with futures.ThreadPoolExecutor(len(cc_list)) as executor:
# 改成:
with futures.ProcessPoolExecutor() as executor:
複製代碼

也能夠指定進程數,但默認是os.cpu_count()的返回值,即電腦的CPU核心數。

這個類很是適合於CPU密集型做業上。使用這個類實現的上述代碼雖然比線程版本慢一些,但依然比順序版本快不少。

4.3 進度條

若是你用最新版pip下載過第三方庫,你會發如今下載時會有一個文字進度條。在Python中想要實現這種效果可使用第三方庫tqdm,如下是它的一個簡單用法:

# 代碼3.5
import tqdm
from time import sleep

for i in tqdm.tqdm(range(1000)):
    sleep(0.01)

# 結果:
 40%|████      |  400/1000 [00:10<00:00, 98.11it/s]
複製代碼

迎你們關注個人微信公衆號"代碼港" & 我的網站 www.vpointer.net ~

相關文章
相關標籤/搜索