進程之生產者消費者模型

1、什麼是生產者消費者模型編程

  生產者指的是生產數據的任務,消費者指的是處理數據的任務,在併發編程中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。併發

  該模型的工做方式:生產者生產數據傳遞給消費者處理dom

  實現方式:生產者 ---> 隊列 <--- 消費者ide

2、爲什麼要使用生產者消費者模型spa

  當程序中出現明顯的兩類任務,一類負責生產數據,一類負責處理數據,就能夠引入生產者消費者模型來實現生產者與消費者的解耦合,平衡生產能力與消費能力,從而提高效率3d

  生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。code

  這個阻塞隊列就是用來給生產者和消費者解耦的對象

3、生產者消費者模型的使用blog

基於隊列來實現一個生產者消費者模型隊列

import time, random from multiprocessing import Process, Queue def producer(name, food, q): for i in range(6): res = "%s%s" %(food, i) time.sleep(random.randint(1, 3)) # 模擬生產數據的時間
 q.put(res) print("廚師 %s 生產了 %s" %(name, res)) def consumer(name, q): while True: res = q.get() time.sleep(random.randint(1, 3)) # 模擬處理數據的時間
        print("吃貨 %s 吃了 %s" % (name, res)) if __name__ == '__main__': q = Queue() # 生產者
    p1 = Process(target=producer, args=("qiu", "包子", q)) # 消費者
    c1 = Process(target=consumer, args=("xi", q)) p1.start() c1.start() print("主進程") # 運行結果
主進程 廚師 qiu 生產了 包子0 吃貨 xi 吃了 包子0 廚師 qiu 生產了 包子1 廚師 qiu 生產了 包子2 吃貨 xi 吃了 包子1 吃貨 xi 吃了 包子2 廚師 qiu 生產了 包子3 吃貨 xi 吃了 包子3 廚師 qiu 生產了 包子4 吃貨 xi 吃了 包子4 廚師 qiu 生產了 包子5 吃貨 xi 吃了 包子5
View Code

此時的問題是主進程永遠不會結束,緣由是:生產者 p 在生產完後就結束了,可是消費者 c 在取空了 q 以後,則一直處於死循環中且卡在 q.get() 這一步

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠 break 出死循環

import time, random from multiprocessing import Process, Queue def producer(name, food, q): for i in range(3): res = "%s%s" %(food, i) time.sleep(random.randint(1, 3)) # 模擬生產數據的時間
 q.put(res) print("廚師 %s 生產了 %s" %(name, res)) q.put(None) def consumer(name, q): while True: res = q.get() if not res: break time.sleep(random.randint(1, 3)) # 模擬處理數據的時間
        print("吃貨 %s 吃了 %s" % (name, res)) if __name__ == '__main__': q = Queue() # 生產者
    p1 = Process(target=producer, args=("qiu", "包子", q)) # 消費者
    c1 = Process(target=consumer, args=("xi", q)) p1.start() c1.start() print("主進程") # 運行結果
主進程 廚師 qiu 生產了 包子0 吃貨 xi 吃了 包子0 廚師 qiu 生產了 包子1 吃貨 xi 吃了 包子1 廚師 qiu 生產了 包子2 吃貨 xi 吃了 包子2
View Code

上述解決方式,在有多個生產者和多個消費者時,會出現生產完了但沒有消費的狀況

import time, random from multiprocessing import Process, Queue def producer(name, food, q): for i in range(3): res = "%s%s" %(food, i) time.sleep(random.randint(1, 3)) # 模擬生產數據的時間
 q.put(res) print("廚師 %s 生產了 %s" %(name, res)) q.put(None) def consumer(name, q): while True: res = q.get() if not res: break time.sleep(random.randint(1, 3)) # 模擬處理數據的時間
        print("吃貨 %s 吃了 %s" % (name, res)) if __name__ == '__main__': q = Queue() # 生產者
    p1 = Process(target=producer, args=("qiu", "包子", q)) p2 = Process(target=producer, args=("xi", "餃子", q)) p3 = Process(target=producer, args=("ru", "饅頭", q)) # 消費者
    c1 = Process(target=consumer, args=("feng", q)) c2 = Process(target=consumer, args=("yan", q)) p1.start() p2.start() p3.start() c1.start() c2.start() print("主進程") # 運行結果
主進程 廚師 qiu 生產了 包子0 廚師 qiu 生產了 包子1 廚師 ru 生產了 饅頭0 廚師 qiu 生產了 包子2 廚師 xi 生產了 餃子0 廚師 ru 生產了 饅頭1 吃貨 feng 吃了 包子0 廚師 ru 生產了 饅頭2 吃貨 yan 吃了 包子1 吃貨 yan 吃了 包子2 廚師 xi 生產了 餃子1 吃貨 feng 吃了 饅頭0 廚師 xi 生產了 餃子2 吃貨 feng 吃了 餃子0 吃貨 feng 吃了 饅頭1 吃貨 feng 吃了 饅頭2
View Code

