112 python程序中的進程操做-進程之間進行通訊(mulitiProcessing Queue隊列)

1、進程間通訊

IPC(Inter-Process Communication)python

IPC機制:實現進程之間通信編程

管道:pipe 基於共享的內存空間安全

隊列:pipe+鎖的概念--->queue多線程

2、隊列(Queue)

2.1 概念-----multiProcess.Queue

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。併發

Queue([maxsize])建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。dom

底層隊列使用管道和鎖定實現。函數

2.2 Queue方法使用

2.2.1 q.get的使用:

  1. 是從隊列裏面取值而且把隊列面的取出來的值刪掉,沒有參數的狀況下就是是默認一直等着取值測試

  2. 就算是隊列裏面沒有可取的值的時候,程序也不會結束,就會卡在哪裏,一直等着操作系統

    from multiprocessing import Queue
    
    q = Queue() # 生成一個隊列對象
    # put方法是往隊列裏面放值
    q.put('Cecilia陳')
    q.put('xuchen')
    q.put('喜陳')
    
    # get方法是從隊列裏面取值
    print(q.get())
    print(q.get())
    print(q.get())
    
    q.put(5)
    q.put(6)
    print(q.get())

    Cecilia陳
    xuchen
    喜陳
    5線程

2.2.2 Queue(參數) +參數的使用:

  1. Queue加參數之後,參數是數值

  2. 參數實幾就表示實例化的這個Queue隊列能夠放幾個值

  3. 當隊列已經滿的時候,再放值,程序會阻塞,但不會結束

    from multiprocessing import Queue
    q = Queue(3)
    q.put('Cecilia陳')
    q.put('xuchen')
    q.put('喜陳')
    print(q.full()) # 判斷隊列是否滿了 返回的是True/False
    q.put(2) # 當隊列已經滿的時候,再放值,程序會阻塞,但不會結束

    True 隊列已經滿了

2.2.3 q.put(參數1,參數2,參數3,參數4):

q.put(self, obj, block=True, timeout=None)

  1. self :put就至關因而Queue裏的一個方法,這個時候q.put就至關因而隊列對象q來調用對象的綁定方法,這個參數能夠省略便可

  2. obj:是咱們須要往隊列裏面放的值

  3. block=True :隊列若是滿了的話,再往隊列裏放值的話會等待,程序不會結束

  4. timeout=None:是再block這個參數的基礎上的,當block的值爲真的時候,timeout是用來等待多少秒,若是再這個時間裏,隊列一直是滿的,那麼程序就會報錯並結束(Queue.Full異常)

    from multiprocessing import Queue
    q = Queue(3)
    q.put('zhao',block=True,timeout=2)
    q.put('zhao',block=True,timeout=2)
    q.put('zhao',block=True,timeout=2)
    
    q.put('zhao',block=True,timeout=5) # 此時程序將對等待5秒之後報錯了

2.2.4 q.get(參數1,參數2,參數3,參數4):

q.get(self,block=True, timeout=None)

  1. self :get就至關因而Queue裏的一個方法,這個時候q.get就至關因而隊列對象q來調用對象的綁定方法,這個參數能夠省略便可

  2. block=True :從隊列q對象裏面取值,若是娶不到值的話,程序不會結束

  3. timeout=None:是再block這個參數的基礎上的,當block的值爲真的時候,timeout是用來等待多少秒,若是再這個時間裏,get取不到隊列裏面的值的話,那麼程序就會報錯並結束(queue.Empty異常)

    from multiprocessing import Queue
    q = Queue()
    q.put('Cecilia陳')
    print(q.get())
    
    q.get(block=True,timeout=2) # 此時程序會等待2秒後,報錯了,隊列裏面沒有值了

2.2.5 block=False

  1. 若是block的值是False的話,那麼put方法再隊列是滿的狀況下,不會等待阻塞,程序直接報錯(Queue.Full異常)結束
  2. 若是block的值是False的話,那麼get方法再隊列裏面沒有值的狀況下,再去取的時候,不會等待阻塞,程序直接報錯(queue.Empty異常)結束

1.put()的block=False

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.full())

