4.3 隊列/管道

IPA 通訊

概念原理

建立/方法

Queue([maxsize]) 
  建立共享的進程隊列。
  maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。


Queue的實例q具備如下方法:
q.get( [ block [ ,timeout ] ] )   返回q中的一個項目。
  q爲空,此方法將阻塞,直到隊列中有項目可用爲止。
  block 用於控制阻塞行爲,
    默認爲True.
    設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。
  timeout 可選超時時間
    用在阻塞模式中
    若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。
q.get_nowait( )   同q.get(False)方法。
q.put(item [, block [,timeout ] ] )   將item放入隊列 , 若是隊列已滿,此方法將阻塞至有空間可用爲止。
  block控制阻塞行爲,
    默認爲True。
    設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。
  timeout指定在阻塞模式中等待可用空間的時間長短
    超時後將引起Queue.Full異常。
q.qsize()   返回隊列中目前項目的正確數量。
  此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目
  在某些系統上,此方法可能引起NotImplementedError異常。 q.empty()   若是調用此方法時 q爲空,返回True。
  若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。
q.full()   若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。

  進程間通信示例dom

import time
from multiprocessing import Process, Queue

def f(q):
    q.put([time.asctime(), 'hi', 'hello'])  #調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。

if __name__ == '__main__':
    q = Queue() #建立一個Queue對象
    p = Process(target=f, args=(q,)) #建立一個進程
    p.start()
    print(q.get())  # ['Wed Jan 30 23:54:47 2019', 'hi', 'hello']
    # 子進程拿到了父進程的數據
    p.join()
View Code

生產者消費者模型

無信號純阻塞的模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('')
View Code

 帶信號的模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))
    q.put(None) #發送結束信號
if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('')

 改良版——生產者消費者模型
View Code

多消費者時的帶信號模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(name,q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='%s%s' %(name,i)
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=('包子',q))
    p2=Process(target=producer,args=('骨頭',q))
    p3=Process(target=producer,args=('泔水',q))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    c2=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    p2.start()
    p3.start()
    c1.start()

    p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號
    p2.join()
    p3.join()
    q.put(None) #有幾個消費者就應該發送幾回結束信號None
    q.put(None) #發送結束信號
    print('')

 多個消費者的例子:有幾個消費者就須要發送幾回結束信號
View Code
相關文章
相關標籤/搜索