Python
中已經有了threading
模塊,爲何還須要線程池呢,線程池又是什麼東西呢?在介紹線程同步的信號量機制的時候,舉得例子是爬蟲的例子,須要控制同時爬取的線程數,例子中建立了20個線程,而同時只容許3個線程在運行,可是20個線程都須要建立和銷燬,線程的建立是須要消耗系統資源的,有沒有更好的方案呢?其實只須要三個線程就好了,每一個線程各分配一個任務,剩下的任務排隊等待,當某個線程完成了任務的時候,排隊任務就能夠安排給這個線程繼續執行。html
這就是線程池的思想(固然沒這麼簡單),可是本身編寫線程池很難寫的比較完美,還須要考慮複雜狀況下的線程同步,很容易發生死鎖。從Python3.2
開始,標準庫爲咱們提供了concurrent.futures
模塊,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
兩個類,實現了對threading
和multiprocessing
的進一步抽象(這裏主要關注線程池),不只能夠幫咱們自動調度線程,還能夠作到:python
from concurrent.futures import ThreadPoolExecutor import time # 參數times用來模擬網絡請求的時間 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 經過submit函數提交執行的函數到線程池中,submit函數當即返回,不阻塞 task1 = executor.submit(get_html, (3)) task2 = executor.submit(get_html, (2)) # done方法用於斷定某個任務是否完成 print(task1.done()) # cancel方法用於取消某個任務,該任務沒有放入線程池中才能取消成功 print(task2.cancel()) time.sleep(4) print(task1.done()) # result方法能夠獲取task的執行結果 print(task1.result()) # 執行結果 # False # 代表task1未執行完成 # False # 代表task2取消失敗,由於已經放入了線程池中 # get page 2s finished # get page 3s finished # True # 因爲在get page 3s finished以後纔打印,因此此時task1必然完成了 # 3 # 獲得task1的任務返回值
ThreadPoolExecutor
構造實例的時候,傳入max_workers
參數來設置線程池中最多能同時運行的線程數目。submit
函數來提交線程須要執行的任務(函數名和參數)到線程池中,並返回該任務的句柄(相似於文件、畫圖),注意submit()
不是阻塞的,而是當即返回。submit
函數返回的任務句柄,可以使用done()
方法判斷該任務是否結束。上面的例子能夠看出,因爲任務有2s的延時,在task1
提交後馬上判斷,task1
還未完成,而在延時4s以後判斷,task1
就完成了。cancel()
方法能夠取消提交的任務,若是任務已經在線程池中運行了,就取消不了。這個例子中,線程池的大小設置爲2,任務已經在運行了,因此取消失敗。若是改變線程池的大小爲1,那麼先提交的是task1
,task2
還在排隊等候,這是時候就能夠成功取消。result()
方法能夠獲取任務的返回值。查看內部代碼,發現這個方法是阻塞的。上面雖然提供了判斷任務是否結束的方法,可是不能在主線程中一直判斷啊。有時候咱們是得知某個任務結束了,就去獲取結果,而不是一直判斷每一個任務有沒有結束。這是就可使用as_completed
方法一次取出全部任務的結果。網絡
from concurrent.futures import ThreadPoolExecutor, as_completed import time # 參數times用來模擬網絡請求的時間 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 並非真的url all_task = [executor.submit(get_html, (url)) for url in urls] for future in as_completed(all_task): data = future.result() print("in main: get page {}s success".format(data)) # 執行結果 # get page 2s finished # in main: get page 2s success # get page 3s finished # in main: get page 3s success # get page 4s finished # in main: get page 4s success
as_completed()
方法是一個生成器,在沒有任務完成的時候,會阻塞,在有某個任務完成的時候,會yield
這個任務,就能執行for循環下面的語句,而後繼續阻塞住,循環到全部的任務結束。從結果也能夠看出,先完成的任務會先通知主線程。多線程
除了上面的as_completed
方法,還可使用executor.map
方法,可是有一點不一樣。函數
from concurrent.futures import ThreadPoolExecutor import time # 參數times用來模擬網絡請求的時間 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 並非真的url for data in executor.map(get_html, urls): print("in main: get page {}s success".format(data)) # 執行結果 # get page 2s finished # get page 3s finished # in main: get page 3s success # in main: get page 2s success # get page 4s finished # in main: get page 4s success
使用map
方法,無需提早使用submit
方法,map
方法與python
標準庫中的map
含義相同,都是將序列中的每一個元素都執行同一個函數。上面的代碼就是對urls
的每一個元素都執行get_html
函數,並分配各線程池。能夠看到執行結果與上面的as_completed
方法的結果不一樣,輸出順序和urls
列表的順序相同,就算2s的任務先執行完成,也會先打印出3s的任務先完成,再打印2s的任務完成。編碼
wait
方法可讓主線程阻塞,直到知足設定的要求。url
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED import time # 參數times用來模擬網絡請求的時間 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 並非真的url all_task = [executor.submit(get_html, (url)) for url in urls] wait(all_task, return_when=ALL_COMPLETED) print("main") # 執行結果 # get page 2s finished # get page 3s finished # get page 4s finished # main
wait
方法接收3個參數,等待的任務序列、超時時間以及等待條件。等待條件
return_when
默認爲
ALL_COMPLETED
,代表要等待全部的任務都結束。能夠看到運行結果中,確實是全部任務都完成了,主線程纔打印出
main
。等待條件還能夠設置爲
FIRST_COMPLETED
,表示第一個任務完成就中止等待。