用Python多線程實現生產者消費者模式

什麼是生產者消費者模式

在軟件開發的過程當中,常常碰到這樣的場景:
某些模塊負責生產數據,這些數據由其餘模塊來負責處理(此處的模塊多是:函數、線程、進程等)。產生數據的模塊稱爲生產者,而處理數據的模塊稱爲消費者。在生產者與消費者之間的緩衝區稱之爲倉庫。生產者負責往倉庫運輸商品,而消費者負責從倉庫裏取出商品,這就構成了生產者消費者模式。python

結構圖以下編程

爲了你們容易理解,咱們舉一個寄信的例子。假設你要寄一封信,大體過程以下:
 一、你把信寫好——至關於生產者生產數據多線程

 二、你把信放入郵箱——至關於生產者把數據放入緩衝區
 三、郵遞員把信從郵箱取出,作相應處理——至關於消費者把數據取出緩衝區,處理數據併發

生產者消費者模式的優勢

  • 解耦
    假設生產者和消費者分別是兩個線程。若是讓生產者直接調用消費者的某個方法,那麼生產者對於消費者就會產生依賴(也就是耦合)。若是將來消費者的代碼發生變化,可能會影響到生產者的代碼。而若是二者都依賴於某個緩衝區,二者之間不直接依賴,耦合也就相應下降了。dom

舉個例子,咱們去郵局投遞信件,若是不使用郵箱(也就是緩衝區),你必須得把信直接交給郵遞員。有同窗會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他。這就產生了你和郵遞員之間的依賴(至關於生產者和消費者的強耦合)。萬一哪天郵遞員 換人了,你還要從新認識一下(至關於消費者變化致使修改生產者代碼)。而郵箱相對來講比較固定,你依賴它的成本就比較低(至關於和緩衝區之間的弱耦合)。函數

  • 併發
    因爲生產者與消費者是兩個獨立的併發體,他們之間是用緩衝區通訊的,生產者只須要往緩衝區裏丟數據,就能夠繼續生產下一個數據,而消費者只須要從緩衝區拿數據便可,這樣就不會由於彼此的處理速度而發生阻塞。性能

繼續上面的例子,若是咱們不使用郵箱,就得在郵局等郵遞員,直到他回來,把信件交給他,這期間咱們啥事兒都不能幹(也就是生產者阻塞)。或者郵遞員得挨家挨戶問,誰要寄信(至關於消費者輪詢)。學習

  • 支持忙閒不均
    當生產者製造數據快的時候,消費者來不及處理,未處理的數據能夠暫時存在緩衝區中,慢慢處理掉。而不至於由於消費者的性能形成數據丟失或影響生產者生產。spa

咱們再拿寄信的例子,假設郵遞員一次只能帶走1000封信,萬一碰上情人節(或是聖誕節)送賀卡,須要寄出去的信超過了1000封,這時候郵箱這個緩衝區就派上用場了。郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時再拿走。操作系統

經過上面的介紹你們應該已經明白了生產者消費者模式。

Python中的多線程編程

在實現生產者消費者模式以前,咱們先學習下Python中的多線程編程。
線程是操做系統直接支持的執行單元,高級語言一般都內置多線程的支持,Python也不例外,而且Python的線程是真正的Posix Thread,而不是模擬出來的線程。
Python的標準庫提供了兩個模塊:_thread和threading,_thread是低級模塊,threading是高級模塊,對_thread進行了封裝。絕大多數狀況下,咱們只須要使用threading這個高級模塊。

下面咱們先看一段在Python中實現多線程的代碼。

import time,threading
#線程代碼
class TaskThread(threading.Thread):
    def __init__(self,name):
        threading.Thread.__init__(self,name=name)
    def run(self):
        print('thread %s is running...' % self.getName())

        for i in range(6):
            print('thread %s >>> %s' % (self.getName(), i))
            time.sleep(1)

        print('thread %s finished.' % self.getName())

taskthread = TaskThread('TaskThread')
taskthread.start()
taskthread.join()

下面是程序的執行結果:

thread TaskThread is running...
thread TaskThread >>> 0
thread TaskThread >>> 1
thread TaskThread >>> 2
thread TaskThread >>> 3
thread TaskThread >>> 4
thread TaskThread >>> 5
thread TaskThread finished.

TaskThread類繼承自threading模塊中的Thread線程類。構造函數的name參數指定線程的名字,經過重載基類run函數實現具體任務。

在簡單熟悉了Python的線程後,下面咱們實現一個生產者消費者模shi。

from Queue import Queue
import random,threading,time

#生產者類
class Producer(threading.Thread):
    def __init__(self, name,queue):
        threading.Thread.__init__(self, name=name)
        self.data=queue

    def run(self):
        for i in range(5):
            print("%s is producing %d to the queue!" % (self.getName(), i))
            self.data.put(i)
            time.sleep(random.randrange(10)/5)
        print("%s finished!" % self.getName())

#消費者類
class Consumer(threading.Thread):
    def __init__(self,name,queue):
        threading.Thread.__init__(self,name=name)
        self.data=queue
    def run(self):
        for i in range(5):
            val = self.data.get()
            print("%s is consuming. %d in the queue is consumed!" % (self.getName(),val))
            time.sleep(random.randrange(10))
        print("%s finished!" % self.getName())

def main():
    queue = Queue()
    producer = Producer('Producer',queue)
    consumer = Consumer('Consumer',queue)

    producer.start()
    consumer.start()

    producer.join()
    consumer.join()
    print 'All threads finished!'

if __name__ == '__main__':
    main()

執行結果可能以下:

Producer is producing 0 to the queue!
Consumer is consuming. 0 in the queue is consumed!
Producer is producing 1 to the queue!
Producer is producing 2 to the queue!
Consumer is consuming. 1 in the queue is consumed!
Consumer is consuming. 2 in the queue is consumed!
Producer is producing 3 to the queue!
Producer is producing 4 to the queue!
Producer finished!
Consumer is consuming. 3 in the queue is consumed!
Consumer is consuming. 4 in the queue is consumed!
Consumer finished!
All threads finished!

由於多線程是搶佔式執行的,因此打印出的運行結果不必定和上面的徹底一致。

小結

本例經過Python實現了一個簡單的生產者消費者模型。Python中的Queue模塊已經提供了對線程同步的支持,因此本文並無涉及鎖、同步、死鎖等多線程問題。

相關文章
相關標籤/搜索