5 併發編程-(進程)-隊列&生產者消費者模型

一、隊列的介紹

進程彼此之間互相隔離,要實現進程間通訊(IPC),multiprocessing模塊支持兩種形式:隊列和管道,這兩種方式都是使用消息傳遞的編程

建立隊列的類(底層就是以管道和鎖定的方式實現):安全

Queue([maxsize]):建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。併發

參數介紹:ide

maxsize是隊列中容許最大項數,省略則無大小限制。 但須要明確: 1、隊列內存放的是消息而非大數據 二、隊列佔用的是內存空間,於是maxsize即使是無大小限制也受限於內存大小

主要方法:大數據

q.put方法用以插入數據到隊列中。
q.get方法能夠從隊列讀取而且刪除一個元素。
from multiprocessing import Process,Queue

q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(1)
q.put(2)
q.put(3)
print(q.full()) #滿了
# q.put(4) #再放就阻塞住了

print(q.get())
print(q.get())
print(q.get())
print(q.empty()) #空了
# print(q.get()) #再取就阻塞住了

True
1
2
3
True

2、生產者消費者模型

一、生產者消費者模型介紹

爲何要使用生產者消費者模型

生產者指的是生產數據的任務,消費者指的是處理數據的任務,在併發編程中,spa

若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,code

才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。對象

爲了解決這個問題因而引入了生產者和消費者模式。blog

什麼是生產者和消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。隊列

這個阻塞隊列就是用來給生產者和消費者解耦的

二、生產者消費者模型實現

2.1 引入模型(生產一個消費一個)

import time
def producer():
    for i in range(3):
        res = f"包子 {i}"
        time.sleep(0.5)
        print(f"生產者生產了{res}")
        consumer(res)
def consumer(res):
    time.sleep(1)
    print(f"消費者吃了{res}")
if __name__ == '__main__':
    producer()

生產者生產了包子 0
消費者吃了包子 0
生產者生產了包子 1
消費者吃了包子 1
生產者生產了包子 2
消費者吃了包子 2
View Code

2.2 實現生產者消費者模型,但有小問題主進程永遠不會結束

消費者不知道生產者已經完畢,一直處於等待狀態,死循環

from multiprocessing import Process,Queue
import time
def producer(q):
    for i in range(3):
        res = f"包子 {i}"
        time.sleep(0.5)
        print(f"生產者生產了{res}")
        # 把生產的給隊列保存
        q.put(res)

def consumer(q):
    while True:# 消費者一直接收
        res = q.get()
        time.sleep(1)
        print(f"消費者吃了{res}")
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
    print('')

主
生產者生產了包子 0
生產者生產了包子 1
生產者生產了包子 2
消費者吃了包子 0
消費者吃了包子 1
消費者吃了包子 2
View Code

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環

2.三、解決辦法--低階--生產者通知消費者生產結束

隊列先進先出

from multiprocessing import Process,Queue
import time
def producer(q):
    for i in range(3):
        res = f"包子 {i}"
        time.sleep(0.5)
        print(f"生產者生產了{res}")
        # 把生產的給隊列保存
        q.put(res)

def consumer(q):
    while True:# 消費者一直接收
        res = q.get()
        if res == None:
            break
        time.sleep(1)
        print(f"消費者吃了{res}")
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=consumer,args=(q,))
    p1.start()
    p2.start()
    p1.join()# 主進程等待p1子進程執行完畢--即生產者生產完畢
    q.put(None)
    print('')

生產者生產了包子 0
生產者生產了包子 1
生產者生產了包子 2
消費者吃了包子 0
主
消費者吃了包子 1
消費者吃了包子 2
View Code

但上述解決方式,在有多個生產者和多個消費者時,咱們則須要用一個很low的方式去解決,有幾個消費者就須要發送幾回結束信號:至關low,例如

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,'egon1','包子'))
    p2=Process(target=producer,args=(q,'egon2','骨頭'))
    p3=Process(target=producer,args=(q,'egon3','泔水'))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,'alex1'))
    c2=Process(target=consumer,args=(q,'alex2'))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    q.put(None)
    print('')

2.4 JoinableQueue([maxsize]) 解決辦法--高階--消費者通知生生產者 項目已經被成功處理

這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

 

JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常

q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

 

from multiprocessing import Process,JoinableQueue
import time
def producer(q):
    for i in range(2):
        res = f"包子 {i}"
        time.sleep(0.5)
        print(f"生產者生產了{res}")
        # 把生產的給隊列保存
        q.put(res)
    q.join()#等待消費者把本身放入隊列中的全部的數據都取走以後,消費者才結束

def consumer(q):
    while True:# 消費者一直接收
        res = q.get()
        # if res == None:
        #     break
        time.sleep(1)
        print(f"消費者吃了{res}")
        q.task_done() #發送信號給q.join() 說明已經從隊列中取走一個數據並處理完畢了
if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    p3 = Process(target=producer,args=(q,))

    #------消費者--------
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    # 守護進程,消費者設置成守護進程,主進程結束完後 消費者 結束
    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    print('')
View Code

 

三、總結

一、程序中有兩類角色

一類負責生產數據(生產者)
一類負責處理數據(消費者)

二、引入生產者消費者模型爲了解決的問題是

平衡生產者與消費者之間的速度差
程序解開耦合

三、如何實現生產者消費者模型

生產者<--->隊列<--->消費者
相關文章
相關標籤/搜索