python 之 併發編程(生產者消費者模型、守護進程的應用)

9.8 生產者消費者模型

該模型中包含兩類重要的角色:dom

一、生產者:將負責造數據的任務比喻爲生產者 二、消費者:接收生產者造出的數據來作進一步的處理的被比喻成消費者spa

實現生產者消費者模型三要素:一、生產者 二、消費者 三、隊列code

何時用該模型:對象

程序中出現明顯的兩類任何,一類任務是負責生產,另一類任務是負責處理生產的數據的blog

該模型的好處:隊列

一、實現了生產者與消費者解耦和進程

二、平衡了生產力與消費力,即生產者能夠一直不停地生產,消費者能夠不停地處理,由於兩者再也不直接溝通的,而是跟隊列溝通,從而提升程序總體處理數據的速度ip

import time
import random
from multiprocessing import Process,Queue
def consumer(name,q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res))
​
def producer(name,q,food):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res='%s%s' %(food,i)
        q.put(res)
        print('\033[45m生產者者===》%s 生產了 %s\033[0m' %(name,res))
​
if __name__ == '__main__':
    q=Queue()                                       #一、共享的盆
   
    p1=Process(target=producer,args=('egon',q,'包子'))  #二、生產者們
    p2=Process(target=producer,args=('劉清政',q,'泔水'))
    p3=Process(target=producer,args=('楊軍',q,'米飯'))
​
    c1=Process(target=consumer,args=('alex',q))         #三、消費者們
    c2=Process(target=consumer,args=('xxx',q))
​
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

9.81 守護進程的應用

問題:消費者c1和c2在取空了q以後,則一直處於死循環中且卡在q.get()這一步get

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break退出死循環it

import time
import random
from multiprocessing import Process,Queue
​
def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res))
​
def producer(name,q,food):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res='%s%s' %(food,i)
        q.put(res)
        print('\033[45m生產者者===》%s 生產了 %s\033[0m' %(name,res))
    #q.put(None)
if __name__ == '__main__':
    #一、共享的盆
    q=Queue()
    #二、生產者們
    p1=Process(target=producer,args=('egon',q,'包子'))
    p2=Process(target=producer,args=('劉清政',q,'泔水'))
    p3=Process(target=producer,args=('楊軍',q,'米飯'))
    #三、消費者們
    c1=Process(target=consumer,args=('alex',q))
    c2=Process(target=consumer,args=('梁書東',q))
​
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
​
    p1.join()# 在生產者生產完畢後,往隊列的末尾添加一個結束信號None
    p2.join()
    p3.join()
    # 有幾個消費者就應該放幾個結束信號
    q.put(None)#隊列是共享的,主進程一樣能夠往隊列裏放None
    q.put(None)

升級版:設置守護進程,向隊列發送結束信號,解決管道取空阻塞問題

JoinableQueue([maxsize]):這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

  • maxsize是隊列中容許最大項數,省略則無大小限制

  • q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常

  • q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

import time
import random
from multiprocessing import Process,JoinableQueue
​
def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print('\033[46m消費者===》%s 吃了 %s\033[0m' %(name,res))
        q.task_done()   #向q.join()發送一次信號,證實一個數據已經被取走了
def producer(name,q,food):
..........
if __name__ == '__main__':
    #一、共享的盆
    q=JoinableQueue()
    #二、生產者們
    p1=Process(target=producer,args=('egon',q,'包子'))
    p2=Process(target=producer,args=('劉清政',q,'泔水'))
    p3=Process(target=producer,args=('楊軍',q,'米飯'))
    #三、消費者們
    c1=Process(target=consumer,args=('alex',q))
    c2=Process(target=consumer,args=('梁書東',q))
    c1.daemon=True
    c2.daemon=True
​
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
​
    p1.join()# 肯定生產者確確實實已經生產完畢
    p2.join()
    p3.join()
    # 在生產者生產完畢後,拿到隊列中元素的總個數,而後直到元素總數變爲0,q.join()這一行代碼纔算運行完畢
    q.join()
    #q.join()一旦結束就意味着隊列確實被取空,消費者已經確確實實把數據都取乾淨了
    print('主進程結束')
相關文章
相關標籤/搜索