[python] ThreadPoolExecutor線程池 python 線程池

初識

Python中已經有了threading模塊,爲何還須要線程池呢,線程池又是什麼東西呢?在介紹線程同步的信號量機制的時候,舉得例子是爬蟲的例子,須要控制同時爬取的線程數,例子中建立了20個線程,而同時只容許3個線程在運行,可是20個線程都須要建立和銷燬,線程的建立是須要消耗系統資源的,有沒有更好的方案呢?其實只須要三個線程就好了,每一個線程各分配一個任務,剩下的任務排隊等待,當某個線程完成了任務的時候,排隊任務就能夠安排給這個線程繼續執行。html

這就是線程池的思想(固然沒這麼簡單),可是本身編寫線程池很難寫的比較完美,還須要考慮複雜狀況下的線程同步,很容易發生死鎖。從Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutorProcessPoolExecutor兩個類,實現了對threadingmultiprocessing的進一步抽象(這裏主要關注線程池),不只能夠幫咱們自動調度線程,還能夠作到:python

  1. 主線程能夠獲取某一個線程(或者任務的)的狀態,以及返回值。
  2. 當一個線程完成的時候,主線程可以當即知道。
  3. 讓多線程和多進程的編碼接口一致。

實例

簡單使用

from concurrent.futures import ThreadPoolExecutor
import time

# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
# 經過submit函數提交執行的函數到線程池中,submit函數當即返回,不阻塞
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))
# done方法用於斷定某個任務是否完成
print(task1.done())
# cancel方法用於取消某個任務,該任務沒有放入線程池中才能取消成功
print(task2.cancel())
time.sleep(4)
print(task1.done())
# result方法能夠獲取task的執行結果
print(task1.result())

# 執行結果
# False  # 代表task1未執行完成
# False  # 代表task2取消失敗,由於已經放入了線程池中
# get page 2s finished
# get page 3s finished
# True  # 因爲在get page 3s finished以後纔打印,因此此時task1必然完成了
# 3     # 獲得task1的任務返回值

 

  • ThreadPoolExecutor構造實例的時候,傳入max_workers參數來設置線程池中最多能同時運行的線程數目。
  • 使用submit函數來提交線程須要執行的任務(函數名和參數)到線程池中,並返回該任務的句柄(相似於文件、畫圖),注意submit()不是阻塞的,而是當即返回。
  • 經過submit函數返回的任務句柄,可以使用done()方法判斷該任務是否結束。上面的例子能夠看出,因爲任務有2s的延時,在task1提交後馬上判斷,task1還未完成,而在延時4s以後判斷,task1就完成了。
  • 使用cancel()方法能夠取消提交的任務,若是任務已經在線程池中運行了,就取消不了。這個例子中,線程池的大小設置爲2,任務已經在運行了,因此取消失敗。若是改變線程池的大小爲1,那麼先提交的是task1task2還在排隊等候,這是時候就能夠成功取消。
  • 使用result()方法能夠獲取任務的返回值。查看內部代碼,發現這個方法是阻塞的。

as_completed

上面雖然提供了判斷任務是否結束的方法,可是不能在主線程中一直判斷啊。有時候咱們是得知某個任務結束了,就去獲取結果,而不是一直判斷每一個任務有沒有結束。這是就可使用as_completed方法一次取出全部任務的結果。網絡

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並非真的url
all_task = [executor.submit(get_html, (url)) for url in urls]

for future in as_completed(all_task):
    data = future.result()
    print("in main: get page {}s success".format(data))

# 執行結果
# get page 2s finished
# in main: get page 2s success
# get page 3s finished
# in main: get page 3s success
# get page 4s finished
# in main: get page 4s success

 

as_completed()方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield這個任務,就能執行for循環下面的語句,而後繼續阻塞住,循環到全部的任務結束。從結果也能夠看出,先完成的任務會先通知主線程多線程

map

除了上面的as_completed方法,還可使用executor.map方法,可是有一點不一樣。函數


from concurrent.futures import ThreadPoolExecutor
import time

# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並非真的url

for data in executor.map(get_html, urls):
    print("in main: get page {}s success".format(data))
# 執行結果
# get page 2s finished
# get page 3s finished
# in main: get page 3s success
# in main: get page 2s success
# get page 4s finished
# in main: get page 4s success

 

使用map方法,無需提早使用submit方法,map方法與python標準庫中的map含義相同,都是將序列中的每一個元素都執行同一個函數。上面的代碼就是對urls的每一個元素都執行get_html函數,並分配各線程池。能夠看到執行結果與上面的as_completed方法的結果不一樣,輸出順序和urls列表的順序相同,就算2s的任務先執行完成,也會先打印出3s的任務先完成,再打印2s的任務完成。編碼

wait

wait方法可讓主線程阻塞,直到知足設定的要求。url


from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time

# 參數times用來模擬網絡請求的時間
def get_html(times):
    time.sleep(times)
    print("get page {}s finished".format(times))
    return times

executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並非真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)
print("main")
# 執行結果 
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main

 

wait方法接收3個參數,等待的任務序列、超時時間以及等待條件。等待條件 return_when默認爲 ALL_COMPLETED,代表要等待全部的任務都結束。能夠看到運行結果中,確實是全部任務都完成了,主線程纔打印出 main。等待條件還能夠設置爲 FIRST_COMPLETED,表示第一個任務完成就中止等待。
相關文章
相關標籤/搜索