Python線程池及其原理和使用(超級詳細)

系統啓動一個新線程的成本是比較高的,由於它涉及與操做系統的交互。在這種情形下,使用線程池能夠很好地提高性能,尤爲是當程序中須要建立大量生存期很短暫的線程時,更應該考慮使用線程池。

線程池在系統啓動時即建立大量空閒的線程,程序只要將一個函數提交給線程池,線程池就會啓動一個空閒的線程來執行它。當該函數執行結束後,該線程並不會死亡,而是再次返回到線程池中變成空閒狀態,等待執行下一個函數。

此外,使用線程池能夠有效地控制系統中併發線程的數量。當系統中包含有大量的併發線程時,會致使系統性能急劇降低,甚至致使 Python 解釋器崩潰,而線程池的最大線程數參數能夠控制系統中併發線程的數量不超過此數。html

線程池的使用

線程池的基類是 concurrent.futures 模塊中的 Executor,Executor 提供了兩個子類,即 ThreadPoolExecutor 和 ProcessPoolExecutor,其中 ThreadPoolExecutor 用於建立線程池,而 ProcessPoolExecutor 用於建立進程池。

若是使用線程池/進程池來管理併發編程,那麼只要將相應的 task 函數提交給線程池/進程池,剩下的事情就由線程池/進程池來搞定。

Exectuor 提供了以下經常使用方法:編程

  • submit(fn, *args, **kwargs):將 fn 函數提交給線程池。*args 表明傳給 fn 函數的參數,*kwargs 表明以關鍵字參數的形式爲 fn 函數傳入參數。
  • map(func, *iterables, timeout=None, chunksize=1):該函數相似於全局函數 map(func, *iterables),只是該函數將會啓動多個線程,以異步方式當即對 iterables 執行 map 處理。
  • shutdown(wait=True):關閉線程池。


程序將 task 函數提交(submit)給線程池後,submit 方法會返回一個 Future 對象,Future 類主要用於獲取線程任務函數的返回值。因爲線程任務會在新線程中以異步方式執行,所以,線程執行的函數至關於一個「未來完成」的任務,因此 Python 使用 Future 來表明。併發

Future 提供了以下方法:異步

  • cancel():取消該 Future 表明的線程任務。若是該任務正在執行,不可取消,則該方法返回 False;不然,程序會取消該任務,並返回 True。
  • cancelled():返回 Future 表明的線程任務是否被成功取消。
  • running():若是該 Future 表明的線程任務正在執行、不可被取消,該方法返回 True。
  • done():若是該 Funture 表明的線程任務被成功取消或執行完成,則該方法返回 True。
  • result(timeout=None):獲取該 Future 表明的線程任務最後返回的結果。若是 Future 表明的線程任務還未完成,該方法將會阻塞當前線程,其中 timeout 參數指定最多阻塞多少秒。
  • exception(timeout=None):獲取該 Future 表明的線程任務所引起的異常。若是該任務成功完成,沒有異常,則該方法返回 None。
  • add_done_callback(fn):爲該 Future 表明的線程任務註冊一個「回調函數」,當該任務成功完成時,程序會自動觸發該 fn 函數。


在用完一個線程池後,應該調用該線程池的 shutdown() 方法,該方法將啓動線程池的關閉序列。調用 shutdown() 方法後的線程池再也不接收新任務,但會將之前全部的已提交任務執行完成。當線程池中的全部任務都執行完成後,該線程池中的全部線程都會死亡。

使用線程池來執行線程任務的步驟以下:函數

  1. 調用 ThreadPoolExecutor 類的構造器建立一個線程池。
  2. 定義一個普通函數做爲線程任務。
  3. 調用 ThreadPoolExecutor 對象的 submit() 方法來提交線程任務。
  4. 當不想提交任何任務時,調用 ThreadPoolExecutor 對象的 shutdown() 方法來關閉線程池。


下面程序示範瞭如何使用線程池來執行線程任務:性能

 1 def test(value1, value2=None):
 2     print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
 3     time.sleep(2)
 4     return 'finished'
 5 
 6 def test_result(future):
 7     print(future.result())
 8 
 9 if __name__ == "__main__":
