一、生產者消費者模型Queue:隊列(管道+鎖,雙向通訊)安全
爲了解決數據供需不平衡的狀況(即生產者生產的多或者消費者消費的多形成的失衡),有一種解決的方法:子進程生產數據,子進程處理數據。app
生產者消費者模型:dom
一、消費者要處理多少數據是不肯定的ide
二、因此只能用while循環來處理數據,可是while循環沒法結束ui
三、須要生產者發送信號spa
四、有多少個消費者,就須要發送多少個信號操作系統
五、可是發送的信號數量須要根據生產者和消費者的數量進行計算,因此很是不方便。 code
import time import random from multiprocessing import Process from multiprocessing import Queue def producer(q,food): for i in range(5): q.put('%s-%s'%(food,i)) print('生產了%s'%(food)) time.sleep(random.random()) q.put(None) q.put(None) q.put(None) def consumer(q,name): while True: food=q.get() if food == None:break print('%s吃了%s'%(name,food)) if __name__ =='__main__': q=Queue() p1=Process(target=producer,args=(q,'骨頭')) p1.start() p2=Process(target=producer,args=(q,'泔水')) p2.start() p3 = Process(target=consumer, args=(q, 'alex')) p3.start() p4 = Process(target=consumer, args=(q, 'egon')) p4.start() p5 = Process(target=consumer, args=(q, 'jin')) p5.start()
二、joinableQueue:順序爲:生產者生產的數據所有被消費------->生產者進程結束------->主進程執行代碼結束—-->消費者守護進程結束。blog
put和get的一個計數機制,每次get數據以後發送task_done,put端接收到計數-1,直到計數爲0就能感知到。隊列
import time import random from multiprocessing import Process from multiprocessing import JoinableQueue # put q.join # get 處理數據 task_done 消費完了 def producer(q,food): for i in range(5): q.put('%s-%s'%(food,i)) print('生產了%s'%food) # time.sleep(random.randint(1,3)) time.sleep(random.random()) q.join() # 等待 消費者 把全部的數據都處理完 def consumer(q,name): while True: food = q.get() # 生產者不生產仍是生產的慢 print('%s 吃了 %s'%(name,food)) q.task_done() if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer,args=(q,'泔水')) p1.start() p2 = Process(target=producer, args=(q, '骨頭')) p2.start() c1 = Process(target=consumer, args=(q, 'alex')) c1.daemon = True c1.start() c2 = Process(target=consumer, args=(q, 'egon')) c2.daemon = True c2.start() c3 = Process(target=consumer, args=(q, 'jin')) c3.daemon = True c3.start() p1.join() # 等待p1執行完畢 p2.join() # 等待p2執行完畢
三、ipc(inter-process-commnication內部進程交流)——
pipe(管道):支持雙向通訊,數據不安全。
首先來看管道是怎麼用的
from multiprocessing import Pipe p1,p2=Pipe() p1.send('hello') #一個發送,一個接收 print(p2.recv()) print('————————————————————————') p2.send('hihihi') p2.close() #關閉p2發送端 print(p1.recv()) #print(p1.recv()) #放開的話,繼續接收,會報錯,報錯類型:EOFError
管道在多進程中的做用:主進程中發送到管道,而後在子進程中接收。
應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費者也關閉了相同的管道端點。
from multiprocessing import Process from multiprocessing import Pipe,Lock def func(p): foo,son=p foo.close() #只用son,就把foo給關了 while True: try: print(son.recv()) except EOFError: #異常處理 break if __name__ =='__main__': foo,son=Pipe() p=Process(target=func,args=((foo,son),)) p.start() son.close() #只用foo,把son給關了 foo.send('hello') foo.send('hello') foo.send('hello') foo.send('heo') foo.close()
pipe實現消費者生產者模型:
1 from multiprocessing import Process,Pipe,Lock 2 3 def consumer(p,name,lock): 4 produce, consume=p 5 produce.close() 6 while True: 7 # lock.acquire() 8 baozi=consume.recv() 9 # lock.release() 10 if baozi: 11 print('%s 收到包子:%s' %(name,baozi)) 12 else: 13 consume.close() 14 break 15 16 def producer(p,n): 17 produce, consume=p 18 consume.close() 19 for i in range(n): 20 produce.send(i) 21 produce.send(None) 22 produce.send(None) 23 produce.close() 24 25 if __name__ == '__main__': 26 produce,consume=Pipe() 27 lock = Lock() 28 c1=Process(target=consumer,args=((produce,consume),'c1',lock)) 29 c2=Process(target=consumer,args=((produce,consume),'c2',lock)) 30 p1=Process(target=producer,args=((produce,consume),10)) 31 c1.start() 32 c2.start() 33 p1.start() 34 35 produce.close() 36 consume.close() 37 38 c1.join() 39 c2.join() 40 p1.join() 41 print('主進程')
四、Manager:是一個類,提供了能夠進行數據共享的一個機制,提供了不少數據類型dict 、list、pipe(這些並不提供數據安全的支持)
from multiprocessing import Process,Manager,Lock def work(dict,lock): lock.acquire() dict['count']-=1 lock.release() if __name__ =='__main__': lock=Lock() m=Manager() #要用Manager定義dict dict=m.dict({'count':100}) l=[] for i in range(100): p=Process(target=work,args=(dict,lock)) p.start() l.append(p) [p.join() for p in l] print(dict)
五、進程池:相似與有一個池子(工做間),把進程(員工)放進去,進行工做。接下來是發任務,而後循環對任務進行工做。(進程池這個功能已經被實現了)。
import os import time import random from multiprocessing import Pool from multiprocessing import Process def func(i): i+=1 if __name__=='__main__': # p=Pool(5) #一、帶#的是另外一種方法:用進程池 # start=time.time() # p.map(func,range(1000)) #至關於target=func,args=next(iterable) # p.close() #是不容許再向進程池中添加任務 # p.join() # print(time.time()-start) p = Process() #二、起進程的辦法,太佔內存,費時間:起進程的時候要分配資源,分配內存,耗費時間 start=time.time() l=[] for i in range(100): p=Process(target=func,args=(i,)) p.start() l.append(p) [i.join() for i in l] print(time.time()-start)