concurrent.futures模塊簡單介紹(線程池,進程池)

1、基類Executor

Executor類是ThreadPoolExecutor 和ProcessPoolExecutor 的基類。它爲咱們提供了以下方法:html

submit(fn, *args, **kwargs):提交任務。以 fn(*args **kwargs) 方式執行並返回 Future 對像。python

fn:函數地址。app

*args:位置參數。異步

**kwargs:關鍵字參數。函數

map(func, *iterables, timeout=None, chunksize=1):性能

func:函數地址。測試

iterables:一個可迭代對象,以迭代的方式將參數傳遞給函數。spa

timeout:這個參數沒弄明白,若是是None等待全部進程結束。線程

chunksize:使用 ProcessPoolExecutor 時,這個方法會將 iterables 分割任務塊,並做爲獨立的任務提交到執行池中。這些塊的數量能夠由 chunksize 指定設置。 對很長的迭代器來講,設置chunksize 值比默認值 1 能顯著地提升性能。 chunksize 對 ThreadPoolExecutor 沒有效果。code

shutdown(wait=True):若是爲True會等待線程池或進程池執行完成後釋放正在使用的資源。若是 wait 爲 False,將當即返回,全部待執行的期程完成執行後會釋放已分配的資源。 無論 wait 的值是什麼,整個 Python 程序將等到全部待執行的期程完成執行後才退出。

2、線程池對象

ThreadPoolExecutor 是 Executor 的子類,下面介紹ThreadPoolExecutor 的參數。

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=()):

max_workers:線程池的數量。

thread_name_prefix:線程名前綴。默認線程名ThreadPoolExecutor-線程數。

initializer:一個函數或方法,在啓用線程前會調用這個函數(給線程池添加額外任務)

initargs :以元祖的方式給initializer中的函數傳遞參數。

這裏須要說明的是除了max_workers這個參數外其它三個參數基本不多用。max_workers很好理解就是線程池的數量。

下面來講initializer和initargs 這兩個奇怪的傢伙。

示例一:

from concurrent.futures import ThreadPoolExecutor
def work():
    print('工做線程')
def test(num):
    print('test:',num)
executor = ThreadPoolExecutor(max_workers=2,initializer=test(7))  # 開啓2個線程  initializer指定參數test(7)
executor.submit(work)  
executor.submit(work)

# 打印內容以下
test: 7
工做線程
工做線程

示例二:

from concurrent.futures import ThreadPoolExecutor
def work():
    print('工做線程')
def test(num):
    print('test:',num)
executor = ThreadPoolExecutor(max_workers=2,initializer=test,initargs=(7,)) # 這裏咱們使用initargs=(7,)的方式給test傳遞參數。
executor.submit(work)
executor.submit(work)

# 打印內容以下
test: 7
工做線程
工做線程
test: 7

經過示例一和示例二咱們能夠發現initializer=test(7)時,test函數只被調用了1次,當initializer=test,initargs=(7,)時,test被調用了2次。具體緣由沒有去分析。感受沒什麼用。之後有時間看看源碼在補上。

3、進程池對象

ProcessPoolExecutor 也是 Executor 的子類,下面是ProcessPoolExecutor 參數介紹:

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

max_workers:工做進程數。若是 max_workers 爲 None 或未給出,它將默認爲機器的處理器個數。 若是 max_workers 小於等於 0,則將引起 ValueError。 在 Windows 上,max_workers 必須小於等於 61,不然將引起 ValueError。 若是 max_workers 爲 None,則所選擇的默認最多爲 61,即便存在更多處理器。

mp_context :能夠是一個多進程上下文或是 None。 它將被用來啓動工做進程。 若是 mp_context 爲 None 或未給出,將使用默認的多進程上下文。

initializer:一個函數或方法,在啓用線程前會調用這個函數。

initargs :以元祖的方式給initializer中的函數傳遞參數。

關於說initializer和initargs 與ThreadPoolExecutor 相似這裏很少說了。


 

4、建立線程池

from concurrent.futures import ThreadPoolExecutor
import time
def work(num):
    time.sleep(1)
    print('工做線程:',num)
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)  # 建立線程池,數量爲5
    for i in range(5):
        executor.submit(work, i)
    print('主線程')

# 打印內容以下
主線程
工做線程:   0
工做線程:   1
工做線程:   2
工做線程:   3
工做線程:   4
# 使用shutdown等待全部線程結束後在打印主線程 from concurrent.futures import ThreadPoolExecutor import time def work(num): time.sleep(1) print('工做線程:',num) if __name__ == '__main__': executor = ThreadPoolExecutor(max_workers=5) # 建立線程池,數量爲5 for i in range(5): executor.submit(work, i) executor.shutdown(wait=True) # 等待線程池結束 print('主線程') # 打印內容以下 工做線程: 0 工做線程: 1 工做線程: 2 工做線程: 3 工做線程: 4 主線程

若是想要在線程執行的過程當中添加額外的功能,可使用initializer參數,以下:

from concurrent.futures import ThreadPoolExecutor

def work(num):
    print('工做線程:',num)
def test(num):
    print('額外任務:',num)
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加額外任務
    for i in range(5):
        executor.submit(work, i)
    executor.shutdown(wait=True)
    print('主線程')

