python併發 1:使用 futures 處理併發

做爲Python程序員,平時不多使用併發編程,偶爾使用也只須要派生出一批獨立的線程,而後放到隊列中,批量執行。因此,不誇張的說,雖然我知道線程、進程、並行、併發的概念,但每次使用的時候可能還須要再打開文檔回顧一下。python

如今這一篇仍是 《流暢的python》讀書筆記,譯者在這裏把future 翻譯爲「期物」,我以爲不太合適,既然future不能找到一個合適的詞彙,暫時仍是直接使用 future 吧。程序員

concurrent.futures

future 是一種對象,表示異步執行的操做。這個概念是 concurrent.futures模塊和asyncio包的基礎。編程

concurrent.futures 模塊是Python3.2 引入的,對於Python2x 版本,Python2.5 以上的版本能夠安裝 futures 包來使用這個模塊。安全

從Python3.4起,標準庫中有兩個爲Future的類:concurrent.futures.Future 和 asyncio.Future。這兩個類做用相同:兩個Future類的實例都表示可能已經完成或未完成的延遲計算。bash

Future 封裝待完成的操做,可放入隊列,完成的狀態能夠查詢,獲得結果(或拋出異常)後能夠獲取結果(或異常)。網絡

咱們知道,若是程序中包含I/O操做,程序會有很高的延遲,CPU會處於等待狀態,這時若是咱們不使用併發會浪費不少時間。併發

示例

咱們先舉個例子:app

下邊是有兩段代碼,主要功能都是從網上下載人口前20的國際的國旗:
第一段代碼(flagss.py)是依序下載:下載完一個圖片後保存到硬盤,而後請求下一張圖片;
第二段代碼(flagss_threadpool.py)使用 concurrent.futures 模塊,批量下載10張圖片。異步

運行分別運行兩段代碼3次,結果以下:async

images.py 的結果以下

$ python flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 6.18s

$ python flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 5.67s

$ python flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN 
20 flags downloaded in 6.55s

能夠看到,依次下載10張圖片,平均須要6秒

flags_threadpool.py 的結果以下:

$ python flags_threadpool.py
NG EG VN BR JP FR DE CN TR BD PK MX PH US RU IN ET CD ID IR 
20 flags downloaded in 2.12s

$ python flags_threadpool.py
BR IN DE FR TR RU EG NG JP CN ID ET PK MX PH US IR CD VN BD 
20 flags downloaded in 2.23s

$ python flags_threadpool.py
CN BR DE ID NG RU TR IN MX US IR BD VN CD PH EG FR JP ET PK 
20 flags downloaded in 1.18s

使用 concurrent.futures 後,下載10張圖片平均須要2秒

經過上邊的結果咱們發現使用 concurrent.futures 後,下載效率大幅提高。

下邊咱們來看下這兩段代碼。

同步執行的代碼flags.py:

#! -*- coding: utf-8 -*-

import os
import time
import sys

import requests  # <1>

POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()  # <2>

BASE_URL = 'http://flupy.org/data/flags'  # <3>

DEST_DIR = 'images/'  # <4>


# 保存圖片
def save_flag(img, filename):  # <5>
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


# 下載圖片
def get_flag(cc):  # <6>
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    # 這裏咱們使用 requests 包,須要先經過pypi安裝
    resp = requests.get(url)
    return resp.content


# 顯示一個字符串,而後刷新sys.stdout,目的是在一行消息中看到進度
def show(text):  # <7>
    print(text, end=' ')
    sys.stdout.flush()


def download_many(cc_list):  # <8>
    for cc in sorted(cc_list):  # <9>
        image = get_flag(cc)
        show(cc)
        save_flag(image, cc.lower() + '.gif')

    return len(cc_list)


def main(download_many):  # <10>
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main(download_many)  # <11>

使用 concurrent.future 併發的代碼 flags_threadpool.py

#! -*- coding: utf-8 -*-

from concurrent import futures

from flags import save_flag, get_flag, show, main

# 設定ThreadPoolExecutor 類最多使用幾個線程
MAX_WORKERS = 20


# 下載一個圖片
def download_one(cc):
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    # 設定工做的線程數量,使用約需的最大值與要處理的數量直接較小的那個值,以避免建立多餘的線程
    workers = min(MAX_WORKERS, len(cc_list))  # <4>
    # 使用工做的線程數實例化ThreadPoolExecutor類;
    # executor.__exit__方法會調用executor.shutdown(wait=True)方法,
    # 它會在全部線程都執行完畢前阻塞線程
    with futures.ThreadPoolExecutor(workers) as executor:  # <5>
        # map 與內置map方法相似,不過download_one 函數會在多個線程中併發調用;
        # map 方法返回一個生成器,所以能夠迭代,
        # 迭代器的__next__方法調用各個Future 的 result 方法
        res = executor.map(download_one, sorted(cc_list))

    # 返回獲取的結果數量;若是有現成拋出異常,會在這裏拋出
    # 這與隱式調用next() 函數從迭代器中獲取相應的返回值同樣。
    return len(list(res))  # <7>
    return len(results)


