我的筆記,若有疏漏,還請指正。html
使用多線程(threading)和多進程(multiprocessing)完成常規的併發需求,在啓動的時候 start、join 等步驟不能省,複雜的須要還要用 1-2 個隊列。
隨着需求愈來愈複雜,若是沒有良好的設計和抽象這部分的功能層次,代碼量越多調試的難度就越大。python
對於須要併發執行、可是對實時性要求不高的任務,咱們可使用 concurrent.futures 包中的 PoolExecutor 類來實現。git
這個包提供了兩個執行器:線程池執行器 ThreadPoolExecutor 和進程池執行器 ProcessPoolExecutor,兩個執行器提供一樣的 API。程序員
池的概念主要目的是爲了重用:讓線程或進程在生命週期內能夠屢次使用。它減小了建立建立線程和進程的開銷,提升了程序性能。重用不是必須的規則,但它是程序員在應用中使用池的主要緣由。github
池,只有固定個數的線程/進程,經過 max_workers 指定。編程
import concurrent.futures import time from itertools import count number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] def evaluate_item(x): for i in count(x): # count 是無限迭代器,會一直遞增。 print(f"{x} - {i}") time.sleep(0.01) if __name__ == "__main__": # 進程池 start_time_2 = time.time() # 使用 with 在離開此代碼塊時,自動調用 executor.shutdown(wait=true) 釋放 executor 資源 with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # 將 10 個任務提交給 executor,並收集 futures futures = [executor.submit(evaluate_item, item) for item in number_list] # as_completed 方法等待 futures 中的 future 完成 # 一旦某個 future 完成,as_completed 就當即返回該 future # 這個方法,使每次返回的 future,老是最早完成的 future # 而不是先等待任務 1,再等待任務 2... for future in concurrent.futures.as_completed(futures): print(future.result()) print ("Thread pool execution in " + str(time.time() - start_time_2), "seconds")
上面的代碼中,item 爲 1 2 3 4 5 的五個任務會一直佔用全部的 workers,而 6 7 8 9 10 這五個任務會永遠等待!!!設計模式
concurrent.futures 包含三個部分的 API:api
submit(fn, *args, **kwargs)
:將任務函數 fn 提交到執行器,args 和 kwargs 就是 fn 須要的參數。
map(func, *iterables, timeout=None, chunksize=1)
:當任務是同一個,只有參數不一樣時,能夠用這個方法代替 submit。iterables 的每一個元素對應 func 的一組參數。
shutdown(wait=True)
:關閉執行器,通常都使用 with 管理器自動關閉。future.result(timout=None)
:最經常使用的方法,返回任務的結果。若是任務還沒有結束,這個方法會一直等待!
exception(timeout=None)
:給出任務拋出的異常。和 result() 同樣,也會等待任務結束。cancel()
:取消此任務add_done_callback(fn)
:future 完成後,會執行 fn(future)
。running()
:是否正在運行done()
:future 是否已經結束了,booleanconcurrent.futures.as_completed(fs, timeout=None)
:等待 fs (futures iterable)中的 future 完成
for future in as_completed(fs):
使用此函數。concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
:一直等待,直到 return_when 所指定的事發生,或者 timeout
這裏的 PoolExecutor 的特色,在於它使用了 Future 設計模式,使任務的執行,與結果的獲取,變成一個異步的流程。
咱們先經過 submit/map 將任務放入任務隊列,這時任務就已經開始執行了!而後咱們在須要的時候,經過 future 獲取結果,或者直接 add_done_callback(fn)
。多線程
這裏任務的執行是在新的 workers 中的,主進程/線程不會阻塞,所以主線程能夠幹其餘的事。這種方式被稱做異步編程。併發
concurrent.futures 基於 multiprocessing.pool 實現,所以實際上它比直接使用 線程/進程 的 Pool 要慢一點。可是它提供了更方便簡潔的 API。