# 打印內容以下
額外任務: 7
工做線程: 0
額外任務: 7
工做線程: 1
額外任務: 7
工做線程: 2 
額外任務: 7
工做線程: 3 
額外任務: 7
工做線程: 4 
主線程

5、進程池

進程池與線程池用法基本一致,只是名字和實現不同而已。

from concurrent.futures import ProcessPoolExecutor
import time
def work(num):
    time.sleep(1)
    print('工做進程:',num)
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=5)  # 建立進程池,數量爲5
    for i in range(5):
        executor.submit(work, i)
    print('主線程')

# 打印內容以下
主線程
工做進程: 0
工做進程: 1
工做進程: 2
工做進程: 3
工做進程: 4

# 使用shutdown等待全部線程結束後在打印主線程
from concurrent.futures import ProcessPoolExecutor
import time
def work(num):
    time.sleep(1)
    print('工做進程:',num)
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=5)  # 建立進程池,數量爲5
    for i in range(5):
        executor.submit(work, i)
    executor.shutdown(wait=True)  # 等待進程池結束
    print('主線程')
# 打印內容以下
工做進程: 0
工做進程: 1
工做進程: 2
工做進程: 3
工做進程: 4
主線程

若是想要在線程執行的過程當中添加額外的功能,可使用initializer參數,以下:

from concurrent.futures import ProcessPoolExecutor

def work(num):
    print('工做進程:',num)
def test(num):
    print('額外任務:',num)
if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=5,initializer=test,initargs=(7,)) # 添加額外任務
    for i in range(5):
        executor.submit(work, i)
    executor.shutdown(wait=True)
    print('主線程')

# 打印內容以下
額外任務: 7
工做進程: 0
工做進程: 1
工做進程: 2
工做進程: 3
工做進程: 4
額外任務: 7
額外任務: 7
額外任務: 7
額外任務: 7
主線程

 


 

6、Future Objects

future類封裝了可調用文件的異步執行。future的實例由executor.submit()時被建立的,除了測試以外不該該直接實例化future對象,因此爲了獲取future對象咱們能夠f=executor.submit()便可。

class concurrent.futures.Future類中的方法:

cancel():嘗試取消執行線程池中的函數調用。若是調用當前正在執行或已完成運行,而且沒法取消,則方法將返回false,不然調用將被取消,方法將返回true。

cancelled():若是線程池中的函數執行成功返回True,調用失敗返回false。

running():若是線程池中的調用當前正在執行且沒法取消,則返回true。

done():若是呼叫成功取消或完成運行,則返回true。不然返回false

result(timeout=None):返回線程函數的返回值。若是線程函數未執行完成,則此方法將最多等待timeout秒,若是線程函數未在超時秒內完成,則將引起concurrent.futures.TimeoutError。超時能夠是int或float。若是未指定超時 timeout=None,則會阻塞,一直等待函數執行完成。若是在線程函數完成以前使用future對象取消了執行,則將引起CancelederRor。若是調用raised,此方法將引起相同的異常。

exception(timeout=None):返回線程函數引起的異常。若是線程函數還沒有完成,則此方法將最多等待timeout秒。若是線程函數未在超時秒內完成,則將引起concurrent.futures.TimeoutError。超時能夠是int或float。若是未指定超時或無超時timeout=None,則會一直等待。若是在線程函數完成以前使用future對象取消了執行,則將引起CancelederRor若是線程函數完成但未引起,則返回None。

add_done_callback(fn):將可調用fn附加到future對象。當future對象被取消或結束運行時,將調用fn,其中future對象是唯一的參數。添加的可調用對象是按照添加順序調用的,而且老是在屬於添加它們的進程的線程中調用。若是Callable引起異常子類,它將被記錄並忽略。若是可調用引起BaseException子類,則行爲未定義。


 

7、Module Functions

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED):將fs綁定一個future實例,若是future執行完成或取消執行fs函數。

fs:fs是一個函數綁定在future實例(可能由不一樣的執行器實例建立)。返回2個命名元組的集合。第一組名爲「done」,包含等待完成,完成前(完成或future對象取消)。第二組名爲「not_done」,包含未完成的future(未完成或正在運行的future)。

timeout:若是爲None一直等待,不然會等待timeout秒。

return_when :必須是以下範圍。

Constant

Description

FIRST_COMPLETED

當任何future 完成或取消或者線程函數執行完成時。

FIRST_EXCEPTION

當future經過引起異常而結束時,線程函數將返回。若是沒有future引起異常,那麼它至關於全部已完成的。

ALL_COMPLETED

當全部future完成或取消時,函數將返回。

 

concurrent.futures.as_completed(fs, timeout=None):返回一個future迭代器。

fs:可迭代對象的future。

timeout:超時時間,若是爲None會一直阻塞直到執行完成。不然將等待timeout秒。

from concurrent.futures._base import as_completed
from concurrent.futures import ThreadPoolExecutor

def work(num):
    return num ** 2
if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=5)
    future_list = []  # 存放future對象
    for i in range(5):
        future_list.append(executor.submit(work, i))
    for future in as_completed(future_list):   # 這是一個無聊的用法
        res = future.result()
        print(f'結果:{res}')  # 打印工做線程返回的結果
# 打印結果以下

結果:0
結果:4
結果:16
結果:1
結果:9

 

參考文檔:https://docs.python.org/3/library/concurrent.futures.html

相關文章
相關標籤/搜索