目錄python
是一個解耦合的過程
補充: queue不適合傳大文件,通產傳一些消息.編程
生產者: 生產數據的任務多線程
消費者: 處理數據的任務
生產者能夠不停的生產,達到了本身最大的生產效率,消費者能夠不停的消費,也達到了本身最大的消費效率.生產者消費者模型大大提升了生產者生產的效率和消費者消費的效率.併發
生產者--隊列(盆)-->消費者dom
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。線程
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。code
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。對象
from multiprocessing import Process,Queue def producer(q,name,food): """生產者""" for i in range(1,11): print(f'{name}生產{food}{i}') res = f'{food}{i}' q.put(res) q.put(None) def consumer(q,name): """消費者""" while True: res = q.get(timeout=3) if res is None:break print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=(q,'ocean','包子')) c1 = Process(target=consumer,args=(q,'chen')) p1.start() c1.start() ##################### ocean生產包子1 ocean生產包子2 ocean生產包子3 ocean生產包子4 ocean生產包子5 ocean生產包子6 ocean生產包子7 ocean生產包子8 ocean生產包子9 ocean生產包子10 chen吃了包子1 chen吃了包子2 chen吃了包子3 chen吃了包子4 chen吃了包子5 chen吃了包子6 chen吃了包子7 chen吃了包子8 chen吃了包子9 chen吃了包子10
from multiprocessing import Process,Queue import time,random def producer(q,name,food): """生產者""" for i in range(1,4): print(f'{name}生產了{food}{i}') time.sleep(random.randint(1,3)) res = f'{food}{i}' q.put(res) def consumer(q,name): """消費者""" while True: res = q.get(timeout=3) if res is None:break time.sleep(random.randint(1,3)) print(f'{name}吃了{res}') if __name__ == '__main__': q = Queue() p1 = Process(target=producer,args=(q,'ocean','包子')) p2 = Process(target=producer,args=(q,'sky','韭菜')) p3 = Process(target=producer,args=(q,'rocky','酒')) c1 = Process(target=consumer,args=(q,'nick')) c2 = Process(target=consumer,args=(q,'mac')) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join()#生產者生產完畢 q.put(None)#存在幾個消費者,put幾回None q.put(None) # for i in range(2): # q.put(None) ############################# ocean生產了包子1 sky生產了韭菜1 rocky生產了酒1 ocean生產了包子2 sky生產了韭菜2 ocean生產了包子3 sky生產了韭菜3 mac吃了韭菜1 rocky生產了酒2 nick吃了包子1 mac吃了包子2 rocky生產了酒3 nick吃了韭菜2 mac吃了酒1 nick吃了包子3 nick吃了酒2 mac吃了韭菜3 nick吃了酒3
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。隊列
q.task_done()
:使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。進程
q.join()
:生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。
from multiprocessing import JoinableQueue #用法和Queue類似 q = JoinableQueue() q.put('ocean')#隊列放入一個任務,內存在一個計數機制,+1 q.put('can')#計數機制 +1 print(q.get()) q.task_done()#完成一次任務,計數機制-1 print(q.get()) q.task_done()#計數機制 -1 q.join()#計數機制不爲0的時候,阻塞等待計數器爲0後經過 ################## ocean can
from multiprocessing import Process, JoinableQueue import time, random def producer(q, name, food): """生產者""" for i in range(3): print(f'{name}生產了{food}{i}') time.sleep(random.randint(1, 3)) res = f'{food}{i}' q.put(res) def consumer(q, name): """消費者""" while True: res = q.get() time.sleep(random.randint(1, 3)) print(f'{name}吃了{res}') q.task_done() # 用來計數 if __name__ == '__main__': q = JoinableQueue() p1 = Process(target=producer, args=(q, 'ocean', '包子')) p2 = Process(target=producer, args=(q, 'chen', '菜')) p3 = Process(target=producer, args=(q, 'nick', '酒')) c1 = Process(target=consumer, args=(q, 'pocky')) c2 = Process(target=consumer, args=(q, 'mac')) p1.start() p2.start() p3.start() c1.daemon = True # 定義爲收回進程 c2.daemon = True c1.start() c2.start() p1.join() p2.join() p3.join() # 生產者生產完畢 q.join() # 生產者生產完畢--這是主程序的最後一行代碼,執行後結束---q.join()消費者已經取乾淨了,沒有存在的意思了。 # 這個是主程序的最後一行代碼結束,守護進程結束,也是守護進程的概念 ############################### ocean生產了包子0 chen生產了菜0 nick生產了酒0 chen生產了菜1 nick生產了酒1 ocean生產了包子1 chen生產了菜2 pocky吃了菜0 nick生產了酒2 ocean生產了包子2 mac吃了酒0 pocky吃了包子0 mac吃了菜1 mac吃了包子1 mac吃了菜2 pocky吃了酒1 pocky吃了包子2 mac吃了酒2