Python中的進程池與線程池(包含代碼) Python中的進程池與線程池

 

Python中的進程池與線程池

  • 引入進程池與線程池

  • 使用ProcessPoolExecutor進程池,使用ThreadPoolExecutor

  • 使用shutdown

  • 使用submit同步調用

  • 使用submit異步調用

  • 異步+回調函數

  • 併發實現套接字通訊

引入進程池

在學習線程池以前,咱們先看一個例子html

複製代碼
 1 # from multiprocessing import Process  2 # import time  3 #  4 # def task(name):  5 # print('name',name)  6 # time.sleep(1)  7 # if __name__ == '__main__':  8 # start=time.time()  9 # p1 = Process(target=task,args=("safly1",)) 10 # p2 = Process(target=task, args=("safly2",)) 11 # p3 = Process(target=task, args=("safly3",)) 12 # 13 # p1.start() 14 # p2.start() 15 # p3.start() 16 # 17 # p1.join() 18 # p2.join() 19 # p3.join() 20 # 21 # print("main") 22 # 23 # end = time.time() 24 # print(end- start)
複製代碼

輸出以下:python

 

 以上的方式是一個個建立進程,這樣的耗費時間才1秒多,雖然高效,可是有什麼弊端呢? 
若是併發很大的話,會給服務器帶來很大的壓力,因此引入了進程池的概念服務器

使用ProcessPoolExecutor進程池

何時用池:
池的功能是限制啓動的進程數或線程數,
何時應該限制???
當併發的任務數遠遠超過了計算機的承受能力時,即沒法一次性開啓過多的進程數或線程數時
就應該用池的概念將開啓的進程數或線程數限制在計算機可承受的範圍內併發

Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持。dom

經過ProcessPoolExecutor 來作示例。 
咱們來看一個最簡單的進程池異步

複製代碼
 1 from concurrent.futures import ProcessPoolExecutor  2 import time  3 def task(name):  4 print('name',name)  5 time.sleep(1)  6 if __name__ == '__main__':  7 start=time.time()  8 p1=ProcessPoolExecutor(2)  9 for i in range(5): 10  p1.submit(task,i) 11 p1.shutdown(wait=True) 12 print('') 13 end=time.time() 14 print(end-start)
複製代碼

輸出以下:ide

複製代碼
 1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py"  2 name 0  3 name 1  4 name 2  5 name 3  6 name 4  7  8 3.118098258972168  9 10 Process finished with exit code 0
複製代碼

簡單解釋下: 
ProcessPoolExecutor(2)建立一個進程池,容量爲2,循環submit出5個進程,而後就在線程池隊列裏面,執行多個進程,p1.shutdown(wait=True)意思是進程都執行完畢,在執行主進程的內容函數

使用shutdown

p1.shutdown(wait=True)是進程池內部的進程都執行完畢,纔會關閉,而後執行後續代碼 
若是改爲false呢?看以下代碼post

複製代碼
 1 from concurrent.futures import ProcessPoolExecutor  2 import time  3 def task(name):  4 print('name',name)  5 time.sleep(1)  6 if __name__ == '__main__':  7 start=time.time()  8 p1=ProcessPoolExecutor(2)  9 for i in range(5): 10  p1.submit(task,i) 11 p1.shutdown(wait=False) 12 print('') 13 end=time.time() 14 print(end-start)
複製代碼

輸出以下:學習

複製代碼
 1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py"  2  3 0.008975744247436523  4 name 0  5 name 1  6 name 2  7 name 3  8 name 4  9 10 Process finished with exit code 0
複製代碼

使用submit同步調用

同步:提交完任務後就在原地等待,直到任務運行完畢而且拿到返回值後,才運行下一行代碼

複製代碼
from concurrent.futures import ProcessPoolExecutor import time, random, os def piao(name, n): print('%s is piaoing %s' % (name, os.getpid())) time.sleep(1) return n ** 2 if __name__ == '__main__': p = ProcessPoolExecutor(2) start = time.time() for i in range(5): res=p.submit(piao,'safly %s' %i,i).result() #同步調用 print(res) p.shutdown(wait=True) print('', os.getpid()) stop = time.time() print(stop - start)
複製代碼
複製代碼
 1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py"  2 safly 0 is piaoing 11448  3 0  4 safly 1 is piaoing 11800  5 1  6 safly 2 is piaoing 11448  7 4  8 safly 3 is piaoing 11800  9 9 10 safly 4 is piaoing 11448 11 16 12 主 8516 13 5.095325946807861 14 15 Process finished with exit code 0
複製代碼

使用submit異步調用

異步:提交完任務(綁定一個回調函數)後不原地等待,直接運行下一行代碼,等到任務運行有返回值自動觸發回調的函數的運行

複製代碼
 1 from concurrent.futures import ThreadPoolExecutor  2 import time  3 def task(name):  4 print('name',name)  5 time.sleep(1)  6 if __name__ == '__main__':  7 start=time.time()  8 p1=ThreadPoolExecutor(2)  9 for i in range(5): 10  p1.submit(task,i) 11 p1.shutdown(wait=True) 12 print('') 13 end=time.time() 14 print(end-start)
複製代碼
複製代碼
1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py" 2 name 0 3 name 1 4 name 2 5 name 3 6 name 4 7 8 3.003053903579712
複製代碼

使用回調函數+異步

進程
複製代碼
# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import os # import time # import random # # def task(n): # print('%s run...' %os.getpid()) # time.sleep(5) # return n**2 # # def parse(future): # time.sleep(1) # res=future.result() # print('%s 處理了 %s' %(os.getpid(),res)) # # if __name__ == '__main__': # pool=ProcessPoolExecutor(4) # # pool.submit(task,1) # # pool.submit(task,2) # # pool.submit(task,3) # # pool.submit(task,4) # # start=time.time() # for i in range(1,5): # future=pool.submit(task,i) # future.add_done_callback(parse) # parse會在futrue有返回值時馬上觸發,而且將future看成參數傳給parse # pool.shutdown(wait=True) # stop=time.time() # print('主',os.getpid(),(stop - start))
複製代碼
複製代碼
 1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor  2 from threading import current_thread  3 import os  4 import time  5 import random  6  7 def task(n):  8 print('%s run...' %current_thread().name)  9 time.sleep(5) 10 return n**2 11 12 def parse(future): 13 time.sleep(1) 14 res=future.result() 15 print('%s 處理了 %s' %(current_thread().name,res)) 16 17 if __name__ == '__main__': 18 pool=ThreadPoolExecutor(4) 19 start=time.time() 20 for i in range(1,5): 21 future=pool.submit(task,i) 22 future.add_done_callback(parse) # parse會在futrue有返回值時馬上觸發,而且將future看成參數傳給parse 23 pool.shutdown(wait=True) 24 stop=time.time() 25 print('',current_thread().name,(stop - start))
複製代碼

併發實現套接字通訊

服務端
客戶端

擴展:

回調函數(callback)是什麼?

如下均來自知乎:

相關文章
相關標籤/搜索