if __name__ == '__main__':
    main(download_many)

上邊的代碼,咱們對 concurrent.futures 的使用有了大體的瞭解。但 future 在哪裏呢,咱們並無看到。

Future 是 concurrent.futures 模塊和 asyncio 包的重要組件。從Python3.4起,標準庫中有兩個爲Future的類:concurrent.futures.Future 和 asyncio.Future。這兩個Future做用相同。

Future 封裝待完成的操做,可放入隊列,完成的狀態能夠查詢,獲得結果(或拋出異常)後能夠獲取結果(或異常)。
Future 表示終將發生的事情,而肯定某件事情會發生的惟一方式是執行的時間已經排定。所以只有把某件事交給 concurrent.futures.Executor 子類處理時,纔會建立 concurrent.futures.Future 實例。

例如,調用Executor.submit() 方法的參數是一個可調用的對象,調用這個方法後會爲傳入的可調用對象排期,並返回一個Future。

Future 有三個重要的方法:

  • .done() 返回布爾值,表示Future 是否已經執行

  • .add_done_callback() 這個方法只有一個參數,類型是可調用對象,Future運行結束後會回調這個對象。

  • .result() 若是 Future 運行結束後調用result(), 會返回可調用對象的結果或者拋出執行可調用對象時拋出的異常,若是是 Future 沒有運行結束時調用 f.result()方法,這時會阻塞調用方所在的線程,直到有結果返回。此時result 方法還能夠接收 timeout 參數,若是在指定的時間內 Future 沒有運行完畢,會拋出 TimeoutError 異常。

asyncio.Future.result 方法不支持設定超時時間,若是想獲取 Future 的結果,可使用 yield from 結構

爲了加深對 Future 的理解,如今咱們修改下 flags_threadpool.py download_many 函數。

def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        to_do = []
        # 用於建立並排定 future
        for cc in sorted(cc_list):
            # submit 方法排定可調用對象的執行時間而後返回一個future,表示這個待執行的操做
            future = executor.submit(download_one, cc)
            to_do.append(future)
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))
        
        results = []
        # 用於獲取future 結果
        # as_completed 接收一個future 列表,返回值是一個迭代器,在運行結束後產出future
        for future in futures.as_completed(to_do):
            res = future.result()
            msg = '{} result: {!r}'
            print(msg.format(future, res))
            results.append(res)
    
    return len(results)

如今執行代碼,運行結果以下:

Scheduled for BR: <Future at 0x10d43cb70 state=running>
Scheduled for CN: <Future at 0x10d4434a8 state=running>
Scheduled for ID: <Future at 0x10d443ef0 state=running>
Scheduled for IN: <Future at 0x10d443978 state=pending>
Scheduled for US: <Future at 0x10d44f748 state=pending>
BR <Future at 0x10d43cb70 state=finished returned str> result: 'BR'
IN <Future at 0x10d443978 state=finished returned str> result: 'IN'
CN <Future at 0x10d4434a8 state=finished returned str> result: 'CN'
ID <Future at 0x10d443ef0 state=finished returned str> result: 'ID'
US <Future at 0x10d44f748 state=finished returned str> result: 'US'

5 flags downloaded in 1.47s

從結果能夠看到,future 的 repr() 方法會顯示狀態,前三個 是running 是由於咱們設定了三個進程,因此後兩個是pendding 狀態。若是將max_workers參數設置爲5,結果就會全都是 running。

雖然,使用 future 的腳步比第一個腳本的執行速度快了不少,但因爲受GIL的限制,下載並非並行的。

GIL(Global Interpreter Lock)和阻塞型I/O

CPython 解釋器自己不是線程安全的,所以解釋器被一個全局解釋器鎖保護着,它確保任什麼時候候都只有一個Python線程執行。

然而,Python標準庫中全部執行阻塞型I/O操做的函數,在等待系統返回結果時都會釋放GIL。這意味着I/O密集型Python程序能從中受益:一個Python線程等待網絡響應時,阻塞型I/O函數會釋放GIL,再運行一個線程。

Python 標準庫中全部阻塞型I/O函數都會釋放GIL,容許其餘線程運行。time.sleep()函數也會釋放GIL。

那麼如何在CPU密集型做業中使用 concurrent.futures 模塊繞開GIL呢?

答案是 使用 ProcessPoolExecutor 類。

使用這個模塊能夠在作CPU密集型工做是繞開GIL,利用全部可用核心。

ThreadPoolExecutor 和 ProcessPoolExecutor 都實現了通用的 Executor 接口,因此,咱們能夠輕鬆的將基於線程的方案改成使用進程的方案。

好比下邊這樣:

def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))
    with futures.ThreadPoolExecutor(workers) as executor:
        pass

# 改爲
def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:
        pass

