七. 併發編程 (進程隊列)

一. 進程間通訊——隊列(multiprocess.Queue)編程

1 .隊列概念介紹安全

建立共享的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 

Queue([maxsize]) 
建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。

2. 隊列方法介紹多線程

Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使用管道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True. 若是設置爲False,將引起Queue.Empty異常
(定義在Queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。若是設置爲False,將引起Queue.Empty異常
(定義在Queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起Queue.Full異常。 q.qsize() 返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。
在某些系統上,此方法可能引起NotImplementedError異常。 q.empty() 若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。 q.full() 若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。 q.close() 關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。
關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。 q.cancel_join_thread() 不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。 q.join_thread() 鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。
調用q.cancel_join_thread()方法能夠禁止這種行爲。
單隊列 from multiprocessing import Process, Event,Queue
aa=Queue(4)
print(aa)
aa.put(1)
aa.put(2)
aa.put(3)
aa.put(4)
# aa.put(5)   # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
           # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。

print(aa.full())  #  判斷隊列數據是否滿沒

print(aa.get())
print(aa.get())
print(aa.get())
print(aa.get())
print(aa.get())# 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。 下面不會打印
# print(1111111)

'''
multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列
都是基於消息傳遞實現的,可是隊列接口
'''
from multiprocessing import Queue
q=Queue(3)
#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)   # 若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列。
           # 若是隊列中的數據一直不被取走,程序就會永遠停在這裏。
try:
    q.put_nowait(3) # 可使用put_nowait,若是隊列滿了不會阻塞,可是會由於隊列滿了而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去,可是會丟掉這個消息。
    print('隊列已經滿了')

# 所以,咱們再放入數據以前,能夠先看一下隊列的狀態,若是已經滿了,就不繼續put了。
print(q.full()) #滿了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞。
try:
    q.get_nowait(3) # 可使用get_nowait,若是隊列滿了不會阻塞,可是會由於沒取到值而報錯。
except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下去。
    print('隊列已經空了')

print(q.empty()) #空了
進程之間隊列通訊 from multiprocessing import Process, Event,Queue
def show(q):
   q.put(1)
   q.put(2)
   q.put(3)
if __name__=="__main__":
    q=Queue()
    p1=Process(target=show,args=(q,))
    p1.start()
    print(q.get())
    print(q.get())
    print(q.get())
    
    
print(「***********************************************」)
 進程之間獲取數據 (意思在開一個進程之間生成 在開一個進程之間獲取數據) def show(q):
   q.put(1)
   q.put(2)
   q.put(3)
def run(q):
    print(q.get())
    print(q.get())
    print(q.get())
    print("1111111111111")
if __name__=="__main__":
    q=Queue(3)  # 參數表示這個隊列裏面只能方法三個數據
    p1=Process(target=show,args=(q,))
    p1.start()
    p2 = Process(target=run, args=(q,))
    p2.start()
批量生產數據放入隊列再批量獲取結果 import os
import time
import multiprocessing

# 向queue中輸入數據的函數
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.asctime())
    queue.put(info)

# 向queue中輸出數據的函數
def outputQ(queue):
    info = queue.get()
    print ('%s%s\033[32m%s\033[0m'%(str(os.getpid()), '(get):',info))

# Main
if __name__ == '__main__':
    multiprocessing.freeze_support()
    record1 = []   # store input processes  輸入存儲過程
    record2 = []   # store output processes輸入存儲過程
    queue = multiprocessing.Queue(3)
    # 輸入進程
    for i in range(5):
        process = multiprocessing.Process(target=inputQ,args=(queue,))
        process.start()
        record1.append(process)

    # 輸出進程
    for i in range(5):
        process = multiprocessing.Process(target=outputQ,args=(queue,))
        process.start()
        record2.append(process)

    print(record1)
    print(record2)

    for p in record1:
        p.join()   # 阻塞效果同步 意思就是等待裏面數據生產處理完   在批量輸出

    for p in record2:
        p.join()  # 阻塞效果同步 意思就是等待裏面數據生產處理完   在批量輸出

    print("執行完畢")


執行結果:

11148(get):10592(put):Mon Jul 15 21:24:16 2019
[<Process(Process-1, stopped)>, <Process(Process-2, started)>, <Process(Process-3, started)>, <Process(Process-4, started)>, <Process(Process-5, started)>]
[<Process(Process-6, started)>, <Process(Process-7, started)>, <Process(Process-8, started)>, <Process(Process-9, started)>, <Process(Process-10, started)>]
19304(get):5280(put):Mon Jul 15 21:24:17 2019
19880(get):1452(put):Mon Jul 15 21:24:17 2019
12272(get):2756(put):Mon Jul 15 21:24:17 2019
21108(get):16472(put):Mon Jul 15 21:24:17 2019
執行完畢併發

3. 隊列生生產者消費者模型app

生產者消費者模型
在併發編程中使用生產者和消費者模式可以解決絕大多數併發問題。該模式經過平衡生產線程和消費線程的工做能力來提升程序的總體處理數據的速度。

爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,
才能繼續生產數據。
一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。 什麼是生產者消費者模式 生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,因此生產者生產完數據以後不用等待消費者處理,
直接扔給阻塞隊列,
消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
 
 
注意
此時的問題是主進程永遠不會結束,緣由是:生產者p在生產完後就結束了,可是消費者c在取空了q以後,
則一直處於死循環中且卡在q.get()這一步。
from multiprocessing import Process,Queue
import time,random,os
def con(q):
  while True:
    res=q.get()    #  當獲取的數據大於生產的時候 會阻塞(死循環)  可是消費者c在取空了q以後
    time.sleep(random.randint(1, 3))
    print('\033[45m%s 吃 %s\033[0m' % (os.getpid(),res))

def prod(q):
    for i in range(5):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__=="__main__":
   q=Queue()
   p1= Process(target=prod, args=(q,))
   p1.start()
   p2= Process(target=con, args=(q,))
   p2.start()

   print("主進程!!!!!!!!!")
執行
主進程!!!!!!!!!
15876 生產了 包子0
15876 生產了 包子1
5572 吃 包子0
15876 生產了 包子2
5572 吃 包子1
15876 生產了 包子3
5572 吃 包子2
15876 生產了 包子4
5572 吃 包子3
5572 吃 包子4

沒結束掉進程
 
 
解決(上面阻塞就是死循環)方式無非是讓生產者在生產完畢後,
往隊列中再發一個結束信號,這樣消費者在接收到結束信號後就能夠break出死循環。
from multiprocessing import Process,Queue
import time,random,os
def con(q):
  while True:
    res=q.get()
    if res is None:break           # 添加結束條件來解決  阻塞(死循環)  就是明確生產者和消費者結束條件
    time.sleep(random.randint(1, 3))
    print('\033[45m%s 吃 %s\033[0m' % (os.getpid(),res))

def prod(q):
    for i in range(5):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

    q.put(None)  # 發送結束信號  # 添加結束條件來解決  阻塞(死循環)  就是明確生產者和消費者結束條件

if __name__=="__main__":
   q=Queue()
   p1= Process(target=prod, args=(q,))

   p2= Process(target=con, args=(q,))
   p1.start()
   p2.start()
   print("主進程!!!!!!!!!")

執行結果:
主進程!!!!!!!!!
1764 生產了 包子0
1764 生產了 包子1
1764 生產了 包子2
20952 吃 包子0
20952 吃 包子1
1764 生產了 包子3
20952 吃 包子2
1764 生產了 包子4
20952 吃 包子3
20952 吃 包子4
    進程已結束,退出代碼 0
主進程在生產者生產完畢後發送結束信號None
from multiprocessing import Process,Queue
import time,random,os
def consumer(q):
    while True:
        res=q.get()
        if res is None:break #收到結束信號則結束
        time.sleep(random.randint(1,3))
        print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res))

