並行 : 並行是指二者同時執行,好比賽跑,兩我的都在不停的往前跑;(資源夠用,好比三個線程,四核的CPU )
併發 : 併發是指資源有限的狀況下,二者交替輪流使用資源,好比一段路(單核CPU資源)同時只能過一我的,A走一段後,讓給B,B用完繼續給A ,交替使用,目的是提升效率。
區別:
並行是從微觀上,也就是在一個精確的時間片刻,有不一樣的程序在執行,這就要求必須有多個處理器。
併發是從宏觀上,在一個時間段上能夠看出是同時執行的,好比一個服務器同時處理多個session。
這張圖很關鍵html
所謂同步:就是一個任務的完成須要依賴另一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成,這是一種可靠的任務序列
。要麼成功都成功,失敗都失敗,兩個任務的狀態能夠保持一致。 所謂異步:是不須要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工做,依賴的任務也當即執行,只要本身完成了整個任務就算完成了
。至於被依賴的任務最終是否真正完成,依賴它的任務沒法肯定,因此它是不可靠的任務序列
。好比我去銀行辦理業務,可能會有兩種方式:
第一種 :選擇排隊等候;
第二種 :選擇取一個小紙條上面有個人號碼,等到排到我這一號時由櫃檯的人通知我輪到我去辦理業務了;
第一種:前者(排隊等候)就是同步等待消息通知,也就是我要一直在等待銀行辦理業務狀況;
第二種:後者(等待別人通知)就是異步等待消息通知。在異步消息處理中,等待消息通知者(在這個例子中就是等待辦理業務的人)每每註冊一個回調機制,在所等待的事件被觸發時由觸發機制(在這裏是櫃檯的人)經過某種機制(在這裏是寫在小紙條上的號碼,喊號)找到等待該事件的人。
阻塞和非阻塞這兩個概念與程序(線程)等待消息通知(無所謂同步或者異步)時的狀態有關。也就是說阻塞與非阻塞主要是程序(線程)等待消息通知時的狀態角度來講的
1 Process([group [, target [, name [, args [, kwargs]]]]]),由該類實例化獲得的對象,表示一個子進程中的任務(還沒有啓動) 2 3 強調: 4 1. 須要使用關鍵字的方式來指定參數 5 2. args指定的爲傳給target函數的位置參數,是一個元組形式,必須有逗號 6 7 參數介紹: 8 1 group參數未使用,值始終爲None 9 2 target表示調用對象,即子進程要執行的任務 10 3 args表示調用對象的位置參數元組,args=(1,2,'egon',) 11 4 kwargs表示調用對象的字典,kwargs={'name':'egon','age':18} 12 5 name爲子進程的名稱
實例代碼:python
1 # 在python進程中開啓一個子進程 2 import time 3 from multiprocessing import Process 4 5 6 def f(name): 7 print("hello", name) 8 print("我是子進程") 9 10 11 if __name__ == '__main__': 12 p = Process(target=f, args=("pontoon", )) 13 p.start() 14 time.sleep(1) 15 print("執行主進程的內容了") 16 17 >>>hello pontoon 18 我是子進程 19 執行主進程的內容了
1 import os 2 import time 3 from multiprocessing import Process 4 5 6 def func(): 7 print(54321) 8 time.sleep(1) 9 print("子進程:", os.getpid()) 10 print("子進程的父進程:", os.getppid()) 11 print(12345) 12 13 14 if __name__ == '__main__': 15 p = Process(target=func) 16 p.start() 17 print("*" * 20) 18 print("父進程:", os.getpid()) # 查看當前進程的進程號 19 print("父進程的父進程", os.getppid()) 20 21 22 >>>******************** 23 父進程: 2972 24 父進程的父進程 14060 25 54321 26 子進程: 13992 27 子進程的父進程: 2972 28 12345
進程中join()方法的用處,感知一個子進程的結束,將異步的程序變成同步的程序,在join()方法調用以前,子進程與主進程都是異步的,可是當調用了join()方法以後,那麼下面的代碼就變成了同步!web
1 import os 2 import time 3 from multiprocessing import Process 4 5 6 def func(): 7 print(54321) 8 time.sleep(1) 9 print("子進程:", os.getpid()) 10 print("子進程的父進程:", os.getppid()) 11 print(12345) 12 13 14 if __name__ == '__main__': 15 p = Process(target=func) 16 p.start() 17 p.join() # 將異步的程序變成同步的 18 print("*" * 20) 19 print("進程結束了") # 查看當前進程的進程號 20 21 22 # 代碼彷佛這樣執行纔是正常的,這就是join()方法的做用 23 >>>54321 24 子進程: 10472 25 子進程的父進程: 8692 26 12345 27 ******************** 28 進程結束了
1 def func(a, b): 2 print(a) 3 time.sleep(3) 4 print(b) 5 6 7 if __name__ == '__main__': 8 p = Process(target=func, args=(10, 20)) 9 p.start() 10 p = Process(target=func, args=(11, 21)) 11 p.start() 12 p = Process(target=func, args=(12, 22)) 13 p.start() 14 p = Process(target=func, args=(13, 23)) 15 p.start() 16 17 print("*" * 20) 18 # join()之上的程序是異步的,join()以後的方法變成了同步 19 p.join() 20 print("進程結束了") # 查看當前進程的進程號 21 22 23 >>>******************** 24 10 25 11 26 12 27 13 28 20 29 21 30 22 31 23 32 進程結束了
for 循環配合p.join()方法值得注意的地方算法
1 import os 2 import time 3 from multiprocessing import Process 4 5 6 def func(a, b): 7 print("*" * a) 8 time.sleep(3) 9 print("*" * b) 10 11 12 if __name__ == '__main__': 13 14 for i in range(10): 15 p = Process(target=func, args=(5, 10)) 16 p.start() 17 18 print("*" * 20) 19 # join()之上的程序是異步的,join()以後的方法變成了同步 20 p.join() 21 print("進程結束了") # 查看當前進程的進程號 22 23 >>>***** 24 ***** 25 ***** 26 ***** 27 ***** 28 ***** 29 ***** 30 ******************** 31 ***** 32 ***** 33 ***** 34 ********** 35 ********** 36 ********** 37 ********** 38 ********** 39 ********** 40 ********** 41 ********** 42 進程結束了 # 出問題了 43 ********** 44 **********
1 import time 2 from multiprocessing import Process 3 4 5 def func(a, b): 6 print("*" * a) 7 time.sleep(6) 8 print("*" * b) 9 10 11 if __name__ == '__main__': 12 p_list = [] 13 for i in range(5): 14 p = Process(target=func, args=(5, 10)) 15 p_list.append(p) 16 p.start() 17 [p.join() for p in p_list] 18 print("進程結束了") # 查看當前進程的進程號 19 20 >>>***** 21 ***** 22 ***** 23 ***** 24 ***** 25 ********** 26 ********** 27 ********** 28 ********** 29 ********** 30 進程結束了
1 # 需求想500個文件裏面寫數據,用異步的方式實現 2 import os 3 from multiprocessing import Process 4 5 6 def func(file_name, contents): 7 with open(file_name, 'w') as f: 8 f.write(contents * '+') 9 10 11 if __name__ == '__main__': 12 p_list = [] 13 for i in range(5): 14 p = Process(target=func, args=("info{0}".format(i), i)) 15 p_list.append(p) 16 p.start() 17 [p.join() for p in p_list] 18 print([i for i in os.walk(r'G:\進線程')])
1 import os 2 from multiprocessing import Process 3 4 5 # 開啓多進程的第二種方式 6 class MyProcess(Process): 7 def run(self): 8 print(os.getpid()) 9 10 11 if __name__ == '__main__': 12 print("主:", os.getpid()) 13 p1 = MyProcess() 14 p1.start() 15 p2 = MyProcess() 16 p2.start() 17 18 >>>主: 7824 19 12324 20 14660
1 # Process 類中的屬性一覽 2 3 class Process(object): 4 def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): 5 self.name = '' # 進程名 6 self.daemon = False # 7 self.authkey = None # 8 self.exitcode = None 9 self.ident = 0 10 self.pid = 0 # 進程號 11 self.sentinel = None 12 self.is_alive = '' # 判斷子進程是否活着 13 self.terminate = '' # 結束一個子進程
1 import os 2 from multiprocessing import Process 3 4 5 # 開啓多進程的第二種方式 6 class MyProcess(Process): 7 def __init__(self, args1, args2): 8 super(MyProcess, self).__init__() 9 self.arg1 = args1 10 self.arg2 = args2 11 12 def run(self): 13 print(self.pid) 14 print(self.name) 15 print(self.arg1) 16 print(self.arg2) 17 18 19 if __name__ == '__main__': 20 print("主:", os.getpid()) 21 p1 = MyProcess(10, 20) 22 p1.start() 23 p2 = MyProcess(11, 22) 24 p2.start() 25 26 >>>主: 6916 27 9288 28 MyProcess-1 29 10 30 20 31 7412 32 MyProcess-2 33 11 34 22
進程與進程之間數據是隔離的json
父進程與子進程之間的數據也是隔離的windows
1 from multiprocessing import Process 2 3 def work(): 4 global n 5 n=0 6 print('子進程內: ',n) 7 8 9 if __name__ == '__main__': 10 n = 100 11 p=Process(target=work) 12 p.start() 13 p.join() # 執行玩子進程以後在執行父進程 14 print('主進程內: ',n) 15 16 >>>子進程內: 0 17 主進程內: 100 18 19 # 在子進程中設置了全局變量可是在主進程中的n的值並無發生改變,得出結論:主進程與子進程之間的數據也是隔離的
# server端 from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start進程必定要寫到這下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start()
# client端 from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
守護進程是一種新的進程,本身開的一個子進程轉換成的。當在一個主進程中開啓子進程的時候,想要子進程隨着主進程的結束而結束,那麼就要將子進程變成一個守護進程。安全
import time from multiprocessing import Process # 守護進程 def func(): while 1: time.sleep(0.5) print('2222') if __name__ == '__main__': p = Process(target=func) # p.deamon = True p.start() i = 0 while i < 4: print('11111111') time.sleep(1) i += 1 # 查看輸出的結果 >>>11111111 2222 11111111 2222 2222 11111111 2222 2222 11111111 2222 2222 2222 2222 2222 ...
上面的2會一直執行下去。這顯然不是咱們但願看見的效果,我但願當主進程執行結束以後,子進程也會跟着結束。守護進程就是作這件事的。如何來設置守護進程 。很簡單加上一段代碼!服務器
1 import time 2 from multiprocessing import Process 3 4 5 # 守護進程 6 def func(): 7 while 1: 8 time.sleep(0.5) 9 print('2222') 10 11 12 if __name__ == '__main__': 13 p = Process(target=func) 14 p.daemon = True # 在start以前加 15 p.start() 16 i = 0 17 while i < 4: 18 print('11111111') 19 time.sleep(1) 20 i += 1 21 22 >>>11111111 23 2222 24 11111111 25 2222 26 2222 27 11111111 28 2222 29 2222 30 11111111 31 2222 32 2222
改進:添加守護進程網絡
1 import time 2 from multiprocessing import Process 3 4 5 def func(): 6 while 1: 7 time.sleep(0.5) 8 print('2222') 9 10 11 if __name__ == '__main__': 12 p = Process(target=func) 13 p.daemon = True # 在start以前加 14 p.start() 15 i = 0 16 while i < 4: 17 print('11111111') 18 time.sleep(1) 19 i += 1 20 21 >>>11111111 22 2222 23 11111111 24 2222 25 2222 26 11111111 27 2222 28 2222 29 11111111 30 2222 31 2222
有上面的現象引出結論:session
進程鎖適用於多進程的程序,就是將異步的程序變成同步的
模擬一個現象來引出鎖的概念。
春運到了你們都在搶火車票。100我的搶一張火車票(併發的過程)。但最後只能賣出去一張(心塞~)。
買票的過程用代碼來表示就是這樣的
#文件db的內容爲:{"count":1} #注意必定要用雙引號,否則json沒法識別 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('\033[43m剩餘票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db')) time.sleep(0.1) #模擬讀數據的網絡延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫數據的網絡延遲 json.dump(dic,open('db','w')) print('\033[43m購票成功\033[0m') def task(): search() get() if __name__ == '__main__': for i in range(100): #模擬併發100個客戶端搶票 p=Process(target=task) p.start()
引出概念爲子進程加鎖
# 由併發變成了串行,犧牲了運行效率,但避免了競爭 import random from multiprocessing import Process,Lock def func(n, l): l.acquire() time.sleep(random.random()) print("爲子線程加鎖了%s" % n) l.release() if __name__ == '__main__': l = Lock() for i in range(3): p = Process(target=func, args=(i, l)) p.start() >>>爲子線程加鎖了0 爲子線程加鎖了1 爲子線程加鎖了2 # -------------------------------------------------------------- def func(n): time.sleep(random.random()) print("爲子線程加鎖了%s" % n) if __name__ == '__main__': for i in range(3): p = Process(target=func, args=(i)) p.start() >>>爲子線程加鎖了1 爲子線程加鎖了2 爲子線程加鎖了0 # 對比兩段代碼的執行結果——加鎖使得子進程變成了‘有序’的狀態了
加鎖實現春運買票
1 #文件db的內容爲:{"count":5} 2 #注意必定要用雙引號,否則json沒法識別 3 #併發運行,效率高,但競爭寫同一文件,數據寫入錯亂 4 from multiprocessing import Process,Lock 5 import time,json,random 6 def search(): 7 dic=json.load(open('db')) 8 print('\033[43m剩餘票數%s\033[0m' %dic['count']) 9 10 def get(): 11 dic=json.load(open('db')) 12 time.sleep(random.random()) #模擬讀數據的網絡延遲 13 if dic['count'] >0: 14 dic['count']-=1 15 time.sleep(random.random()) #模擬寫數據的網絡延遲 16 json.dump(dic,open('db','w')) 17 print('\033[32m購票成功\033[0m') 18 else: 19 print('\033[31m購票失敗\033[0m') 20 21 def task(lock): 22 search() 23 lock.acquire() 24 get() 25 lock.release() 26 27 if __name__ == '__main__': 28 lock = Lock() 29 for i in range(100): #模擬併發100個客戶端搶票 30 p=Process(target=task,args=(lock,)) 31 p.start()
鎖的總結
1 #加鎖能夠保證多個進程修改同一塊數據時,同一時間只能有一個任務能夠進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了數據安全。 2 雖然能夠用文件共享數據實現進程間通訊,但問題是: 3 1.效率低(共享數據基於文件,而文件是硬盤上的數據) 4 2.須要本身加鎖處理 5 6 #所以咱們最好找尋一種解決方案可以兼顧:一、效率高(多個進程共享一塊內存的數據)二、幫咱們處理好鎖問題。這就是mutiprocessing模塊爲咱們提供的基於消息的IPC通訊機制:隊列和管道。 7 隊列和管道都是將數據存放於內存中 8 隊列又是基於(管道+鎖)實現的,可讓咱們從複雜的鎖問題中解脫出來, 9 咱們應該儘可能避免使用共享數據,儘量使用消息傳遞和隊列,避免處理複雜的同步和鎖問題,並且在進程數目增多時,每每能夠得到更好的可獲展性。
互斥鎖同時只容許一個線程更改數據,而信號量Semaphore是同時容許必定數量的線程更改數據 。
1 import time 2 import random 3 from multiprocessing import Process 4 from multiprocessing import Semaphore 5 6 7 # 信號量 8 def ktv(i, sem): 9 sem.acquire() # 獲取鑰匙 10 print('%s走進KTV' % i) 11 time.sleep(random.randint(1, 5)) 12 print('%s走出了KTV' % i) 13 sem.release() 14 15 16 if __name__ == '__main__': 17 sem = Semaphore(4) # 設置4個鎖 18 for i in range(20): 19 p = Process(target=ktv, args=(i, sem)) 20 p.start() 21 22 >>>0走進KTV 23 1走進KTV 24 2走進KTV 25 3走進KTV 26 1走出了KTV 27 4走進KTV 28 2走出了KTV 29 6走進KTV 30 0走出了KTV 31 ...
感受有點意思
1 from multiprocessing import Event 2 3 4 # 一個信號可使全部的進程都進入阻塞狀態,也能夠控制全部的進程接觸阻塞。 5 # 一個事件被建立以後默認是阻塞狀態 6 e = Event() # 建立了一個事件 7 print(e.is_set()) # 查看是否阻塞,默認是阻塞狀態的 8 e.set() # 設置阻塞 9 print(e.is_set()) 10 e.wait() # 根據e.is_set()的值決定是否阻塞 11 print("wahaha") 12 e.clear() # 清除阻塞 13 print(e.is_set()) 14 e.wait() 15 print('shuangwaiwai') # 阻塞了就不會在打印了
1 from multiprocessing import Event 2 3 4 def cars(e, i): 5 if not e.is_set(): 6 print('car%i在等待' % i) 7 e.wait() 8 print("car%i經過" % i) 9 10 11 # 紅綠燈事件 12 def light(e): 13 while True: 14 if e.is_set(): 15 e.clear() 16 print("紅燈亮了") 17 else: 18 e.set() 19 print('綠燈亮了') 20 time.sleep(2) 21 22 23 if __name__ == '__main__': 24 e = Event() 25 traffic = Process(target=light, args=(e, )) 26 traffic.start() 27 for i in range(20): 28 car = Process(target=cars, args=(e, i)) 29 car.start() 30 time.sleep(random.random())
1 from multiprocessing import Queue, Process 2 3 4 def produce(q): 5 q.put("hello") # 向對列中放入數據 6 7 8 def consume(q): 9 print(q.get()) # 向隊列中取數據 10 11 12 if __name__ == '__main__': 13 q = Queue() 14 p = Process(target=produce, args=(q,)) 15 p.start() 16 c = Process(target=consume, args=(q, )) 17 c.start()
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
1 from multiprocessing import Process, Queue 2 import random 3 import time 4 5 6 def consumer(q, name): 7 # 定義消費者 8 while True: 9 food = q.get() 10 if food is None: 11 print("%s獲取到了一個空" % name) 12 break 13 print("%s消費了%s" % (name, food)) 14 time.sleep(random.randint(1, 3)) 15 16 17 def produce(name, food, q): 18 # 定義生產者 19 for i in range(10): 20 time.sleep(random.randint(1, 3)) 21 f = "{0}生產了{1}{2}".format(name, food, i) 22 print(f) 23 q.put(f) 24 25 26 if __name__ == '__main__': 27 q = Queue(10) 28 p1 = Process(target=produce, args=("lee", "包子", q)) 29 p2 = Process(target=produce, args=("dan", "泔水", q)) 30 c1 = Process(target=consumer, args=(q, 'pon')) 31 c2 = Process(target=consumer, args=(q, 'toon')) 32 p1.start() 33 p2.start() 34 c1.start() 35 c2.start() 36 p1.join() 37 p2.join() 38 q.put(None) 39 q.put(None)
1 # 升級版 2 from multiprocessing import Process, JoinableQueue 3 import random 4 import time 5 6 7 def consumer(q, name): 8 while True: 9 food = q.get() 10 print("%s消費了%s" % (name, food)) 11 time.sleep(random.randint(1, 3)) 12 q.task_done() # 使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。 13 14 15 def produce(name, food, q): 16 # 定義生產者 17 for i in range(10): 18 time.sleep(random.randint(1, 3)) 19 f = "{0}生產了{1}{2}".format(name, food, i) 20 print(f) 21 q.put(f) 22 q.join() # 當隊列中全部的包子都被消費玩了,程序執行完 23 24 25 if __name__ == '__main__': 26 q = JoinableQueue(20) 27 p1 = Process(target=produce, args=("lee", "包子", q)) 28 p2 = Process(target=produce, args=("dan", "泔水", q)) 29 c1 = Process(target=consumer, args=(q, 'pon')) 30 c2 = Process(target=consumer, args=(q, 'toon')) 31 p1.start() 32 p2.start() 33 c1.daemon = True # 建立一個守護進程 循環的案例中,隨着主進程的結束,子進程也會跟着結束 因此他的做用是判斷 主進程是否結束 34 c2.daemon = True # 建立一個守護進程 循環的案例中,隨着主進程的結束,子進程也會跟着結束 35 c1.start() 36 c2.start() 37 p1.join() # join執行完,那麼主進程就執行完 38 p2.join() # join執行完,那麼主進程就執行完
管道是什麼?
Linux進程間通訊方式的一種,管道有兩端,讀端和寫端。建立管道,而後從父進程fork出子進程,
父進程和子進程擁有共同的讀寫文件描述符,能夠實現子進程寫文件,父進程讀文件的操做。
1 # 管道 2 from multiprocessing import Pipe, Process 3 4 5 def func(conn1, conn2): 6 conn2.close() 7 while True: 8 try: 9 msg = conn1.recv() 10 print(msg) 11 except EOFError: 12 conn1.close() 13 break 14 15 16 if __name__ == '__main__': 17 conn1, conn2 = Pipe() 18 Process(target=func, args=(conn1, conn2)).start() 19 conn1.close() 20 for i in range(20): 21 conn2.send("吃了麼") 22 conn2.close()
1 from multiprocessing import Manager, Process 2 3 4 def main(dic): 5 dic['count'] -= 1 6 print(dic) 7 8 9 if __name__ == '__main__': 10 m = Manager() 11 dic = m.dict({'count': 100}) 12 p_lst = [] 13 p = Process(target=main, args=(dic, )) 14 p.start() 15 p.join() 16 print('主線程:', dic) 17 18 19 >>>{'count': 99} 20 主線程: {'count': 99}
上面的作到了數據共享,but:
1 from multiprocessing import Manager, Process 2 3 4 def main(dic): 5 dic['count'] -= 1 6 7 8 if __name__ == '__main__': 9 m = Manager() 10 dic = m.dict({'count': 100}) 11 p_list = [] 12 for i in range(30): 13 p = Process(target=main, args=(dic,)) 14 p.start() 15 p_list.append(p) 16 for i in p_list: p.join() 17 print('主程序', dic) 18 19 >>>主程序 {'count': 70}
1 from multiprocessing import Manager, Process, Lock 2 3 4 def main(dic, lock): 5 lock.acquire() 6 dic['count'] -= 1 7 lock.release() 8 9 10 if __name__ == '__main__': 11 m = Manager() 12 lock = Lock() 13 dic = m.dict({'count': 100}) 14 p_list = [] 15 for i in range(30): 16 p = Process(target=main, args=(dic, lock)) 17 p.start() 18 p_list.append(p) 19 for i in p_list: p.join() 20 print('主程序', dic) 21 22 >>>主程序 {'count': 70}
什麼是進程池?爲何要用進程池?
在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。那麼咱們要怎麼作呢?
在這裏,要給你們介紹一個進程池的概念,定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。
1 # 進程池 2 import time 3 from multiprocessing import Pool, Process 4 5 6 def func(n): 7 for i in range(5): 8 print(n+1) 9 10 11 if __name__ == '__main__': 12 start = time.time() 13 pool = Pool(5) 14 pool.map(func, range(100)) 15 t1 = time.time() - start 16 17 start = time.time() 18 p_List = [] 19 for i in range(100): 20 p = Process(target=func, args=(i, )) 21 p_List.append(p) 22 p.start() 23 for p in p_List: 24 p.join() 25 t2 = time.time() - start 26 print(t1, t2) 27 28 29 >>>0.22115707397460938 6.861854076385498 # 差距是至關的明顯
1 import os, time 2 from multiprocessing import Pool 3 4 5 def func(n): 6 print('start func%s' % n, os.getpid()) 7 time.sleep(3) # 模擬代碼執行 8 print('end func%s' % n, os.getpid()) 9 10 11 if __name__ == '__main__': 12 p = Pool(5) 13 for i in range(10): 14 p.apply_async(func, args=(i, )) 15 p.close() # 結束進程池接收任務 16 p.join() # 感知進程池中的任務執行結束 17 18 >>>start func0 17124 19 start func1 14436 20 start func2 8872 21 start func3 7396 22 start func4 1268 23 end func0 17124 24 start func5 17124 25 end func1 14436 26 start func6 14436 27 end func2 8872 28 start func7 8872 29 end func3 7396 30 start func8 7396 31 end func4 1268 32 start func9 1268 33 end func5 17124 34 end func6 14436 35 end func7 8872 36 end func8 7396 37 end func9 1268
1 # 基於進程池的tcp 2 # client 端 3 import socket 4 5 6 sk = socket.socket() 7 sk.connect(('127.0.0.1', 8080)) 8 9 ret = sk.recv(1024).decode('utf-8') 10 print(ret) 11 msg = input('>>>').encode('utf-8') 12 sk.send(msg) 13 sk.close()
1 # server 端 2 import socket 3 4 5 sk = socket.socket() 6 sk.bind(('127.0.0.1', 8080)) 7 sk.listen() 8 while True: 9 conn, addr = sk.accept() 10 conn.send(b'hello') 11 print(conn.recv(1024).decode('utf-8')) 12 conn.close() 13 sk.close()
1 # server 端 2 import socket 3 from multiprocessing import Pool 4 5 6 def func(conn): 7 conn.send(b'hello') 8 print(conn.recv(1024).decode('utf-8')) 9 conn.close() 10 11 12 if __name__ == '__main__': 13 p = Pool(5) 14 sk = socket.socket() 15 sk.bind(('127.0.0.1', 8080)) 16 sk.listen() 17 while True: 18 conn, addr = sk.accept() 19 p.apply_async(func, args=(conn, )) 20 sk.close()
1 # 進程池的返回值 2 from multiprocessing import Pool 3 def func(i): 4 return i*i 5 6 7 if __name__ == '__main__': 8 p = Pool(5) 9 ret_list = [] 10 for i in range(10): 11 ret = p.apply_async(func, args=(i, )) 12 # print(ret) 獲得的是對象 13 ret_list.append(ret) 14 for ret in ret_list: print(ret.get()) # 等着func的計算結果 15 16 >>>0 17 1 18 4 19 9 20 16 21 25 22 36 23 49 24 64 25 81
1 # 進程池的回調函數 2 from multiprocessing import Pool 3 4 5 def func1(i): 6 print('in func1') 7 return i*i 8 9 def func2(n): 10 print('in func2' ) 11 print(n) 12 13 14 if __name__ == '__main__': 15 p = Pool(5) 16 p.apply_async(func1, args=(10, ), callback=func2) 17 p.close() 18 p.join() 19 20 >>>in func1 21 in func2 22 100
進程池的執行位置
1 # 進程池的回調函數 2 import os 3 from multiprocessing import Pool 4 5 6 def func1(i): 7 print('in func1', os.getpid()) 8 return i*i 9 10 11 def func2(n): 12 print('in func2', os.getpid()) 13 print(n) 14 15 16 if __name__ == '__main__': 17 print('主進程:', os.getpid()) 18 p = Pool(5) 19 p.apply_async(func1, args=(10, ), callback=func2) # 注意這裏回調函數的執行位置,實在主進程中執行的 20 p.close() 21 p.join() 22 23 >>>主進程: 13732 24 in func1 5292 25 in func2 13732 26 100