併發編程-進程-07進程間通訊和生產消費者模型

一丶隊列模塊調用

(推薦使用隊列)

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。python

from multiprocessing import Queue

Queue([maxsize])建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。安全

底層隊列使用管道和鎖定實現。服務器

二丶隊列方法介紹

2.1核心取放數據

經常使用

q.get( [ block [ ,timeout ] ] ):返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。dom

q.put(item [, block [,timeout ] ] ) :將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。線程

不經常使用

q.close() :關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。code

2.2有弊端的方法

q.empty() :若是調用此方法時 q爲空,返回True。繼承

q.get_nowait() :取值 沒有值不等待直接報錯隊列

q.full() :若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的進程

2.21有弊端緣由

若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。這時候empty,full,get_nowait取到結果和實際管道的結果就可能不一致了!ip

三丶進程隊列的代碼實現

"""
隊列:先進先出
堆棧:先進後出
"""
from multiprocessing import Queue

q = Queue(5)  # 括號內能夠傳參數 表示的是這個隊列的最大存儲數
# 往隊列中添加數據
q.put(1)
q.put(2)
# print(q.full())  # 判斷隊列是否滿了
q.put(3)
q.put(4)
q.put(5)
# print(q.full())
# q.put(6)  # 當隊列滿了以後 再放入數據 不會報錯 會原地等待 直到隊列中有數據被取走(阻塞態)

print(q.get())
print(q.get())
print(q.get())
print(q.empty())  # 判斷隊列中的數據是否取完
print(q.get())
print(q.get())
print(q.empty())
# print(q.get_nowait())  # 取值 沒有值不等待直接報錯
# print(q.get())  # 當隊列中的數據被取完以後 再次獲取 程序會阻塞 直到有人往隊列中放入值
"""
full
get_nowait
empty
都不適用於多進程的狀況
"""

四丶進程間通訊

進程間通訊:IPC(Inter-Process Communication)

from multiprocessing import Process,Queue

def producer(q):
    q.put('hello GF~')

def consumer(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,))
    c = Process(target=consumer, args=(q,))
    p.start()
    c.start()

"""
子進程放數據 主進程獲取數據
兩個子進程相互放 取數據
"""

五丶生產者消費者模型

5.1什麼是生產者消費者模型

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。

5.2生產者消費者模型好處

生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。

5.3基於隊列實現生產者消費者模型

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('%s 吃 %s' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('生產了 %s' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')

此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,則一直處於死循環中且卡在q.get()這一步。

解決方式無非是讓生產者在生產完畢後,往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。

5.4解決死循環方法

由於數據沒有,get()會一直阻塞,根據隊列先進先出的原理,這樣生產者在發完全部數據後發個標誌數據,好比None,而消費者進行判斷,當最後結果爲None時候結束循環.

方式一

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('%s 吃 %s' %(os.getpid(),res))

def producer(q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('生產了 %s' %(os.getpid(),res))
    q.put(None) #發送結束信號
if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()
    print('主')

方式二

from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('%s 吃 %s' %(os.getpid(),res))

def producer(q):
    for i in range(2):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('生產了 %s' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))

    #開始
    p1.start()
    c1.start()

    p1.join()
    q.put(None) #發送結束信號
    print('主')

注意:結束信號None,不必定要由生產者發,主進程裏一樣能夠發,但主進程須要等生產者結束後才應該發送該信號。

六丶通知進程JoinableQueue

建立可鏈接的共享進程隊列。但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。

本質就是繼承了Queue,而後基礎上添加了task_done和join方法.

6.1方法介紹

  1. q.task_done():使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。
  2. q.join():生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。

6.2JoinableQueue隊列實現生產消費模型

"""
生產者:生產/製造數據的
消費者:消費/處理數據的
例子:作包子的,買包子的
        1.作包子遠比買包子的多
        2.作包子的遠比包子的少
        供需不平衡的問題
"""
from multiprocessing import Process,Queue,JoinableQueue
import random
import time


def producer(name,food,q):
    for i in range(10):
        data = '%s生產了%s%s'%(name,food,i)
        time.sleep(random.random())
        q.put(data)
        print(data)

def consumer(name,q):
    while True:
        data = q.get()
        if data == None:break
        print('%s吃了%s'%(name,data))
        time.sleep(random.random())
        q.task_done()  # 告訴隊列你已經從隊列中取出了一個數據 而且處理完畢了


if __name__ == '__main__':
    q = JoinableQueue()

    p = Process(target=producer,args=('大廚egon','饅頭',q))
    p1 = Process(target=producer,args=('跟班tank','生蠔',q))
    c = Process(target=consumer,args=('許兆龍',q))
    c1 = Process(target=consumer,args=('吃貨jerry',q))
    p.start()
    p1.start()
    c.daemon = True
    c1.daemon = True
    c.start()
    c1.start()
    p.join()
    p1.join()

    q.join()  # 等到隊列中數據所有取出
    # q.put(None)
    # q.put(None)

七丶管道(推薦使用隊列)

  1. 基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的)

  2. 管道能夠用於雙向通訊,利用一般在客戶端/服務器中使用的請求/響應模型或遠程過程調用,就可使用管道編寫與進程交互的程序

相關文章
相關標籤/搜索