python 併發編程 多進程 JoinableQueue

 

JoinableQueue和Queue 使用同樣

這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
JoinableQueue([maxsize])

參數介紹spa

maxsize是隊列中容許最大項數,省略則無大小限制

方法介紹code

 

JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回數據已經被處理 數據所有接好了。
若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常

q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。
直到隊列中全部的數據都被取。
阻塞將持續到隊列中的每一個數據都調用q.task_done()方法爲止

 

 

q.join()  做用是 等隊列執行完了   隊列數據取完 就執行完了對象

 

基於JoinableQueue實現生產者消費者模型
守護進程應用

如今是消費者給生產者發送結束信號

在全部生產者生產完之後 加上q.join() 等隊列全部數據被取完,就不等了,
消費者把數據取完之後,消費者發送結束信號, 就是 q.task_done()

from multiprocessing import Process
from multiprocessing import JoinableQueue
import time


def producer(q):

    for i in range(1,3):
        res = "包子%s" %i
        time.sleep(0.5)
        print("生產者生產%s" % res)

        q.put(res)

    # 等着生產者從隊列裏放好全部數據
    q.join()


def consumer(q):
    while True:
        # 接取數據
        res = q.get()
        if res is None:break
        time.sleep(1)
        print("消費者消費%s" % res)

        # 發信號由消費者去發送 表明生產者已經把隊列數據都取走了
        q.task_done()

if __name__ == "__main__":

    # 容器
    q = JoinableQueue()

    # 生產者們
    # 須要傳參數 把生產後數據的往隊列裏丟
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消費者們
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    # 消費者從隊列裏取空數據,沒有意義存在了
    # 設置守護進程
    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    print("")
相關文章
相關標籤/搜索