python併發編程:多進程-生產者消費者模型

一 生產者消費者模型介紹python

爲何要使用生產者消費者模型編程

  生產者指的是生產數據的任務,消費者指的是處理數據的任務,在併發編程中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。併發

什麼是生產者和消費者模式dom

  生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題,生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。spa

這個阻塞隊列就是用來給生產者和消費者解耦的對象

 

二 生產者消費者模型實現blog

from multiprocessing import Process, Queue
import time
import random
import os


def consumer(q, name):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print("\033[43m %s 吃%s\033[0m" % (name, res))


def producer(q, name, food):
    for i in range(3):
        time.sleep(random.randint(1, 3))
        res = "%s%s" % (food, i)
        q.put(res)
        print("\033[45m %s 生產了 %s\033[0m" % (name, res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q, 'egon', '包子'))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q, 'mike'))

    # 開始
    p1.start()
    c1.start()
    print('主')

  執行結果隊列

主
 egon 生產了 包子0
 egon 生產了 包子1
 mike 吃包子0
 egon 生產了 包子2
 mike 吃包子1
 mike 吃包子2

  

  此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步進程

  解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環ip

def consumer(q, name):
    while True:
        res = q.get()
        if res is None:
            break
        time.sleep(random.randint(1, 3))
        print("\033[43m %s 吃%s\033[0m" % (name, res))


def producer(q, name, food):
    for i in range(3):
        time.sleep(random.randint(1, 3))
        res = "%s%s" % (food, i)
        q.put(res)
        print("\033[45m %s 生產了 %s\033[0m" % (name, res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q, 'egon', '包子'))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q, 'mike'))

    # 開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None)
    print('主')

  但上述解決方法,在有多個生產者和多個消費者時,咱們則須要用一個很low的方式去解決,有幾個消費者就須要發送幾回結束信號:至關low,例如:

def consumer(q, name):
    while True:
        res = q.get()
        if res is None:
            break
        time.sleep(random.randint(1, 3))
        print("\033[43m %s 吃%s\033[0m" % (name, res))


def producer(q, name, food):
    for i in range(3):
        time.sleep(random.randint(1, 3))
        res = "%s%s" % (food, i)
        q.put(res)
        print("\033[45m %s 生產了 %s\033[0m" % (name, res))


if __name__ == '__main__':
    q = Queue()
    # 生產者們:即廚師們
    p1 = Process(target=producer, args=(q, 'egon1', '包子'))
    p2 = Process(target=producer, args=(q, 'egon2', '燒麥'))
    p3 = Process(target=producer, args=(q, 'egon3', '豆漿'))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q, 'mike1'))
    c2 = Process(target=consumer, args=(q, 'mike2'))

    # 開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    q.put(None)
    q.put(None)
    q.put(None)
    print('主')

  其實,咱們的思路無非是發送結束信號而已,有另一種隊列提供了這種機制

JoinableQueue(maxsize)

這就是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理,通知進程時使用共享的信號和條件變量來實現的

  參數實現

maxsize是隊列中容許最大項數,省略則無大小限制

  方法介紹

JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回項已經被處理,若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用q.task_done()方法爲止

  基於JoinableQueue實現生產者消費者模型

from multiprocessing import Process, JoinableQueue
import time
import random


def consumer(q, name):
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print('%s 吃 %s' % (name, res))
        q.task_done()  # 發送信號給q.join(),說明已經從隊列中取走一個數據並處理完畢


def producer(q, name, food):
    for i in range(3):
        time.sleep(random.randint(1, 3))
        res = '%s%s' % (food, i)
        q.put(res)
        print('%s 生產了 %s' % (name, res))
    q.join()    # 等到消費者把本身放入隊列中的全部的數據都取走以後,生產者才結束


if __name__ == '__main__':
    q = JoinableQueue()     # 使用JoinableQueue()

    # 生產者:即廚師們
    p1 = Process(target=producer, args=(q, 'egon1', '包子'))
    p2 = Process(target=producer, args=(q, 'egon2', '燒麥'))
    p3 = Process(target=producer, args=(q, 'egon3', '豆漿'))

    # 消費者們:即吃貨們
    c1 = Process(target=consumer, args=(q, 'mike1'))
    c2 = Process(target=consumer, args=(q, 'mike2'))
    c1.daemon = True
    c2.daemon = True

    # 開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    # 一、主進程等生產者p1,p2,p3結束
    # 二、而p1,p2,p3,是在消費者把全部數據都取乾淨以後纔會結束
    # 三、因此一旦p1,p2,p3結束了,證實消費者也不必存在了,應該隨着主進程一塊死掉,於是須要將生產者們設置成守護進程
    print("主")

  

三 生產者消費者模型總結

一、程序中有兩類角色

一類負責生產數據(生產者)
一類負責處理數據(消費者)

二、引入生產者消費者模型爲了解決的問題是

平衡生產者與消費者之間的速度差
程序解開耦合

三、如何實現生產者消費者模型

生產者<----->隊列<------>消費者
相關文章
相關標籤/搜索