須要注意的是,ThreadPoolExecutor 須要指定 max_workers 參數,
而 ProcessPoolExecutor 的這個參數是可選的默認值是 os.cup_count()(計算機cpu核心數)。

ProcessPoolExecutor 的價值主要體如今CPU密集型做業上。

使用Python處理CPU密集型工做,應該試試PyPy,會有更高的執行速度。

如今咱們回到開始的代碼,看下 Executor.map 函數。

文檔中對map函數的介紹以下。

map(func, *iterables, timeout=None, chunksize=1)

等同於 map(func, *iterables),不一樣的是 func 是異步執行的,而且能夠同時進行對 func 的多個調用。若是調用 __next__(),則返回的迭代器提出 concurrent.futures.TimeoutError,而且在從 Executor.map() 的原始調用起的 timeout 秒以後結果不可用。 timeout 能夠是int或float。若是未指定 timeout 或 None,則等待時間沒有限制。若是調用引起異常,那麼當從迭代器檢索其值時,將引起異常。當使用 ProcessPoolExecutor 時,此方法將 iterables 分紅多個塊,它做爲單獨的任務提交到進程池。這些塊的(近似)大小能夠經過將 chunksize 設置爲正整數來指定。對於很是長的迭代,與默認大小1相比,使用大值 chunksize 能夠顯着提升性能。使用 ThreadPoolExecutor,chunksize 沒有效果。

在 3.5 版更改: 添加了 chunksize 參數。

Executor.map 還有個特性比較有用,那就是這個函數返回結果的順序於調用開始的順序是一致的。若是第一個調用稱其結果用時10秒,其餘調用只用1秒,代碼會阻塞10秒,獲取map方法返回的生成器產出的第一個結果。

若是不是獲取到全部結果再處理,一般會使用 Executor.submit + Executor.as_completed 組合使用的方案。

Executor.submit + Executor.as_completed 這個組合更靈活,由於submit方法能處理不一樣的可調用對象和參數,而executor.map 只能處理參數不一樣的同一個可調用對象。此外,傳給futures.as_completed 函數的期物集合能夠來自不一樣的 Executor 實例。

future 的異常處理

futures 有三個異常類:

  • exception concurrent.futures.CancelledError 在future取消時引起。

  • exception concurrent.futures.TimeoutError 在future操做超過給定超時時觸發。

  • exception concurrent.futures.process.BrokenProcessPool
    從 RuntimeError 派生,當 ProcessPoolExecutor 的一個工人以非乾淨方式終止(例如,若是它從外部被殺死)時,引起此異常類。

咱們先看一下,future.result() 出現異常的處理狀況。代碼改動以下:

# 將第一個 CN 改成CN1 也能夠是其它任意錯誤代碼
POP20_CC = ('CN1 IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()


def get_flag(cc):  # <6>
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    if resp.status_code != 200:  # <1>
        resp.raise_for_status() # 若是不是200 拋出異常
    return resp.content

def download_one(cc):
    try:
        image = get_flag(cc)
    # 捕獲 requests.exceptions.HTTPError
    except requests.exceptions.HTTPError as exc:  #
        # 若是有異常 直接拋出
        raise
    else:
        save_flag(image, cc.lower() + '.gif')
    return cc

如今執行代碼,會發現 download_one 中的異常傳遞到了download_many 中,而且致使拋出了異常,未執行完的其它future 也都中斷。

爲了能保證其它沒有錯誤的future 能夠正常執行,這裏咱們須要對future.result() 作異常處理。

改動結果以下:

def download_many(cc_list):
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=20) as executor:
        to_do_map = {}
        for cc in sorted(cc_list):
            future = executor.submit(download_one, cc)
            to_do_map[future] = cc
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))

        results = []
        for future in futures.as_completed(to_do_map):
            try:
                res = future.result()
            except requests.exceptions.HTTPError as exc:
                # 處理可能出現的異常
                error_msg = '{} result {}'.format(cc, exc)
            else:
                error_msg = ''
            if error_msg:
                cc = to_do_map[future]  # <16>
                print('*** Error for {}: {}'.format(cc, error_msg))
            else:
                msg = '{} result: {!r}'
                print(msg.format(future, res))
                results.append(res)

    return len(results)

這裏咱們用到了一個對 futures.as_completed 函數特別有用的慣用法:構建一個字典,把各個future映射到其餘數據(future運行結束後可能用的)上。這樣,雖然 future生成的順序雖然已經亂了,依然便於使用結果作後續處理。

一篇寫完了沒有總結總感受少點什麼,因此。

總結

Python 自 0.9.8 版就支持線程了,concurrent.futures 只不過是使用線程的最新方式。

futures.ThreadPoolExecutor 類封裝了 threading 模塊的組件,使使用線程變得更加方便。

順便再推薦一下 《流暢的python》,絕對值得一下。

下一篇筆記應該是使用 asyncio 處理併發。

最後,感謝女友支持。

>歡迎關注 >請我喝芬達
歡迎關注 請我喝芬達
相關文章
相關標籤/搜索