前面咱們已經將線程併發編程與進程並行編程所有摸了個透,其實我第一次學習他們的時候感受很是困難甚至是吃力。由於概念實在是太多了,各類鎖,數據共享同步,各類方法等等讓人十分頭痛。因此這邊要告訴你一個好消息,前面的全部學習的知識點其實都是爲本章知識點作鋪墊,在學習了本章節的內容後關於如何使用多線程併發與多進程並行就採起本章節中介紹的方式便可。html
這裏要介紹一點與以前內容不一樣的地方,即若是使用隊列進行由進程池建立的進程之間數據共享的話不論是multiprocessing
模塊下的Queue
仍是queue
模塊下的Queue
都不能爲進程池中所建立的進程進行數據共享,咱們須要用到另外一個隊列即multiprocessing.Manager()
中的Queue
。固然這個我也會在下面介紹到。那麼開始學習吧!python
官方文檔編程
最先期的Python2中是沒有線程池這一律唸的,只有進程池。直到Python3的出現才引入了線程池,其實關於他們的使用都是很是簡單,並且接口也是高度統一甚至說如出一轍的。而線程池與進程池的做用便是爲了讓咱們可以更加便捷的管理線程或進程。多線程
咱們先說一下,若是須要使用線程池或進程池,須要導入模塊concurrent.futures
。併發
from concurrent.futures import ThreadPoolExecutor
# 線程池執行器app
from concurrent.futures import ProcessPoolExecutor
# 進程池執行器框架
這裏介紹一下,關於線程池或者進程池建立出的線程與進程與咱們使用multiprocessing
模塊或者threading
模塊中建立的線程或進程有什麼區別。咱們以多線程爲例:異步
import threading def task(): ident = threading.get_ident() print(ident) # 銷燬當前執行任務的線程 if __name__ == '__main__': for i in range(10): t1 = threading.Thread(target=task,) # 領任務 t1.start() # 等待CPU調度,而不是當即執行 # 執行 # ==== 執行結果 ==== Ps:能夠看到每一個線程的id號都不同,這也印證了圖上說的。 """ 10392 12068 5708 13864 2604 7196 7324 9728 9664 472 """
import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(): ident = threading.get_ident() print(ident) # 結束任務,不銷燬當前執行任務的線程,直到全部任務都執行完畢。 if __name__ == '__main__': pool = ThreadPoolExecutor(max_workers=2) # 這裏表明有2個線程能夠領取任務 for i in range(10): pool.submit(task) # 執行器啓動任務,將這些任務給2我的分配,也就是說task這個任務會被這2個線程不斷的執行,直到執行完畢後這2個線程纔會死亡 # ==== 執行結果 ==== Ps:能夠看到這裏都讓這2個線程把任務接了,內存開銷相比於上面的要小。 """ 7272 7272 7272 7272 11596 7272 11596 11596 11596 11596 """
執行器方法大全 | |
---|---|
submit(fn, *args, **kwargs) | 調度可調用對象 fn ,以 fn(*args **kwargs) 方式執行並返回 |
map(func, *iterables, timeout=None, chunksize=1) | 相似於 |
shutdown(wait=True) | 等待,相似join() 方法,而且在全部的任務完成後關閉執行器。wait=True 爲關閉,爲False 則是不關閉執行器的意思。 |
Ps:其實對於線程池或進程池來講,他們的池都有一個官方的名稱叫作執行器,接口都是同樣的。那麼接下來我就將線程池進程池這樣的名字換作執行器了,也是方便理解。 |
其實關於執行器的使用,咱們有兩種方式,一種是依賴於with
語句,一種是不依賴於with
語句,那麼我在這裏推薦使用依賴於wait語句的執行器。ide
不依賴於with
語句的執行器使用:函數
import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(): print("執行了") if __name__ == '__main__': pool = ThreadPoolExecutor(max_workers=2) # 這裏表明有2個線程能夠領取任務 , 對於線程池來說它是默認值是CPU核心數+4,對於進程池來說最大開啓的進程數是CPU核心數。 for i in range(10): pool.submit(task) # 執行器啓動任務,將這些任務給2我的分配,也就是說task這個任務會被這2個線程不斷的執行,直到執行完畢後這2個線程纔會死亡 # ==== 執行結果 ==== Ps:能夠看到這裏都讓這2個線程把任務接了,內存開銷相比於上面的要小。 """ 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 """
依賴於with
語句的執行器使用:
import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(): print("執行了") # 銷燬 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: # 這裏表明有2個線程能夠領取任務 , 對於線程池來說它是默認值是CPU核心數+4,對於進程池來說最大開啓的進程數是CPU核心數。 for i in range(10): pool.submit(task) # 執行器啓動任務,將這些任務給2我的分配,也就是說task這個任務會被這2個線程不斷的執行,直到執行完畢後這2個線程纔會死亡 # ==== 執行結果 ==== Ps:能夠看到這裏都讓這2個線程把任務接了,內存開銷相比於上面的要小。 """ 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 執行了 """
期程對象(由執行器執行的任務的返回結果)方法大全 | |
---|---|
方法/屬性名稱 | 功能描述 |
cancel() | 嘗試取消調用。 若是調用正在執行或已結束運行不能被取消則該方法將返回 False ,不然調用會被取消而且該方法將返回 True 。 |
cancelled() | 若是調用成功取消返回 True 。 |
running() | 若是調用正在執行並且不能被取消那麼返回 True 。 |
done() | 若是調用已被取消或正常結束那麼返回 True 。 |
result(timeout=None) | 即獲取任務的返回結果,最大等待timeout秒,如不設置則死等,超時觸發CancelledError 異常。 |
add_done_callback(fn) | 增長回調函數fn ,這個fn 應該至少有一個形參來接收當前期程對象。 |
exception(timeout=None) | 返回由調用引起的異常。若是調用還沒完成那麼這個方法將等待 timeout 秒。若是在 timeout 秒內沒有執行完成,concurrent.futures.TimeoutError 將會被觸發。timeout 能夠是整數或浮點數。若是 timeout 沒有指定或爲 None ,那麼等待時間就沒有限制。 |
Ps:還有一些期程對象的方法沒有舉例出來。詳情參見文檔 |
咱們能夠看到,咱們上面的函數並無返回值,若是有返回值的話怎麼辦呢?
import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(): print("執行了") return "玫瑰花" # 銷燬 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: res = pool.submit(task) print(res) # <Future at 0x2539ea97850 state=finished returned str> 這個就是期程對象,能夠看到他裏面還有當前任務的執行狀態。 finished = 執行完了的意思 print(res.result()) # 經過該方法就能夠拿到任務的返回結果 # ==== 執行結果 ==== """ 執行了 <Future at 0x2539ea97850 state=finished returned str> 玫瑰花 """
期程對象,也被稱爲將來對象,是一個很是重要的概念。這裏能夠記一筆,在Django
框架中也有些地方採起了期程對象這樣的設定,這是後話,後面再聊。
咱們嘗試着將它的任務數量增多,發現使用期程對象直接獲取任務結果會致使阻塞,怎麼解決?
import time import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(x): print("執行了,這是第%s個任務"%x) time.sleep(3) return "玫瑰花" # 銷燬 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: for i in range(10): res = pool.submit(task,i) print(res.result()) # 每次獲取結果的時候都是阻塞,怎麼辦?這個速率就變得很是的Low逼了。 # ==== 執行結果 ==== """ 執行了,這是第0個任務 玫瑰花 執行了,這是第1個任務 玫瑰花 執行了,這是第2個任務 玫瑰花 執行了,這是第3個任務 玫瑰花 執行了,這是第4個任務 玫瑰花 執行了,這是第5個任務 玫瑰花 執行了,這是第6個任務 玫瑰花 執行了,這是第7個任務 玫瑰花 執行了,這是第8個任務 玫瑰花 執行了,這是第9個任務 玫瑰花 """
我這裏有一個辦法,能夠值得嘗試一下。就是執行器自己有個方法shutdown(wait=True)
,它會致使當前主線程的阻塞。那麼咱們就能夠這樣操做,主程序阻塞住,再將啓程對象所有放到一個列表中,當全部任務處理完畢後阻塞通行,這個時候咱們再循環這個列表拿出其中的結果。
import time import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(x): print("執行了,這是第%s個任務"%x) time.sleep(3) return "玫瑰花" # 銷燬 if __name__ == '__main__': res_list = [] # 用於存放全部期程對象 with ThreadPoolExecutor(max_workers=2) as pool: for i in range(10): res = pool.submit(task,i) res_list.append(res) # 將期程對象放入列表 pool.shutdown(wait=True) # 表明必須將全部子線程的任務跑完再繼續向下執行主線程。 for i in res_list: print(i.result()) # ==== 執行結果 ==== """ 執行了,這是第0個任務 執行了,這是第1個任務 執行了,這是第2個任務 執行了,這是第3個任務 執行了,這是第4個任務 執行了,這是第5個任務 執行了,這是第6個任務 執行了,這是第7個任務 執行了,這是第8個任務 執行了,這是第9個任務 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 玫瑰花 """
若是你以爲這種方法很贊,我只能送你兩個字,太low了。咱們注意執行器的submit()
方法,這玩意兒是異步提交。異步提交的結果須要用到回調函數來進行調用,咱們來看一下它有多牛逼。
import time import threading from concurrent.futures import ThreadPoolExecutor # 線程池執行器 def task(x): print("執行了,這是第%s個任務"%x) time.sleep(3) return "玫瑰花" # 銷燬 def callback(res): # 必須有一個形參,來接收期程對象 print(res.result()) # 打印結果,即task任務的返回結果 if __name__ == '__main__': with ThreadPoolExecutor(max_workers=2) as pool: for i in range(10): res = pool.submit(task,i) res.add_done_callback(callback) # <--- 增長回調函數,當期程對象中的任務處理狀態完畢後將自動調用回調函數 # ==== 執行結果 ==== # 異步提交牛逼不?只要任務返回了咱們立馬就能夠獲取到結果進行處理。 """ 執行了,這是第0個任務 執行了,這是第1個任務 玫瑰花 玫瑰花 執行了,這是第2個任務 執行了,這是第3個任務 玫瑰花 玫瑰花 執行了,這是第4個任務 執行了,這是第5個任務 玫瑰花 玫瑰花 執行了,這是第6個任務 執行了,這是第7個任務 玫瑰花 玫瑰花 執行了,這是第8個任務 執行了,這是第9個任務 玫瑰花 玫瑰花 """
當咱們使用進程池執行器啓動多進程執行任務時,若是想用數據共享,單純multiprocessing.Queue
進程隊列並不支持。
import multiprocessing from concurrent.futures import ProcessPoolExecutor # 進程池執行器 def task_1(q): q.put("玫瑰花") print("放完了...") def task_2(q): print(q.get()) print("取到了") if __name__ == '__main__': q = multiprocessing.Queue() with ProcessPoolExecutor(max_workers=2) as pool: pool.submit(task_1,q) pool.submit(task_2,q) # ==== 執行結果 ==== # 阻塞住 """ """
這個時候咱們須要用到multiprocessing
中的Manager()
中的Queue
。
from multiprocessing import Manager from concurrent.futures import ProcessPoolExecutor # 進程池執行器 def task_1(q): q.put("玫瑰花") print("放完了...") def task_2(q): print(q.get()) print("取到了") if __name__ == '__main__': q = Manager().Queue() with ProcessPoolExecutor(max_workers=2) as pool: pool.submit(task_1,q) pool.submit(task_2,q) # ==== 執行結果 ==== # 成功 """ 放完了... 玫瑰花 取到了 """