def producer(q):
    for i in range(6):
        time.sleep(random.randint(1,3))
        res='包子%s' %i
        q.put(res)
        print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res))

if __name__ == '__main__':
    q=Queue()
    #生產者們:即廚師們
    p1=Process(target=producer,args=(q,))

    #消費者們:即吃貨們
    c1=Process(target=consumer,args=(q,))
    #開始
    p1.start()
    c1.start()

    p1.join()    # 阻塞 就是等因此數據生產完了
    q.put(None) #發送結束信號

    print('主進程')


執行結果:
    856 生產了 包子0
    4856 生產了 包子1
    4856 生產了 包子2
    8000 吃 包子0
    4856 生產了 包子3
    8000 吃 包子1
    4856 生產了 包子4
    8000 吃 包子2
    8000 吃 包子3
    4856 生產了 包子5
    主進程
    8000 吃 包子4
    8000 吃 包子5
    進程已結束,退出代碼 0
 
 
多個消費者的例子:有幾個消費者就須要發送幾回結束信號

from
multiprocessing import Process,Queue import time,random,os def consumer(q): while True: res=q.get() if res is None:break #收到結束信號則結束 time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) def producer(name,q): for i in range(2): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) if __name__ == '__main__': q=Queue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('麪條',q)) p3=Process(target=producer,args=('饅頭',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) #開始 p1.start() p2.start() p3.start() c1.start() p1.join() #必須保證生產者所有生產完畢,才應該發送結束信號 p2.join() p3.join() q.put(None) #有幾個消費者就應該發送幾回結束信號None q.put(None) #發送結束信號 print('主進程!!!!!')

執行結果    7900 生產了 饅頭0    21464 吃 饅頭0    17084 生產了 麪條0    4680 生產了 包子0    7900 生產了 饅頭1    4680 生產了 包子1    17084 生產了 麪條1    主進程!!!!!    21464 吃 麪條0    21464 吃 包子0    21464 吃 饅頭1    21464 吃 包子1    21464 吃 麪條1    進程已結束,退出代碼 0
相關文章
相關標籤/搜索