Python中的進程池與線程池

  • 引入進程池與線程池

  • 使用ProcessPoolExecutor進程池,使用ThreadPoolExecutor

  • 使用shutdown

  • 使用submit同步調用

  • 使用submit異步調用

  • 異步+回調函數

  • 併發實現套接字通訊

引入進程池

在學習線程池以前,咱們先看一個例子python

 1 # from multiprocessing import Process
 2 # import time
 3 #
 4 # def task(name):
 5 #     print('name',name)
 6 #     time.sleep(1)
 7 # if __name__ == '__main__':
 8 #     start=time.time()
 9 #     p1 = Process(target=task,args=("safly1",))
10 #     p2 = Process(target=task, args=("safly2",))
11 #     p3 = Process(target=task, args=("safly3",))
12 #
13 #     p1.start()
14 #     p2.start()
15 #     p3.start()
16 #
17 #     p1.join()
18 #     p2.join()
19 #     p3.join()
20 #
21 #     print("main")
22 #
23 #     end = time.time()
24 #     print(end- start)

輸出以下:服務器

 

 以上的方式是一個個建立進程,這樣的耗費時間才1秒多,雖然高效,可是有什麼弊端呢? 
若是併發很大的話,會給服務器帶來很大的壓力,因此引入了進程池的概念併發

使用ProcessPoolExecutor進程池

何時用池:
池的功能是限制啓動的進程數或線程數,
何時應該限制???
當併發的任務數遠遠超過了計算機的承受能力時,即沒法一次性開啓過多的進程數或線程數時
就應該用池的概念將開啓的進程數或線程數限制在計算機可承受的範圍內dom

Python3.2開始,標準庫爲咱們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實現了對threading和multiprocessing的進一步抽象,對編寫線程池/進程池提供了直接的支持。異步

經過ProcessPoolExecutor 來作示例。 
咱們來看一個最簡單的進程池socket

 1 from concurrent.futures import ProcessPoolExecutor
 2 import time
 3 def task(name):
 4     print('name',name)
 5     time.sleep(1)
 6 if __name__ == '__main__':
 7     start=time.time()
 8     p1=ProcessPoolExecutor(2)
 9     for i in range(5):
10         p1.submit(task,i)
11     p1.shutdown(wait=True)
12     print('')
13     end=time.time()
14     print(end-start)

輸出以下:ide

 1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py"
 2 name 0
 3 name 1
 4 name 2
 5 name 3
 6 name 4
 7  8 3.118098258972168
 9 
10 Process finished with exit code 0

簡單解釋下: 
ProcessPoolExecutor(2)建立一個進程池,容量爲2,循環submit出5個進程,而後就在線程池隊列裏面,執行多個進程,p1.shutdown(wait=True)意思是進程都執行完畢,在執行主進程的內容函數

使用shutdown

p1.shutdown(wait=True)是進程池內部的進程都執行完畢,纔會關閉,而後執行後續代碼 
若是改爲false呢?看以下代碼學習

 1 from concurrent.futures import ProcessPoolExecutor
 2 import time
 3 def task(name):
 4     print('name',name)
 5     time.sleep(1)
 6 if __name__ == '__main__':
 7     start=time.time()
 8     p1=ProcessPoolExecutor(2)
 9     for i in range(5):
10         p1.submit(task,i)
11     p1.shutdown(wait=False)
12     print('')
13     end=time.time()
14     print(end-start)

輸出以下:spa

 1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py"
 2  3 0.008975744247436523
 4 name 0
 5 name 1
 6 name 2
 7 name 3
 8 name 4
 9 
10 Process finished with exit code 0

使用submit同步調用

同步:提交完任務後就在原地等待,直到任務運行完畢而且拿到返回值後,才運行下一行代碼

from concurrent.futures import ProcessPoolExecutor
import time, random, os

def piao(name, n):
    print('%s is piaoing %s' % (name, os.getpid()))
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    p = ProcessPoolExecutor(2)
    start = time.time()
    for i in range(5):
        res=p.submit(piao,'safly %s' %i,i).result() #同步調用
        print(res)

    p.shutdown(wait=True)
    print('', os.getpid())

    stop = time.time()
    print(stop - start)
 1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py"
 2 safly 0 is piaoing 11448
 3 0
 4 safly 1 is piaoing 11800
 5 1
 6 safly 2 is piaoing 11448
 7 4
 8 safly 3 is piaoing 11800
 9 9