q.put('xichen',block=False) # 隊列已經滿了,我不等待了,直接報錯

2.get()的block=Flase

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.get())
print(q.get())

print(q.get(block=False)) # 隊列已經沒有值了,我不等待了,直接報錯

2.2.6 put_nowait()/get_nowait()

1.put_nowait() 至關於bolok=False,隊列滿的時候,再放值的時候,程序不等待,不阻塞,直接報錯

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.full())

q.put_nowait('xichen') # 程序不等待,不阻塞,直接報錯

2.get_nowait() 至關於bolok=False,當隊列裏沒有值的時候,再取值的時候,程序不等待,不阻塞,程序直接報錯

from multiprocessing import Queue
q = Queue(2)
q.put('Cecilia陳')
q.put('喜陳')
print(q.get())
print(q.get())
print(q.full())

q.get_nowait()# 再取值的時候,程序不等待,不阻塞,程序直接報錯

3、代碼實例

3.1 單看隊列的存取數據用法

這個例子尚未加入進程通訊,只是先來看看隊列爲咱們提供的方法,以及這些方法的使用和現象。

'''
multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,可是隊列接口
'''

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
           # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。
try:
    q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
    print('隊列已經滿了')

# 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
try:
    q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty()) #空了

3.2 子進程向父進程發送數據

這是一個queue的簡單應用,使用隊列q對象調用get函數來取得隊列中最早進入的數據。

from multiprocessing import Process, Queue

def f(q,name,age):
    q.put(name,age)  #調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。

if __name__ == '__main__':
    q = Queue() #建立一個Queue對象
    p = Process(target=f, args=(q,'Cecilia陳',18)) #建立一個進程
    p.start()
    print(q.get())
    p.join()

['Cecilia陳', 18]

4、生產者消費者模型

生產者: 生產數據的任務

消費者: 處理數據的任務

生產者--隊列(盆)-->消費者

生產者能夠不停的生產,達到了本身最大的生產效率,消費者能夠不停的消費,也達到了本身最大的消費效率.

生產者消費者模型大大提升了生產者生產的效率和消費者消費的效率.

補充: queue不適合傳大文件,通產傳一些消息.

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

4.1 爲何要使用生產者和消費者模型

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

4.2 什麼是生產者消費者模型

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

4.3 基於Queue隊列實現的生產者消費者模型

from multiprocessing import Queue,Process

# 生產者
def producer(q,name,food):
    for i in range(3):
        print(f'{name}生產了{food}{i}')
        res = f'{food}{i}'
        q.put(res)

# 消費者
def consumer(q,name):
    while True:
        res = q.get(timeout=5)
        print(f'{name}吃了{res}')

if __name__ == '__main__':
    q = Queue() # 爲的是讓生產者和消費者使用同一個隊列,使用同一個隊列進行通信
    p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力'))
    c1 = Process(target=consumer,args=(q,'Tom'))

    p1.start()
    c1.start()

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。

4.4 改良版----生產者消費者模型

注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號

from multiprocessing import Queue,Process

def producer(q,name,food):
    for i in range(3):
        print(f'{name}生產了{food}{i}')
        res = f'{food}{i}'
        q.put(res)
    q.put(None) # 當生產者結束生產的的時候,咱們再隊列的最後再作一個表示,告訴消費者,生產者已經不生產了,讓消費者不要再去隊列裏拿東西了


def consumer(q,name):
    while True:
        res = q.get(timeout=5)
        if res == None:break # 判斷隊列拿出的是否是生產者放的結束生產的標識,若是是則不取,直接退出,結束程序
        print(f'{name}吃了{res}')

if __name__ == '__main__':
    q = Queue() # 爲的是讓生產者和消費者使用同一個隊列,使用同一個隊列進行通信
    p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力'))
    c1 = Process(target=consumer,args=(q,'Tom'))

    p1.start()
    c1.start()

4.5 主進程在生產者生產結束之後,發送結束信號

使用這個方法的話,是很low的,有幾個消費者就要在主進程中向隊列中put幾個結束信號

from multiprocessing import Queue,Process
import time,random