10     import numpy as np
11     from concurrent.futures import ThreadPoolExecutor
12     threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
13     for i in range(0,10):
14         future = threadPool.submit(test, i,i+1)
15         
16     threadPool.shutdown(wait=True)
1 結果:
2 
3 test__0 threading is printed 0, 1
4 test__1 threading is printed 1, 2
5 test__2 threading is printed 2, 3
6 test__3 threading is printed 3, 4
7 test__1 threading is printed 4, 5
8 test__0 threading is printed 5, 6
9 test__3 threading is printed 6, 7

 

獲取執行結果

前面程序調用了 Future 的 result() 方法來獲取線程任務的運回值,但該方法會阻塞當前主線程,只有等到錢程任務完成後,result() 方法的阻塞纔會被解除。

若是程序不但願直接調用 result() 方法阻塞線程,則可經過 Future 的 add_done_callback() 方法來添加回調函數,該回調函數形如 fn(future)。當線程任務完成後,程序會自動觸發該回調函數,並將對應的 Future 對象做爲參數傳給該回調函數。
直接調用result函數結果spa

 1 def test(value1, value2=None):
 2     print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
 3     time.sleep(2)
 4     return 'finished'
 5 
 6 def test_result(future):
 7     print(future.result())
 8 
 9 if __name__ == "__main__":
10     import numpy as np
11     from concurrent.futures import ThreadPoolExecutor
12     threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
13     for i in range(0,10):
14         future = threadPool.submit(test, i,i+1)
15 #         future.add_done_callback(test_result)
16         print(future.result())
17         
18     threadPool.shutdown(wait=True)
19     print('main finished')
1 結果:
2 
3 test__0 threading is printed 0, 1
4 finished
5 test__0 threading is printed 1, 2
6 finished
7 test__1 threading is printed 2, 3
8 finished

去掉上面註釋部分,調用future.add_done_callback函數,註釋掉第16行操作系統

 1 test__0 threading is printed 0, 1
 2 test__1 threading is printed 1, 2
 3 test__2 threading is printed 2, 3
 4 test__3 threading is printed 3, 4
 5 finished
 6 finished
 7 finished
 8 test__1 threading is printed 4, 5
 9 test__0 threading is printed 5, 6
10 finished

 

另外,因爲線程池實現了上下文管理協議(Context Manage Protocol),所以,程序可使用 with 語句來管理線程池,這樣便可避免手動關閉線程池,如上面的程序所示。

此外,Exectuor 還提供了一個 map(func, *iterables, timeout=None, chunksize=1) 方法,該方法的功能相似於全局函數 map(),區別在於線程池的 map() 方法會爲 iterables 的每一個元素啓動一個線程,以併發方式來執行 func 函數。這種方式至關於啓動 len(iterables) 個線程,井收集每一個線程的執行結果。

例如,以下程序使用 Executor 的 map() 方法來啓動線程,並收集線程任務的返回值:.net

示例換成多參數的:線程

def test(value1, value2=None):
    print("%s threading is printed %s, %s"%(threading.current_thread().name, value1, value2))
#     time.sleep(2)


if __name__ == "__main__":
    import numpy as np
    from concurrent.futures import ThreadPoolExecutor
    threadPool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="test_")
    for i in range(0,10):
#         test(str(i), str(i+1))
        threadPool.map(test, [i],[i+1]) # 這是運行一次test的參數,衆所周知map可讓test執行屢次,即一個[]表明一個參數,一個參數賦予不一樣的值即增長[]的長度如從[1]到[1,2,3]
    threadPool.shutdown(wait=True)

 

上面程序使用 map() 方法來啓動 4個線程(該程序的線程池包含 4 個線程,若是繼續使用只包含兩個線程的線程池,此時將有一個任務處於等待狀態,必須等其中一個任務完成,線程空閒出來纔會得到執行的機會),map() 方法的返回值將會收集每一個線程任務的返回結果。
經過上面程序能夠看出,使用 map() 方法來啓動線程,並收集線程的執行結果,不只具備代碼簡單的優勢,並且雖然程序會以併發方式來執行 test() 函數,但最後收集的 test() 函數的執行結果,依然與傳入參數的結果保持一致。

 

編寫這個文檔主要是由於示例文檔[1]沒有多參數的。網上不少資料都是基於threadpool方法傳參見[2]

Reference:

[1] http://c.biancheng.net/view/2627.html

[2] https://www.cnblogs.com/gongxijun/p/6862333.html

相關文章
相關標籤/搜索