10 safly 4 is piaoing 11448
11 16
12 主 8516
13 5.095325946807861
14 
15 Process finished with exit code 0

使用submit異步調用

異步:提交完任務(綁定一個回調函數)後不原地等待,直接運行下一行代碼,等到任務運行有返回值自動觸發回調的函數的運行

 1 from concurrent.futures import ThreadPoolExecutor
 2 import time
 3 def task(name):
 4     print('name',name)
 5     time.sleep(1)
 6 if __name__ == '__main__':
 7     start=time.time()
 8     p1=ThreadPoolExecutor(2)
 9     for i in range(5):
10         p1.submit(task,i)
11     p1.shutdown(wait=True)
12     print('')
13     end=time.time()
14     print(end-start)
簡單小例子
1 D:\APPS\Python3.7\python.exe "D:/Python/project one/day20180717/進程池與線程池.py"
2 name 0
3 name 1
4 name 2
5 name 3
6 name 4
7 8 3.003053903579712
結果

使用回調函數+異步

進程
# from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
# import os
# import time
# import random
#
# def task(n):
#     print('%s run...' %os.getpid())
#     time.sleep(5)
#     return n**2
#
# def parse(future):
#     time.sleep(1)
#     res=future.result()
#     print('%s 處理了 %s' %(os.getpid(),res))
#
# if __name__ == '__main__':
#     pool=ProcessPoolExecutor(4)
#     # pool.submit(task,1)
#     # pool.submit(task,2)
#     # pool.submit(task,3)
#     # pool.submit(task,4)
#
#     start=time.time()
#     for i in range(1,5):
#         future=pool.submit(task,i)
#         future.add_done_callback(parse) # parse會在futrue有返回值時馬上觸發,而且將future看成參數傳給parse
#     pool.shutdown(wait=True)
#     stop=time.time()
#     print('主',os.getpid(),(stop - start))

 1 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
 2 from threading import current_thread
 3 import os
 4 import time
 5 import random
 6 
 7 def task(n):
 8     print('%s run...' %current_thread().name)
 9     time.sleep(5)
10     return n**2
11 
12 def parse(future):
13     time.sleep(1)
14     res=future.result()
15     print('%s 處理了 %s' %(current_thread().name,res))
16 
17 if __name__ == '__main__':
18     pool=ThreadPoolExecutor(4)
19     start=time.time()
20     for i in range(1,5):
21         future=pool.submit(task,i)
22         future.add_done_callback(parse) # parse會在futrue有返回值時馬上觸發,而且將future看成參數傳給parse
23     pool.shutdown(wait=True)
24     stop=time.time()
25     print('',current_thread().name,(stop - start))
線程

併發實現套接字通訊

 1 from socket import *
 2 from threading import Thread
 3 
 4 def talk(conn):
 5     while True:
 6         try:
 7             data=conn.recv(1024)
 8             if len(data) == 0:break
 9             conn.send(data.upper())
10         except ConnectionResetError:
11             break
12     conn.close()
13 
14 def server(ip,port,backlog=5):
15     server = socket(AF_INET, SOCK_STREAM)
16     server.bind((ip, port))
17     server.listen(backlog)
18 
19     print('starting...')
20     while True:
21         conn, addr = server.accept()
22 
23         t = Thread(target=talk, args=(conn,))
24         t.start()
25 
26 if __name__ == '__main__':
27     server('127.0.0.1',8080)
服務端
 1 from socket import *
 2 import os
 3 
 4 client=socket(AF_INET,SOCK_STREAM)
 5 client.connect(('127.0.0.1',8080))
 6 
 7 while True:
 8     msg='%s say hello' %os.getpid()
 9     client.send(msg.encode('utf-8'))
10     data=client.recv(1024)
11     print(data.decode('utf-8'))
客戶端

擴展:

回調函數(callback)是什麼?

如下均來自知乎:

 

回調函數(callback)是什麼? - no.body的回答 - 知乎 https://www.zhihu.com/question/19801131/answer/27459821

很是經典的回答加舉例。

相關文章
相關標籤/搜索