def producer(q,name,food):
    for i in range(3):
        print(f'{name}生產了{food}{i}')
        time.sleep((random.randint(1,3)))
        res = f'{food}{i}'
        q.put(res)
    # q.put(None) # 當生產者結束生產的的時候,咱們再隊列的最後再作一個表示,告訴消費者,生產者已經不生產了,讓消費者不要再去隊列裏拿東西了



def consumer(q,name):
    while True:
        res = q.get(timeout=5)
        if res == None:break # 判斷隊列拿出的是否是生產者放的結束生產的標識,若是是則不取,直接退出,結束程序
        time.sleep((random.randint(1, 3)))
        print(f'{name}吃了{res}')

if __name__ == '__main__':
    q = Queue() # 爲的是讓生產者和消費者使用同一個隊列,使用同一個隊列進行通信
    # 多個生產者進程
    p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力'))
    p2 = Process(target=producer,args=(q,'xichen','冰激凌'))
    p3 = Process(target=producer,args=(q,'喜陳','可樂'))
    # 多個消費者進程
    c1 = Process(target=consumer,args=(q,'Tom'))
    c2 = Process(target=consumer,args=(q,'jack'))


    # 告訴操做系統啓動生產者進程
    p1.start()
    p2.start()
    p3.start()

    # 告訴操做系統啓動消費者進程
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    q.put(None) # 幾個消費者put幾回
    q.put(None)

5、JoinableQueue方法

建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

5.1 方法介紹

JoinableQueue的實例p除了與Queue對象相同的方法以外,還具備如下方法:

q.task_done():使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。

q.join():生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。

5.2 joinableQueue隊列實現生產者消費者模型

from multiprocessing import Queue,Process,JoinableQueue
import time,random

def producer(q,name,food):
    for i in range(3):
        print(f'{name}生產了{food}{i}')
        # time.sleep((random.randint(1,3)))
        res = f'{food}{i}'
        q.put(res)
    # q.put(None) # 當生產者結束生產的的時候,咱們再隊列的最後再作一個表示,告訴消費者,生產者已經不生產了,讓消費者不要再去隊列裏拿東西了
    q.join()


def consumer(q,name):
    while True:
        res = q.get(timeout=5)
        # if res == None:break # 判斷隊列拿出的是否是生產者放的結束生產的標識,若是是則不取,直接退出,結束程序
        # time.sleep((random.randint(1, 3)))
        print(f'{name}吃了{res}')
        q.task_done()#向q.join()發送一次信號,證實一個數據已經被取走了


if __name__ == '__main__':
    q = JoinableQueue() # 爲的是讓生產者和消費者使用同一個隊列,使用同一個隊列進行通信
    # 多個生產者進程
    p1 = Process(target=producer,args=(q,'Cecilia陳','巧克力'))
    p2 = Process(target=producer,args=(q,'xichen','冰激凌'))
    p3 = Process(target=producer,args=(q,'喜陳','可樂'))
    # 多個消費者進程
    c1 = Process(target=consumer,args=(q,'Tom'))
    c2 = Process(target=consumer,args=(q,'jack'))


    # 告訴操做系統啓動生產者進程
    p1.start()
    p2.start()
    p3.start()

    # 把生產者設爲守護進程
    c1.daemon = True
    c2.daemon = True
    # 告訴操做系統啓動消費者進程
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join() # 等待生產者生產完畢

    print('主進程')

    ### 分析
    # 生產者生產完畢--這是主進程最後一行代碼結束--q.join()消費者已經取乾淨了,沒有存在的意義了
    # 這是主進程最後一行代碼結束,消費者已經取乾淨了,沒有存在的意義了.守護進程的概念.

5.3 測試joinableQueue

from multiprocessing import Process,Queue,JoinableQueue

q = JoinableQueue()

q.put('zhao') # 放隊列裏一個任務
q.put('qian')

print(q.get())
q.task_done() # 完成了一次任務
print(q.get())
q.task_done() # 完成了一次任務
q.join() #計數器不爲0的時候 阻塞等待計數器爲0後經過

# 想象成一個計數器 :put +1   task_done -1
相關文章
相關標籤/搜索