在學習線程池以前,咱們先看一個例子python
1 # from multiprocessing import Process 2 # import time 3 # 4 # def task(name): 5 # print('name',name) 6 # time.sleep(1) 7 # if __name__ == '__main__': 8 # start=time.time() 9 # p1 = Process(target=task,args=("safly1",)) 10 # p2 = Process(target=task, args=("safly2",)) 11 # p3 = Process(target=task, args=("safly3",)) 12 # 13 # p1.start() 14 # p2.start() 15 # p3.start() 16 # 17 # p1.join() 18 # p2.join() 19 # p3.join() 20 # 21 # print("main") 22 # 23 # end = time.time() 24 # print(end- start)
輸出以下:服務器
以上的方式是一個個建立進程,這樣的耗費時間才1秒多,雖然高效,可是有什麼弊端呢?
若是併發很大的話,會給服務器帶來很大的壓力,因此引入了進程池的概念併發
何時用池:
池的功能是限制啓動的進程數或線程數,
何時應該限制???
當併發的任務數遠遠超過了計算機的承受能力時,即沒法一次性開啓過多的進程數或線程數時
就應該用池的概念將開啓的進程數或線程數限制在計算機可承受的範圍內dom
Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持。異步
經過ProcessPoolExecutor 來作示例。
咱們來看一個最簡單的進程池socket
1 from concurrent.futures import ProcessPoolExecutor 2 import time 3 def task(name): 4 print('name',name) 5 time.sleep(1) 6 if __name__ == '__main__': 7 start=time.time() 8 p1=ProcessPoolExecutor(2) 9 for i in range(5): 10 p1.submit(task,i) 11 p1.shutdown(wait=True) 12 print('主') 13 end=time.time() 14 print(end-start)
輸出以下:ide
1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py" 2 name 0 3 name 1 4 name 2 5 name 3 6 name 4 7 主 8 3.118098258972168 9 10 Process finished with exit code 0
簡單解釋下:
ProcessPoolExecutor(2)建立一個進程池,容量爲2,循環submit出5個進程,而後就在線程池隊列裏面,執行多個進程,p1.shutdown(wait=True)意思是進程都執行完畢,在執行主進程的內容函數
p1.shutdown(wait=True)是進程池內部的進程都執行完畢,纔會關閉,而後執行後續代碼
若是改爲false呢?看以下代碼學習
1 from concurrent.futures import ProcessPoolExecutor 2 import time 3 def task(name): 4 print('name',name) 5 time.sleep(1) 6 if __name__ == '__main__': 7 start=time.time() 8 p1=ProcessPoolExecutor(2) 9 for i in range(5): 10 p1.submit(task,i) 11 p1.shutdown(wait=False) 12 print('主') 13 end=time.time() 14 print(end-start)
輸出以下:spa
1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py" 2 主 3 0.008975744247436523 4 name 0 5 name 1 6 name 2 7 name 3 8 name 4 9 10 Process finished with exit code 0
同步:提交完任務後就在原地等待,直到任務運行完畢而且拿到返回值後,才運行下一行代碼
from concurrent.futures import ProcessPoolExecutor import time, random, os def piao(name, n): print('%s is piaoing %s' % (name, os.getpid())) time.sleep(1) return n ** 2 if __name__ == '__main__': p = ProcessPoolExecutor(2) start = time.time() for i in range(5): res=p.submit(piao,'safly %s' %i,i).result() #同步調用 print(res) p.shutdown(wait=True) print('主', os.getpid()) stop = time.time() print(stop - start)
1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py" 2 safly 0 is piaoing 11448 3 0 4 safly 1 is piaoing 11800 5 1 6 safly 2 is piaoing 11448 7 4 8 safly 3 is piaoing 11800 9 9 10 safly 4 is piaoing 11448 11 16 12 主 8516 13 5.095325946807861 14 15 Process finished with exit code 0
異步:提交完任務(綁定一個回調函數)後不原地等待,直接運行下一行代碼,等到任務運行有返回值自動觸發回調的函數的運行
1 from concurrent.futures import ThreadPoolExecutor 2 import time 3 def task(name): 4 print('name',name) 5 time.sleep(1) 6 if __name__ == '__main__': 7 start=time.time() 8 p1=ThreadPoolExecutor(2) 9 for i in range(5): 10 p1.submit(task,i) 11 p1.shutdown(wait=True) 12 print('主') 13 end=time.time() 14 print(end-start)
1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py" 2 name 0 3 name 1 4 name 2 5 name 3 6 name 4 7 主 8 3.003053903579712
# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor # import os # import time # import random # # def task(n): # print('%s run...' %os.getpid()) # time.sleep(5) # return n**2 # # def parse(future): # time.sleep(1) # res=future.result() # print('%s 處理了 %s' %(os.getpid(),res)) # # if __name__ == '__main__': # pool=ProcessPoolExecutor(4) # # pool.submit(task,1) # # pool.submit(task,2) # # pool.submit(task,3) # # pool.submit(task,4) # # start=time.time() # for i in range(1,5): # future=pool.submit(task,i) # future.add_done_callback(parse) # parse會在futrue有返回值時馬上觸發,而且將future看成參數傳給parse # pool.shutdown(wait=True) # stop=time.time() # print('主',os.getpid(),(stop - start))
1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor 2 from threading import current_thread 3 import os 4 import time 5 import random 6 7 def task(n): 8 print('%s run...' %current_thread().name) 9 time.sleep(5) 10 return n**2 11 12 def parse(future): 13 time.sleep(1) 14 res=future.result() 15 print('%s 處理了 %s' %(current_thread().name,res)) 16 17 if __name__ == '__main__': 18 pool=ThreadPoolExecutor(4) 19 start=time.time() 20 for i in range(1,5): 21 future=pool.submit(task,i) 22 future.add_done_callback(parse) # parse會在futrue有返回值時馬上觸發,而且將future看成參數傳給parse 23 pool.shutdown(wait=True) 24 stop=time.time() 25 print('主',current_thread().name,(stop - start))
1 from socket import * 2 from threading import Thread 3 4 def talk(conn): 5 while True: 6 try: 7 data=conn.recv(1024) 8 if len(data) == 0:break 9 conn.send(data.upper()) 10 except ConnectionResetError: 11 break 12 conn.close() 13 14 def server(ip,port,backlog=5): 15 server = socket(AF_INET, SOCK_STREAM) 16 server.bind((ip, port)) 17 server.listen(backlog) 18 19 print('starting...') 20 while True: 21 conn, addr = server.accept() 22 23 t = Thread(target=talk, args=(conn,)) 24 t.start() 25 26 if __name__ == '__main__': 27 server('127.0.0.1',8080)
1 from socket import * 2 import os 3 4 client=socket(AF_INET,SOCK_STREAM) 5 client.connect(('127.0.0.1',8080)) 6 7 while True: 8 msg='%s say hello' %os.getpid() 9 client.send(msg.encode('utf-8')) 10 data=client.recv(1024) 11 print(data.decode('utf-8'))
擴展:
如下均來自知乎:
回調函數(callback)是什麼? - no.body的回答 - 知乎 https://www.zhihu.com/question/19801131/answer/27459821
很是經典的回答加舉例。