該模型中包含兩類重要的角色:dom
一、生產者:將負責造數據的任務比喻爲生產者 二、消費者:接收生產者造出的數據來作進一步的處理的被比喻成消費者spa
實現生產者消費者模型三要素:一、生產者 二、消費者 三、隊列code
何時用該模型:對象
程序中出現明顯的兩類任何,一類任務是負責生產,另一類任務是負責處理生產的數據的blog
該模型的好處:隊列
一、實現了生產者與消費者解耦和進程
二、平衡了生產力與消費力,即生產者能夠一直不停地生產,消費者能夠不停地處理,由於兩者再也不直接溝通的,而是跟隊列溝通,從而提升程序總體處理數據的速度ip
import time import random from multiprocessing import Process,Queue def consumer(name,q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res)) def producer(name,q,food): for i in range(5): time.sleep(random.randint(1,2)) res='%s%s' %(food,i) q.put(res) print('\033[45m生產者者===》%s 生產了 %s\033[0m' %(name,res)) if __name__ == '__main__': q=Queue() #一、共享的盆 p1=Process(target=producer,args=('egon',q,'包子')) #二、生產者們 p2=Process(target=producer,args=('劉清政',q,'泔水')) p3=Process(target=producer,args=('楊軍',q,'米飯')) c1=Process(target=consumer,args=('alex',q)) #三、消費者們 c2=Process(target=consumer,args=('xxx',q)) p1.start() p2.start() p3.start() c1.start() c2.start()
問題:消費者c1和c2在取空了q以後,則一直處於死循環中且卡在q.get()這一步get
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break退出死循環it
import time import random from multiprocessing import Process,Queue def consumer(name,q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res)) def producer(name,q,food): for i in range(5): time.sleep(random.randint(1,2)) res='%s%s' %(food,i) q.put(res) print('\033[45m生產者者===》%s 生產了 %s\033[0m' %(name,res)) #q.put(None) if __name__ == '__main__': #一、共享的盆 q=Queue() #二、生產者們 p1=Process(target=producer,args=('egon',q,'包子')) p2=Process(target=producer,args=('劉清政',q,'泔水')) p3=Process(target=producer,args=('楊軍',q,'米飯')) #三、消費者們 c1=Process(target=consumer,args=('alex',q)) c2=Process(target=consumer,args=('梁書東',q)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join()# 在生產者生產完畢後,往隊列的末尾添加一個結束信號None p2.join() p3.join() # 有幾個消費者就應該放幾個結束信號 q.put(None)#隊列是共享的,主進程一樣能夠往隊列裏放None q.put(None)
升級版:設置守護進程,向隊列發送結束信號,解決管道取空阻塞問題
JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
maxsize是隊列中容許最大項數,省略則無大小限制
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止
import time import random from multiprocessing import Process,JoinableQueue def consumer(name,q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res)) q.task_done() #向q.join()發送一次信號,證實一個數據已經被取走了 def producer(name,q,food): .......... if __name__ == '__main__': #一、共享的盆 q=JoinableQueue() #二、生產者們 p1=Process(target=producer,args=('egon',q,'包子')) p2=Process(target=producer,args=('劉清政',q,'泔水')) p3=Process(target=producer,args=('楊軍',q,'米飯')) #三、消費者們 c1=Process(target=consumer,args=('alex',q)) c2=Process(target=consumer,args=('梁書東',q)) c1.daemon=True c2.daemon=True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join()# 肯定生產者確確實實已經生產完畢 p2.join() p3.join() # 在生產者生產完畢後,拿到隊列中元素的總個數,而後直到元素總數變爲0,q.join()這一行代碼纔算運行完畢 q.join() #q.join()一旦結束就意味着隊列確實被取空,消費者已經確確實實把數據都取乾淨了 print('主進程結束')