生產者消費者模型

********生產者消費者模型********


在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費
線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是
生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產
數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這
個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不
直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給
阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平
衡了生產者和消費者的處理能力。

基於隊列實現生產者消費者模型


JoinableQueue([maxsize]) 
建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已
經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

****方法介紹

JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:

q.task_done()
使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列
中刪除的項目數量,將引起ValueError異常。

q.join()
生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到尾隊列中的每一個項目均調用
q.task_done()方法爲止。


創建永遠運行的進程,使用和處理隊列上的項目。生產者將項目放入隊列,並等待它們被處理。



實現方法一(通用)

from multiprocessing import Process, JoinableQueue


def consumer(q, name):
    while 1:
        info = q.get()
        print("%s吃了" % name + info)
        q.task_done()


def producer(q, name):
    for i in range(10):
        info = '%s生產了%s號包子' % (name, i)
        q.put(info)
    q.join()


if __name__ == '__main__':
    q = JoinableQueue(10)
    pro = Process(target=producer, args=(q, 'Tom',))
    con = Process(target=consumer, args=(q, 'Jake'))
    con1 = Process(target=consumer, args=(q, 'Jason'))
    con.daemon = True  # 有幾個消費者就有幾個守護進程
    con1.daemon = True
    con1.start()
    con.start()
    pro.start()
    pro.join()


實現方法二(比較野)

from multiprocessing import Process, Queue
import time

def consumer(q, name):
    while 1:
        info = q.get()
        if info:
            print("%s吃了" % name + info)
        else:
            break

def producer(q, name):
    for i in range(10):
        time.sleep(0.1)
        info = '%s生產了%s號包子' % (name, i)
        q.put(info)



if __name__ == '__main__':
    q = Queue(10)
    pro = Process(target=producer, args=(q, 'Tom',))
    con = Process(target=consumer, args=(q, 'Jake'))
    con1 = Process(target=consumer, args=(q, 'Jason'))
    pro.start()
    con1.start()
    con.start()
    pro.join()
    q.put(None)#有幾個消費者就要put幾個None
    q.put(None)

實現方法三(比較野)

from multiprocessing import Process, Queue
import time

def consumer(q, name):
    while 1:
        info = q.get()
        if info:
            print("%s吃了" % name + info)
        else:
            break


def producer(q, name):
    for i in range(10):
        time.sleep(0.1)
        info = '%s生產了%s號包子' % (name, i)
        q.put(info)
    q.put(None) #有幾個消費者就要put幾個Nne
    q.put(None)


if __name__ == '__main__':
    q = Queue(10)

    pro = Process(target=producer, args=(q, 'Tom',))
    Process(target=producer, args=(q, 'Tom1',)).start()
    Process(target=producer, args=(q, 'Tom2',)).start()
    con = Process(target=consumer, args=(q, 'Jake'))
    con1 = Process(target=consumer, args=(q, 'Jason'))
    con1.start()
    pro.start()
    con.start()
相關文章
相關標籤/搜索