可見,生產了餃子1 和餃子 2,可是沒有人吃。其實這是由於生產者 1 生產完就往隊列裏放,生產者 2 生產完也往隊列裏放,3 也是,這是同一個隊列,但三個生產者的 for 循環並非同時結束,因此要把結束信號放到數據的最後

import time, random from multiprocessing import Process, Queue def producer(name, food, q): for i in range(3): res = "%s%s" %(food, i) time.sleep(random.randint(1, 3)) # 模擬生產數據的時間
 q.put(res) print("廚師 %s 生產了 %s" %(name, res)) def consumer(name, q): while True: res = q.get() if not res: break time.sleep(random.randint(1, 3)) # 模擬處理數據的時間
        print("吃貨 %s 吃了 %s" % (name, res)) if __name__ == '__main__': q = Queue() # 生產者
    p1 = Process(target=producer, args=("qiu", "包子", q)) p2 = Process(target=producer, args=("xi", "餃子", q)) p3 = Process(target=producer, args=("ru", "饅頭", q)) # 消費者
    c1 = Process(target=consumer, args=("feng", q)) c2 = Process(target=consumer, args=("yan", q)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.put(None) q.put(None) print("主進程") # 運行結果
廚師 qiu 生產了 包子0 廚師 ru 生產了 饅頭0 廚師 xi 生產了 餃子0 吃貨 feng 吃了 包子0 廚師 qiu 生產了 包子1 廚師 xi 生產了 餃子1 吃貨 yan 吃了 饅頭0 廚師 ru 生產了 饅頭1 吃貨 feng 吃了 餃子0 吃貨 yan 吃了 包子1 吃貨 feng 吃了 餃子1 廚師 qiu 生產了 包子2 廚師 xi 生產了 餃子2 吃貨 yan 吃了 饅頭1 廚師 ru 生產了 饅頭2 主進程 吃貨 yan 吃了 餃子2 吃貨 feng 吃了 包子2 吃貨 yan 吃了 饅頭2
View Code

但這種方式很是 low,有幾個消費者就須要發送幾回結束信號,因此有另一種隊列提供瞭解決方案

JoinableQueue([maxsize])

  這就像是一個 Queue 對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。maxsize 是隊列中容許最大項數,省略則無大小限制。

方法介紹:JoinableQueue 的實例 q 除了與 Queue 對象相同的方法以外還具備:

  q.task_done():使用者使用此方法發出信號,表示 q.get() 的返回項目已經被處理。若是調用此方法的次數大於從隊列中刪除項目的數量,將引起 ValueError 異常

  q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。阻塞將持續到隊列中的每一個項目均調用 q.task_done() 方法爲止

import time, random from multiprocessing import Process, JoinableQueue def producer(name, food, q): for i in range(3): res = "%s%s" %(food, i) time.sleep(random.randint(1, 3)) # 模擬生產數據的時間
 q.put(res) print("廚師 %s 生產了 %s" %(name, res)) def consumer(name, q): while True: res = q.get() time.sleep(random.randint(1, 3)) # 模擬處理數據的時間
        print("吃貨 %s 吃了 %s" % (name, res)) q.task_done() if __name__ == '__main__': q = JoinableQueue() # 生產者
    p1 = Process(target=producer, args=("qiu", "包子", q)) p2 = Process(target=producer, args=("xi", "餃子", q)) p3 = Process(target=producer, args=("ru", "饅頭", q)) # 消費者
    c1 = Process(target=consumer, args=("feng", q)) c2 = Process(target=consumer, args=("yan", q)) # 主進程等生產者p一、p二、p3結束, 而p一、p二、p3是在消費者把全部數據都取乾淨以後纔會結束
    # 因此一旦p一、p二、p3結束了,證實消費者也不必存在了,應該隨着主進程一塊死掉,於是須要將消費者設置成守護進程
    c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() q.join() # 主進程等q結束, 即q內的數據被取乾淨了
    print("主進程") # 運行結果
廚師 xi 生產了 餃子0 廚師 ru 生產了 饅頭0 廚師 qiu 生產了 包子0 廚師 xi 生產了 餃子1 吃貨 feng 吃了 餃子0 廚師 qiu 生產了 包子1 廚師 qiu 生產了 包子2 廚師 xi 生產了 餃子2 吃貨 yan 吃了 饅頭0 廚師 ru 生產了 饅頭1 吃貨 feng 吃了 包子0 吃貨 yan 吃了 餃子1 廚師 ru 生產了 饅頭2 吃貨 feng 吃了 包子1 吃貨 yan 吃了 包子2 吃貨 yan 吃了 饅頭1 吃貨 feng 吃了 餃子2 吃貨 yan 吃了 饅頭2 主進程
View Code
相關文章
相關標籤/搜索