建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。python
Queue([maxsize])
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。安全
底層隊列使用管道和鎖定實現。服務器
q.get( [ block [ ,timeout ] ] )
:返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。dom
q.put(item [, block [,timeout ] ] )
:將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。線程
q.close()
:關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。code
q.empty()
:若是調用此方法時 q爲空,返回True。繼承
q.get_nowait()
:取值 沒有值不等待直接報錯隊列
q.full()
:若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的進程
若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。這時候empty,full,get_nowait取到結果和實際管道的結果就可能不一致了!ip
""" 隊列:先進先出 堆棧:先進後出 """ from multiprocessing import Queue q = Queue(5) # 括號內能夠傳參數 表示的是這個隊列的最大存儲數 # 往隊列中添加數據 q.put(1) q.put(2) # print(q.full()) # 判斷隊列是否滿了 q.put(3) q.put(4) q.put(5) # print(q.full()) # q.put(6) # 當隊列滿了以後 再放入數據 不會報錯 會原地等待 直到隊列中有數據被取走(阻塞態) print(q.get()) print(q.get()) print(q.get()) print(q.empty()) # 判斷隊列中的數據是否取完 print(q.get()) print(q.get()) print(q.empty()) # print(q.get_nowait()) # 取值 沒有值不等待直接報錯 # print(q.get()) # 當隊列中的數據被取完以後 再次獲取 程序會阻塞 直到有人往隊列中放入值 """ full get_nowait empty 都不適用於多進程的狀況 """
from multiprocessing import Process,Queue def producer(q): q.put('hello GF~') def consumer(q): print(q.get()) if __name__ == '__main__': q = Queue() p = Process(target=producer,args=(q,)) c = Process(target=consumer, args=(q,)) p.start() c.start() """ 子進程放數據 主進程獲取數據 兩個子進程相互放 取數據 """
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。
生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('%s 吃 %s' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('生產了 %s' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主')
此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。
解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。
由於數據沒有,get()會一直阻塞,根據隊列先進先出的原理,這樣生產者在發完全部數據後發個標誌數據,好比None,而消費者進行判斷,當最後結果爲None時候結束循環.
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('%s 吃 %s' %(os.getpid(),res)) def producer(q): for i in range(10): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('生產了 %s' %(os.getpid(),res)) q.put(None) #發送結束信號 if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() print('主')
from multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('%s 吃 %s' %(os.getpid(),res)) def producer(q): for i in range(2): time.sleep(random.randint(1,3)) res='包子%s' %i q.put(res) print('生產了 %s' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=(q,)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) #開始 p1.start() c1.start() p1.join() q.put(None) #發送結束信號 print('主')
注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號。
建立可鏈接的共享進程隊列。但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。
本質就是繼承了Queue,而後基礎上添加了task_done和join方法.
q.task_done()
:使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。q.join()
:生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。""" 生產者:生產/製造數據的 消費者:消費/處理數據的 例子:作包子的,買包子的 1.作包子遠比買包子的多 2.作包子的遠比包子的少 供需不平衡的問題 """ from multiprocessing import Process,Queue,JoinableQueue import random import time def producer(name,food,q): for i in range(10): data = '%s生產了%s%s'%(name,food,i) time.sleep(random.random()) q.put(data) print(data) def consumer(name,q): while True: data = q.get() if data == None:break print('%s吃了%s'%(name,data)) time.sleep(random.random()) q.task_done() # 告訴隊列你已經從隊列中取出了一個數據 而且處理完畢了 if __name__ == '__main__': q = JoinableQueue() p = Process(target=producer,args=('大廚egon','饅頭',q)) p1 = Process(target=producer,args=('跟班tank','生蠔',q)) c = Process(target=consumer,args=('許兆龍',q)) c1 = Process(target=consumer,args=('吃貨jerry',q)) p.start() p1.start() c.daemon = True c1.daemon = True c.start() c1.start() p.join() p1.join() q.join() # 等到隊列中數據所有取出 # q.put(None) # q.put(None)
基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的)
管道能夠用於雙向通訊,利用一般在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序