JoinableQueue和Queue 使用同樣
這就像是一個Queue對象,但隊列容許項目的使用者通知生成者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
JoinableQueue([maxsize])
參數介紹spa
maxsize是隊列中容許最大項數,省略則無大小限制
方法介紹code
JoinableQueue的實例p除了與Queue對象相同的方法以外還具備:
q.task_done():使用者使用此方法發出信號,表示q.get()的返回數據已經被處理 數據所有接好了。
若是調用此方法的次數大於從隊列中刪除項目的數量,將引起ValueError異常
q.join():生產者調用此方法進行阻塞,直到隊列中全部的項目均被處理。
直到隊列中全部的數據都被取。
阻塞將持續到隊列中的每一個數據都調用q.task_done()方法爲止
q.join() 做用是 等隊列執行完了 隊列數據取完 就執行完了對象
基於JoinableQueue實現生產者消費者模型
守護進程應用
如今是消費者給生產者發送結束信號
在全部生產者生產完之後 加上q.join() 等隊列全部數據被取完,就不等了,
消費者把數據取完之後,消費者發送結束信號, 就是 q.task_done()
from multiprocessing import Process from multiprocessing import JoinableQueue import time def producer(q): for i in range(1,3): res = "包子%s" %i time.sleep(0.5) print("生產者生產%s" % res) q.put(res) # 等着生產者從隊列裏放好全部數據 q.join() def consumer(q): while True: # 接取數據 res = q.get() if res is None:break time.sleep(1) print("消費者消費%s" % res) # 發信號由消費者去發送 表明生產者已經把隊列數據都取走了 q.task_done() if __name__ == "__main__": # 容器 q = JoinableQueue() # 生產者們 # 須要傳參數 把生產後數據的往隊列裏丟 p1 = Process(target=producer, args=(q,)) p2 = Process(target=producer, args=(q,)) p3 = Process(target=producer, args=(q,)) # 消費者們 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) # 消費者從隊列裏取空數據,沒有意義存在了 # 設置守護進程 c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() print("主")