生產者和消費者模型

1、生產者和消費者模型

是一個解耦合的過程
補充: queue不適合傳大文件,通產傳一些消息.編程

生產者: 生產數據的任務多線程

消費者: 處理數據的任務
生產者能夠不停的生產,達到了本身最大的生產效率,消費者能夠不停的消費,也達到了本身最大的消費效率.生產者消費者模型大大提升了生產者生產的效率和消費者消費的效率.併發

生產者--隊列(盆)-->消費者dom

在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。線程

1.1爲何要使用生產者和消費者模式

在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。code

1.2什麼是生產者消費者模式

生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。對象

2、基於隊列實現生產者消費者模型

2.1生產者消費者模型1

from multiprocessing import Process,Queue
def producer(q,name,food):
    """生產者"""
    for i in range(1,11):
        print(f'{name}生產{food}{i}')
        res = f'{food}{i}'
        q.put(res)
    q.put(None)
def consumer(q,name):
    """消費者"""
    while True:
        res = q.get(timeout=3)
        if res is None:break
        print(f'{name}吃了{res}')
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,'ocean','包子'))
    c1 = Process(target=consumer,args=(q,'chen'))
    p1.start()
    c1.start()
#####################
ocean生產包子1
ocean生產包子2
ocean生產包子3
ocean生產包子4
ocean生產包子5
ocean生產包子6
ocean生產包子7
ocean生產包子8
ocean生產包子9
ocean生產包子10
chen吃了包子1
chen吃了包子2
chen吃了包子3
chen吃了包子4
chen吃了包子5
chen吃了包子6
chen吃了包子7
chen吃了包子8
chen吃了包子9
chen吃了包子10

2.2生產者消費者模型2

from multiprocessing import Process,Queue
import time,random
def producer(q,name,food):
    """生產者"""
    for i in range(1,4):
        print(f'{name}生產了{food}{i}')
        time.sleep(random.randint(1,3))
        res = f'{food}{i}'
        q.put(res)
def consumer(q,name):
    """消費者"""
    while True:
        res = q.get(timeout=3)
        if res is None:break
        time.sleep(random.randint(1,3))
        print(f'{name}吃了{res}')
if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer,args=(q,'ocean','包子'))
    p2 = Process(target=producer,args=(q,'sky','韭菜'))
    p3 = Process(target=producer,args=(q,'rocky','酒'))
    c1 = Process(target=consumer,args=(q,'nick'))
    c2 = Process(target=consumer,args=(q,'mac'))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()#生產者生產完畢
    q.put(None)#存在幾個消費者,put幾回None
    q.put(None)
    # for i in range(2):
    #     q.put(None)
#############################
ocean生產了包子1
sky生產了韭菜1
rocky生產了酒1
ocean生產了包子2
sky生產了韭菜2
ocean生產了包子3
sky生產了韭菜3
mac吃了韭菜1
rocky生產了酒2
nick吃了包子1
mac吃了包子2
rocky生產了酒3
nick吃了韭菜2
mac吃了酒1
nick吃了包子3
nick吃了酒2
mac吃了韭菜3
nick吃了酒3

2.3生產者消費者模型3

2.3.1JoinableQueue([maxsize])模塊

建立可鏈接的共享進程隊列。這就像是一個Queue對象,但隊列容許項目的使用者通知生產者項目已經被成功處理。通知進程是使用共享的信號和條件變量來實現的。隊列

2.3.1.1方法介紹

q.task_done():使用者使用此方法發出信號,表示q.get()返回的項目已經被處理。若是調用此方法的次數大於從隊列中刪除的項目數量,將引起ValueError異常。進程

q.join():生產者將使用此方法進行阻塞,直到隊列中全部項目均被處理。阻塞將持續到爲隊列中的每一個項目均調用q.task_done()方法爲止。

from multiprocessing import JoinableQueue
#用法和Queue類似
q = JoinableQueue()
q.put('ocean')#隊列放入一個任務,內存在一個計數機制,+1
q.put('can')#計數機制 +1
print(q.get())
q.task_done()#完成一次任務,計數機制-1
print(q.get())
q.task_done()#計數機制  -1
q.join()#計數機制不爲0的時候,阻塞等待計數器爲0後經過
##################
ocean
can

2.3.2JionableQueue隊列實現生產者消費者模型

from multiprocessing import Process, JoinableQueue
import time, random


def producer(q, name, food):
    """生產者"""
    for i in range(3):
        print(f'{name}生產了{food}{i}')
        time.sleep(random.randint(1, 3))
        res = f'{food}{i}'
        q.put(res)


def consumer(q, name):
    """消費者"""
    while True:
        res = q.get()
        time.sleep(random.randint(1, 3))
        print(f'{name}吃了{res}')
        q.task_done()  # 用來計數


if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=(q, 'ocean', '包子'))
    p2 = Process(target=producer, args=(q, 'chen', '菜'))
    p3 = Process(target=producer, args=(q, 'nick', '酒'))
    c1 = Process(target=consumer, args=(q, 'pocky'))
    c2 = Process(target=consumer, args=(q, 'mac'))
    p1.start()
    p2.start()
    p3.start()
    c1.daemon = True  # 定義爲收回進程
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()  # 生產者生產完畢
    q.join()
    # 生產者生產完畢--這是主程序的最後一行代碼,執行後結束---q.join()消費者已經取乾淨了,沒有存在的意思了。
    # 這個是主程序的最後一行代碼結束,守護進程結束,也是守護進程的概念
###############################
ocean生產了包子0
chen生產了菜0
nick生產了酒0
chen生產了菜1
nick生產了酒1
ocean生產了包子1
chen生產了菜2
pocky吃了菜0
nick生產了酒2
ocean生產了包子2
mac吃了酒0
pocky吃了包子0
mac吃了菜1
mac吃了包子1
mac吃了菜2
pocky吃了酒1
pocky吃了包子2
mac吃了酒2
相關文章
相關標籤/搜索