[python] ThreadPoolExecutor線程池

初識

Python中已經有了threading模塊,爲何還須要線程池呢,線程池又是什麼東西呢?在介紹線程同步的信號量機制的時候,舉得例子是爬蟲的例子,須要控制同時爬取的線程數,例子中建立了20個線程,而同時只容許3個線程在運行,可是20個線程都須要建立和銷燬,線程的建立是須要消耗系統資源的,有沒有更好的方案呢?其實只須要三個線程就好了,每一個線程各分配一個任務,剩下的任務排隊等待,當某個線程完成了任務的時候,排隊任務就能夠安排給這個線程繼續執行。html

這就是線程池的思想(固然沒這麼簡單),可是本身編寫線程池很難寫的比較完美,還須要考慮複雜狀況下的線程同步,很容易發生死鎖。從Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutorProcessPoolExecutor兩個類,實現了對threadingmultiprocessing的進一步抽象(這裏主要關注線程池),不只能夠幫咱們自動調度線程,還能夠作到:python

  1. 主線程能夠獲取某一個線程(或者任務的)的狀態,以及返回值。
  2. 當一個線程完成的時候,主線程可以當即知道。
  3. 讓多線程和多進程的編碼接口一致。

實例

簡單使用

from concurrent.futures import ThreadPoolExecutor
import time

# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
# 經過submit函數提交執行的函數到線程池中,submit函數當即返回,不阻塞
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))
# done方法用於斷定某個任務是否完成
print(task1.done())
# cancel方法用於取消某個任務,該任務沒有放入線程池中才能取消成功
print(task2.cancel())
time.sleep(4)
print(task1.done())
# result方法能夠獲取task的執行結果
print(task1.result())

# 執行結果
# False # 代表task1未執行完成
# False # 代表task2取消失敗,由於已經放入了線程池中
# get page 2s finished
# get page 3s finished
# True # 因爲在get page 3s finished以後纔打印,因此此時task1必然完成了
# 3 # 獲得task1的任務返回值
  1. ThreadPoolExecutor構造實例的時候,傳入max_workers參數來設置線程池中最多能同時運行的線程數目。
  2. 使用submit函數來提交線程須要執行的任務(函數名和參數)到線程池中,並返回該任務的句柄(相似於文件、畫圖),注意submit()不是阻塞的,而是當即返回。
  3. 經過submit函數返回的任務句柄,可以使用done()方法判斷該任務是否結束。上面的例子能夠看出,因爲任務有2s的延時,在task1提交後馬上判斷,task1還未完成,而在延時4s以後判斷,task1就完成了。
  4. 使用cancel()方法能夠取消提交的任務,若是任務已經在線程池中運行了,就取消不了。這個例子中,線程池的大小設置爲2,任務已經在運行了,因此取消失敗。若是改變線程池的大小爲1,那麼先提交的是task1task2還在排隊等候,這是時候就能夠成功取消。
  5. 使用result()方法能夠獲取任務的返回值。查看內部代碼,發現這個方法是阻塞的。

as_completed

上面雖然提供了判斷任務是否結束的方法,可是不能在主線程中一直判斷啊。有時候咱們是得知某個任務結束了,就去獲取結果,而不是一直判斷每一個任務有沒有結束。這是就可使用as_completed方法一次取出全部任務的結果。編程

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並非真的url
all_task = [executor.submit(get_html, (url)) for url in urls]

for future in as_completed(all_task):
    data = future.result()
    print("in main: get page {}s success".format(data))

# 執行結果
# get page 2s finished
# in main: get page 2s success
# get page 3s finished
# in main: get page 3s success
# get page 4s finished
# in main: get page 4s success

as_completed()方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield這個任務,就能執行for循環下面的語句,而後繼續阻塞住,循環到全部的任務結束。從結果也能夠看出,先完成的任務會先通知主線程網絡

map

除了上面的as_completed方法,還可使用executor.map方法,可是有一點不一樣。多線程

from concurrent.futures import ThreadPoolExecutor
import time

# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並非真的url

for data in executor.map(get_html, urls):
    print("in main: get page {}s success".format(data))
# 執行結果
# get page 2s finished
# get page 3s finished
# in main: get page 3s success
# in main: get page 2s success
# get page 4s finished
# in main: get page 4s success

使用map方法,無需提早使用submit方法,map方法與python標準庫中的map含義相同,都是將序列中的每一個元素都執行同一個函數。上面的代碼就是對urls的每一個元素都執行get_html函數,並分配各線程池。能夠看到執行結果與上面的as_completed方法的結果不一樣,輸出順序和urls列表的順序相同,就算2s的任務先執行完成,也會先打印出3s的任務先完成,再打印2s的任務完成。併發

wait

wait方法可讓主線程阻塞,直到知足設定的要求。異步

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time

# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並非真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)
print("main")
# 執行結果 
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main

wait方法接收3個參數,等待的任務序列、超時時間以及等待條件。等待條件return_when默認爲ALL_COMPLETED,代表要等待全部的任務都結束。能夠看到運行結果中,確實是全部任務都完成了,主線程纔打印出main。等待條件還能夠設置爲FIRST_COMPLETED,表示第一個任務完成就中止等待。異步編程

源碼分析

cocurrent.future模塊中的future的意思是將來對象,能夠把它理解爲一個在將來完成的操做,這是異步編程的基礎 。在線程池submit()以後,返回的就是這個future對象,返回的時候任務並無完成,但會在未來完成。也能夠稱之爲task的返回容器,這個裏面會存儲task的結果和狀態。那ThreadPoolExecutor內部是如何操做這個對象的呢?函數

下面簡單介紹ThreadPoolExecutor的部分代碼:源碼分析

  1. init方法

     

    init方法中主要重要的就是任務隊列和線程集合,在其餘方法中須要使用到。

     

  2. submit方法

     

     

    submit中有兩個重要的對象,_base.Future()_WorkItem()對象,_WorkItem()對象負責運行任務和對future對象進行設置,最後會將future對象返回,能夠看到整個過程是當即返回的,沒有阻塞。

  3. adjust_thread_count方法

     

     


    這個方法的含義很好理解,主要是建立指定的線程數。可是實現上有點難以理解,好比線程執行函數中的weakref.ref,涉及到了弱引用等概念,留待之後理解。

  4. _WorkItem對象

     

     

     


    _WorkItem對象的職責就是執行任務和設置結果。這裏面主要複雜的仍是self.future.set_result(result)

     

  5. 線程執行函數--_worker

     

    這是線程池建立線程時指定的函數入口,主要是從隊列中依次取出task執行,可是函數的第一個參數還不是很明白。留待之後。

總結

  • future的設計理念很棒,在線程池/進程池和攜程中都存在future對象,是異步編程的核心。
  • ThreadPoolExecutor 讓線程的使用更加方便,減少了線程建立/銷燬的資源損耗,無需考慮線程間的複雜同步,方便主線程與子線程的交互。
  • 線程池的抽象程度很高,多線程和多進程的編碼接口一致。

未完成

  • 對future模塊的理解。
  • weakref.ref是什麼?
  • 線程執行函數入口_worker的第一個參數的意思。

參考

  1. Python併發編程之線程池/進程池
  2. Python3高級編程和異步IO併發編程
相關文章
相關標籤/搜索