python 併發編程 多進程 生產者消費者模型介紹

 

 

一 生產者消費者模型介紹

爲何要使用生產者消費者模型

生產者指的是生產數據的任務,消費者指的是處理數據的任務,html

生產數據目的,是爲了給消費者處理。編程

 

在併發編程中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。併發

 

import time


def producer():
    '''生產者是廚師'''


    for i in range(1,4):  # 模擬生產三個數據
        res = "包子%s" % i
        time.sleep(2)
        print("生產者生產%s" % res)

        consumer(res)

def consumer(res):
    '''消費者吃包子'''

    time.sleep(1)
    print("消費者消費%s" % res)

if __name__ == "__main__":
    producer()

'''
生產者生產包子1
消費者消費包子1
生產者生產包子2
消費者消費包子2
生產者生產包子3
消費者消費包子3
'''

 

 

 

生產者必須等消費者,消費者也必須等生產者!spa

 

什麼是生產者和消費者模型

生產者消費者模型是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。code

+

 

 

 

 

這個阻塞隊列就是用來給生產者和消費者解耦的htm

 

二 生產者消費者模型實現

經過隊列blog

 

from multiprocessing import Process
from multiprocessing import Queue
import time


def producer(q):

    for i in range(1,6):
        res = "包子%s" %i
        time.sleep(0.5)
        print("生產者生產%s" % res)

        # 如今不是給消費者了 放入數據 res 把包子放到隊列
        q.put(res)

def consumer(q):
    while True:
        # 一直接取數據
        res = q.get()
        time.sleep(1)
        print("消費者消費%s" % res)

if __name__ == "__main__":

    # 容器
    q = Queue()

    # 生產者們
    # 須要傳參數 把生產後數據的往隊列裏丟
    p = Process(target=producer, args=(q,))

    # 消費者們
    c = Process(target=consumer, args=(q,))

    p.start()
    c.start()
    print("")


 

 

執行結果,不是生產一個消費一個,生成者與消費者互相不影響隊列

主
生產者生產包子1
生產者生產包子2
生產者生產包子3
消費者消費包子1
生產者生產包子4
生產者生產包子5
消費者消費包子2
消費者消費包子3
消費者消費包子4
消費者消費包子5

 

 

此時的問題是主進程永遠不會結束

緣由是:生產者p在生產完後就結束了,可是消費者c在取空了隊列以後,則一直處於死循環中且卡在q.get()這一步。(生產者從隊列取不到數據就卡住)進程

一旦隊列取空了 會被鎖住 就卡住了,程序卡在消費者ip

 

解決:

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環

 

from multiprocessing import Process
from multiprocessing import Queue
import time


def producer(q):

    for i in range(1,6):
        res = "包子%s" %i
        time.sleep(0.5)
        print("生產者生產%s" % res)

        # 如今不是給消費者了 放入數據 res 把包子放到隊列
        q.put(res)

def consumer(q):
    while True:
        # 接取數據
        res = q.get()
        # 消費者在最後一次收到None後退出
        if res is None:break
        time.sleep(1)
        print("消費者消費%s" % res)

if __name__ == "__main__":

    # 容器
    q = Queue()

    # 生產者們
    # 須要傳參數 把生產後數據的往隊列裏丟
    p = Process(target=producer, args=(q,))

    # 消費者們
    c = Process(target=consumer, args=(q,))

    p.start()
    c.start()

    # 保證全部子進程執行完 主進程才能工做,否則一直阻塞
    p.join()
    # 保證生產者把全部數據扔到隊列後 而後發送結束信號
    q.put(None)
    print("")

'''
生產者生產包子1
生產者生產包子2
生產者生產包子3
消費者消費包子1
生產者生產包子4
生產者生產包子5
消費者消費包子2
主
消費者消費包子3
消費者消費包子4
消費者消費包子5
'''

 

 

 若是有多個消費者 生產者時候

有幾個消費者就須要發送幾回結束信號

from multiprocessing import Process, Queue
import time


def producer(q):

    for i in range(5):
        res = "包子%s" %i
        time.sleep(0.5)
        print("生產者生產%s" % res)

        # 如今不是給消費者了 放入數據 res 把包子放到隊列
        q.put(res)

def consumer(q):
    while True:
        # 接取數據
        res = q.get()
        if res is None:break
        time.sleep(1)
        print("消費者消費%s" % res)

if __name__ == "__main__":

    # 容器
    q = Queue()

    # 生產者們
    # 須要傳參數 把生產後數據的往隊列裏丟
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3= Process(target=producer, args=(q,))

    # 消費者們
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    # 保證全部子進程執行完 主進程才能工做,否則一直阻塞
    p1.join()
    p2.join()
    p3.join()

    # 發完數據後 發送結束信號 有幾個消費者就應該來幾個信號
    
    q.put(None)
    q.put(None)
    print("")

 結束信號必定是跟在正常數據後面,保證全部消費者把正常數據取走之後,接下來取的是結束信號

結束信號應該在主進程裏面確保全部的生產者,都生產完畢之後,才發送結束信號

 

 

其實咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制 能夠點擊這個文章

JoinableQueue

相關文章
相關標籤/搜索