Python
中已經有了threading
模塊,爲何還須要線程池呢,線程池又是什麼東西呢?在介紹線程同步的信號量機制的時候,舉得例子是爬蟲的例子,須要控制同時爬取的線程數,例子中建立了20個線程,而同時只容許3個線程在運行,可是20個線程都須要建立和銷燬,線程的建立是須要消耗系統資源的,有沒有更好的方案呢?其實只須要三個線程就好了,每一個線程各分配一個任務,剩下的任務排隊等待,當某個線程完成了任務的時候,排隊任務就能夠安排給這個線程繼續執行。html
這就是線程池的思想(固然沒這麼簡單),可是本身編寫線程池很難寫的比較完美,還須要考慮複雜狀況下的線程同步,很容易發生死鎖。從Python3.2
開始,標準庫爲咱們提供了concurrent.futures
模塊,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
兩個類,實現了對threading
和multiprocessing
的進一步抽象(這裏主要關注線程池),不只能夠幫咱們自動調度線程,還能夠作到:python
from concurrent.futures import ThreadPoolExecutor
import time
# 參數times用來模擬網絡請求的時間
def get_html(times):
time.sleep(times)
print("get page {}s finished".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
# 經過submit函數提交執行的函數到線程池中,submit函數當即返回,不阻塞
task1 = executor.submit(get_html, (3))
task2 = executor.submit(get_html, (2))
# done方法用於斷定某個任務是否完成
print(task1.done())
# cancel方法用於取消某個任務,該任務沒有放入線程池中才能取消成功
print(task2.cancel())
time.sleep(4)
print(task1.done())
# result方法能夠獲取task的執行結果
print(task1.result())
# 執行結果
# False # 代表task1未執行完成
# False # 代表task2取消失敗,由於已經放入了線程池中
# get page 2s finished
# get page 3s finished
# True # 因爲在get page 3s finished以後纔打印,因此此時task1必然完成了
# 3 # 獲得task1的任務返回值
ThreadPoolExecutor
構造實例的時候,傳入max_workers
參數來設置線程池中最多能同時運行的線程數目。submit
函數來提交線程須要執行的任務(函數名和參數)到線程池中,並返回該任務的句柄(相似於文件、畫圖),注意submit()
不是阻塞的,而是當即返回。submit
函數返回的任務句柄,可以使用done()
方法判斷該任務是否結束。上面的例子能夠看出,因爲任務有2s的延時,在task1
提交後馬上判斷,task1
還未完成,而在延時4s以後判斷,task1
就完成了。cancel()
方法能夠取消提交的任務,若是任務已經在線程池中運行了,就取消不了。這個例子中,線程池的大小設置爲2,任務已經在運行了,因此取消失敗。若是改變線程池的大小爲1,那麼先提交的是task1
,task2
還在排隊等候,這是時候就能夠成功取消。result()
方法能夠獲取任務的返回值。查看內部代碼,發現這個方法是阻塞的。上面雖然提供了判斷任務是否結束的方法,可是不能在主線程中一直判斷啊。有時候咱們是得知某個任務結束了,就去獲取結果,而不是一直判斷每一個任務有沒有結束。這是就可使用as_completed
方法一次取出全部任務的結果。編程
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# 參數times用來模擬網絡請求的時間
def get_html(times):
time.sleep(times)
print("get page {}s finished".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並非真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
for future in as_completed(all_task):
data = future.result()
print("in main: get page {}s success".format(data))
# 執行結果
# get page 2s finished
# in main: get page 2s success
# get page 3s finished
# in main: get page 3s success
# get page 4s finished
# in main: get page 4s success
as_completed()
方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield
這個任務,就能執行for循環下面的語句,而後繼續阻塞住,循環到全部的任務結束。從結果也能夠看出,先完成的任務會先通知主線程。網絡
除了上面的as_completed
方法,還可使用executor.map
方法,可是有一點不一樣。多線程
from concurrent.futures import ThreadPoolExecutor
import time
# 參數times用來模擬網絡請求的時間
def get_html(times):
time.sleep(times)
print("get page {}s finished".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並非真的url
for data in executor.map(get_html, urls):
print("in main: get page {}s success".format(data))
# 執行結果
# get page 2s finished
# get page 3s finished
# in main: get page 3s success
# in main: get page 2s success
# get page 4s finished
# in main: get page 4s success
使用map
方法,無需提早使用submit
方法,map
方法與python
標準庫中的map
含義相同,都是將序列中的每一個元素都執行同一個函數。上面的代碼就是對urls
的每一個元素都執行get_html
函數,並分配各線程池。能夠看到執行結果與上面的as_completed
方法的結果不一樣,輸出順序和urls
列表的順序相同,就算2s的任務先執行完成,也會先打印出3s的任務先完成,再打印2s的任務完成。併發
wait
方法可讓主線程阻塞,直到知足設定的要求。異步
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
import time
# 參數times用來模擬網絡請求的時間
def get_html(times):
time.sleep(times)
print("get page {}s finished".format(times))
return times
executor = ThreadPoolExecutor(max_workers=2)
urls = [3, 2, 4] # 並非真的url
all_task = [executor.submit(get_html, (url)) for url in urls]
wait(all_task, return_when=ALL_COMPLETED)
print("main")
# 執行結果
# get page 2s finished
# get page 3s finished
# get page 4s finished
# main
wait
方法接收3個參數,等待的任務序列、超時時間以及等待條件。等待條件return_when
默認爲ALL_COMPLETED
,代表要等待全部的任務都結束。能夠看到運行結果中,確實是全部任務都完成了,主線程纔打印出main
。等待條件還能夠設置爲FIRST_COMPLETED
,表示第一個任務完成就中止等待。異步編程
cocurrent.future
模塊中的future
的意思是將來對象,能夠把它理解爲一個在將來完成的操做,這是異步編程的基礎 。在線程池submit()
以後,返回的就是這個future
對象,返回的時候任務並無完成,但會在未來完成。也能夠稱之爲task的返回容器,這個裏面會存儲task的結果和狀態。那ThreadPoolExecutor
內部是如何操做這個對象的呢?函數
下面簡單介紹ThreadPoolExecutor
的部分代碼:源碼分析
init方法
init
方法中主要重要的就是任務隊列和線程集合,在其餘方法中須要使用到。
submit方法
submit
中有兩個重要的對象,_base.Future()
和_WorkItem()
對象,_WorkItem()
對象負責運行任務和對future
對象進行設置,最後會將future
對象返回,能夠看到整個過程是當即返回的,沒有阻塞。
adjust_thread_count方法
這個方法的含義很好理解,主要是建立指定的線程數。可是實現上有點難以理解,好比線程執行函數中的weakref.ref,涉及到了弱引用等概念,留待之後理解。
_WorkItem對象
_WorkItem
對象的職責就是執行任務和設置結果。這裏面主要複雜的仍是self.future.set_result(result)
。
線程執行函數--_worker
這是線程池建立線程時指定的函數入口,主要是從隊列中依次取出task執行,可是函數的第一個參數還不是很明白。留待之後。