進程間通訊(隊列、管道)、消費者模型和進程池(apply,apply_async,map)

複製代碼
1、隊列(先進先出)
進程間通訊:IPC(Inter-Process Communication)

隊列是使用管道和鎖定實現,因此Queue是多進程安全的隊列,使用Queue能夠實現多進程之間的數據傳遞。 


1、Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。

Queue的實例q具備如下方法:
q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲,默認爲True阻塞. 若是設置爲False,拋出queue.Empty異常(定義在queue模塊中)。timeout是可選超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起queue.Empty異常。

q.get_nowait( ) 
至關於q.get(False)方法,沒有取到值也不會阻塞,直接拋出queue.Empty異常。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True阻塞。若是設置爲False,將引起queue.Full異常(定義在queue庫模塊中)。timeout指定在阻塞模式中等待可用空間的時間長短。超時後將引起queue.Full異常。

q.put_nowait()
至關於q.put(item,False)方法,隊列已滿,不會阻塞,直接拋出queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。

q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。


其餘方法(不經常使用):
q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者中的隊列不會致使get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。

q.join_thread() 
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方法由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。



2、方法使用例子:
from multiprocessing import Queue
import queue

(1)
q = Queue(2)
q.put(1)
q.put(2)

print(q.get())   # 1
print(q.get())   # 2


(2)
q = Queue(2)
q.put(1)
q.put(2)
q.put(3)   # 一直阻塞在這裏

print(q.get()) 
print(q.get()) 


(3)
q = Queue(2)
q.put(1)
q.put(2)

print(q.get())   # 1
print(q.get())   # 2
print(q.get())   # 一直阻塞在這裏

(4)
q = Queue(2)
q.put(1)
q.put(2)
q.put_nowait(3)   # 異常:queue.Full
# q.put(3,False)  # 異常:queue.Full
# q.put(3,timeout=2)  # 2秒後異常:queue.Full


(5)
q = Queue(2)
q.put(1)
q.put(2)

print(q.get())  # 1
print(q.get())  # 2
print(q.get_nowait())   # 異常:queue.Empty
# print(q.get(False))   # 異常:queue.Empty
# print(q.get(timeout=2))   # 2秒後異常:queue.Empty


3、小例子
from multiprocessing import Process,Queue

def consume(q):
    print('子進程:',q.get())
    q.put('hi')
    
if __name__ == '__main__':
    q = Queue()
    p = Process(target=consume,args=(q,))
    p.start()
    q.put('hello')
    p.join()
    print('主進程:',q.get())



4、生產者消費者模型
場景:
爲何要使用生產者和消費者模式
在線程世界裏,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發當中,若是生產者處理速度很快,而消費者處理速度很慢,
那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這個問題因而引入了生產者和消費者模式。

什麼是生產者消費者模式
生產者消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而經過阻塞隊列來進行通信,
因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。


import time
import random
from multiprocessing import Process,Queue

def consumer(q,name):
    # 消費者
    while True:
        food = q.get()  # 在隊列中取值
        if food is None:break
        time.sleep(random.uniform(0.3,1))  # 模擬吃消耗的時間
        print('%s偷吃了%s,快打死他' %(name,food))

def producter(q,name,food):
    # 生產者
    for i in range(10):
        time.sleep(random.uniform(0.5,0.9))  # 模擬生產時間
        print('%s生產了%s,序號:%s' %(name,food,i))
        q.put(food+str(i))  # 把值存入隊列中

if __name__ == '__main__':
    q = Queue()  # Queue隊列對象
    c1 = Process(target=consumer,args=(q,'小明'))
    c2 = Process(target=consumer,args=(q,'小東'))
    c1.start()
    c2.start()

    p1 = Process(target=producter,args=(q,'張三','麪包'))
    p2 = Process(target=producter,args=(q,'李四','可樂'))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

    q.put(None) # 有幾個consumer進程就須要放幾個None,表示生產完畢(這就有點low了)
    q.put(None)


    
2、JoinableQueue
JoinableQueue和Queue幾乎同樣,不一樣的是JoinableQueue隊列容許使用者告訴隊列某個數據已經處理了。通知進程是使用共享的信號和條件變量來實現的。 
task_done():使用者使用此方法發出信號,表示q.get()返回的項目已經被處理
join():當隊列中有數據的時候,使用此方法會進入阻塞,直到放入隊列中全部的數據都被處理掉才轉換成不阻塞(每處理一個數據就使用一次taskdone)

解決剛纔生產者消費者模型low的問題:
import time
import random
from multiprocessing import Process,JoinableQueue

def consumer(jq,name):
    # 消費者
    while True:
        food = jq.get()  # 在隊列中取值
        # if food is None:break
        time.sleep(random.uniform(0.3,1))  # 模擬吃消耗的時間
        print('%s偷吃了%s,快打死他' %(name,food))
        jq.task_done() # 向jq.join()發送一次信號,證實這個數據已經處理了

def producter(jq,name,food):
    # 生產者
    for i in range(10):
        time.sleep(random.uniform(0.5,0.9))  # 模擬生產時間
        print('%s生產了%s,序號:%s' %(name,food,i))
        jq.put(food+str(i))  # 把值存入隊列中


if __name__ == '__main__':
    jq = JoinableQueue()
    c1 = Process(target=consumer,args=(jq,'小明'))
    c2 = Process(target=consumer,args=(jq,'小東'))

    c1.daemon = True  # 把消費者設置爲守護進程
    c2.daemon = True
    
    c1.start()
    c2.start()

    p1 = Process(target=producter,args=(jq,'張三','麪包'))
    p2 = Process(target=producter,args=(jq,'李四','可樂'))
    p1.start()
    p2.start()

    p1.join()
    p2.join()
    
    jq.join()   # 數據所有被task_done後纔不阻塞
   





3、管道
#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(left,right),其中left,right表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道

#參數介紹:
duplex:默認管道是全雙工的,若是將duplex改爲False,left只能用於接收,right只能用於發送。


#主要方法:
    right.recv():接收left.send()發送的內容。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
    letf.send():經過鏈接發送內容。
 
 
#其餘方法:
close():關閉鏈接。
fileno():返回鏈接使用的整數文件描述符
poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout設置成None,操做將無限期地等待數據到達。
 
recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    
 
recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。


注意:
# 隊列是基於管道實現的,使用管道 + 鎖的模式造成的IPC方式,使得進程之間數據安全
# 管道是基於socket實現的,使用socket + pickle 的模式造成的IPC方式,使得進程之間數據是不安全的且存取數據複雜(同一時間不一樣進程對同一個數據進行處理,形成數據的不安全)


例子:(左邊發送,右邊接收)
from multiprocessing import Process,Pipe

def test(right):
    print(right.recv())
    right.close()

if __name__ == '__main__':
    left,right = Pipe()
    p = Process(target=test,args=(right,))
    p.start()
    left.send('hello')

    
(右邊發送,左邊接收)
from multiprocessing import Process,Pipe

def test(left):
    left.send('hi')
    left.close()

if __name__ == '__main__':
    left,right = Pipe()
    p = Process(target=test,args=(left,))
    p.start()
    print(right.recv())
    

# pipe的端口管理不會隨着某一個進程的關閉就關閉
# 操做系統來管理進程對這些端口的使用,不使用的端口應該關閉它
# 一條管道,兩個進程,就有4個端口  每關閉一個端口計數-1,直到只剩下一個端口的時候 recv就會報錯
# 若是不關閉不使用的端口,在已經把數據發送完畢的狀況下,那麼接收端的recv就會一直掛起,等待接收數據,這個進程就一直不能關閉
# 所以不使用的端口就應該關閉它,讓recv拋出異常後對這個進程進行處理

from multiprocessing import Process,Pipe
def consumer(left,right):
    left.close()  # 若這裏不close,則不會異常EOFError,數據接收完畢後,下面的right.recv()就會一直掛起
    while True:
        try:
            print(right.recv())
        except EOFError:
            break

if __name__ == '__main__':
    left,right = Pipe()
    Process(target=consumer,args=(left,right)).start()
    right.close()
    for i in range(10):
        left.send('Apple%s'%i)
    left.close()




 4、進程池(不能傳隊列做爲子進程的參數,只能傳管道) 1、爲何要有進程池?
在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬個任務須要被執行的時候,咱們就須要去建立成千上萬個進程麼?
首先,建立進程須要消耗時間,銷燬進程也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者結束進程。


2、進程池的概念
定義一個池子,在裏面放上固定數量的進程,有需求來了,就拿一個池中的進程來處理任務,等處處理完畢,進程並不關閉,而是將進程再放回進程池中繼續等待任務。
若是有不少任務須要執行,池中的進程數量不夠,任務就要等待以前的進程執行任務完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程在運行。
這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果。


# 信號量
    # 500件衣服          有500個任務
    # 500我的           建立了500個進程
    # 只有4臺機器        可是隻有4個CPU
    # 輪流去作,一我的作完了就走,機器留給另外一我的作

# 多進程
    # 500件衣服          有500個任務
    # 500我的           建立了500個進程
    # 搶4臺機器          可是隻有4個CPU
    # 你們搶着機器去作,搶到的就作

# 進程池
    # 500件衣服          有500個任務
    # 4我的             建立了4個進程
    # 4臺機器           有4個CUP
    # 4我的拿着機器作,作完一個繼續作下一個,直到500個作完
    

    
3、建立進程池
p = Pool([numprocess  [,initializer [, initargs]]])

參數介紹:
numprocess:要建立的進程數,若是省略,將默認使用os.cpu_count()的值,即你電腦CPU的個數
initializer:是每一個工做進程啓動時要執行的可調用對象,默認爲None
initargs:是要傳給initializer的參數組


主要方法:
同步提交 p.apply(func [, args [, kwargs]])
    返回值 : 子進程對應函數的返回值(子進程return的返回值)
    一個一個順序執行的,並無任何併發效果

異步提交 p.apply_async(func [, args [, kwargs]])
    沒有返回值,要想全部任務可以順利的執行完畢,須要執行:
        p.close()
        p.join()  # 必須先close再join,阻塞直到進程池中的全部任務都執行完畢
    有返回值(res接收返回值)的狀況下
        res.get()  # get不能在提交任務以後馬上執行,應該是先提交全部的任務再經過get獲取結果

p.map()
    異步提交的簡化版本
    自帶close和join方法

p.close():關閉進程池,防止進一步操做。若是全部操做持續掛起,它們將在工做進程終止前完成

p.jion():等待全部工做進程退出。此方法只能在close()或teminate()以後調用



其餘方法(不經常使用)
方法apply_async()和map_async()的返回值是AsyncResul的實例obj。實例具備如下方法
obj.get():返回結果,若是有必要則等待結果到達。timeout是可選的。若是在指定時間內尚未到達,將引起一場。若是遠程操做中引起了異常,它將在調用此方法時再次被引起。
obj.ready():若是調用完成,返回True
obj.successful():若是調用完成且沒有引起異常,返回True,若是在結果就緒以前調用此方法,引起異常
obj.wait([timeout]):等待結果變爲可用。
obj.terminate():當即終止全部工做進程,同時不執行任何清理或結束任何掛起工做。若是p被垃圾回收,將自動調用此函數



4、例子
4-1、apply
import os
import time
from multiprocessing import Pool

def test(num):
    time.sleep(1)
    print('%s:%s' %(num,os.getpid()))
    return num*2

if __name__ == '__main__':
    p = Pool()
    for i in range(20):
        res = p.apply(test,args=(i,)) # 提交任務的方法 同步提交
        print('-->',res)  # res就是test的return的值



        
4-2、apply_async
import time
from multiprocessing import Pool

def func(num):
    time.sleep(1)
    print('作了%s件衣服'%num)

if __name__ == '__main__':
    p = Pool(4)  # 進程池中建立4個進程,不寫的話,默認值爲你電腦的CUP數量
    for i in range(50):
        p.apply_async(func,args=(i,)) # 異步提交func到一個子進程中執行,沒有返回值的狀況
    p.close() # 關閉進程池,用戶不能再向這個池中提交任務了
    p.join()  # 阻塞,直到進程池中全部的任務都被執行完

注意:
異步提交且沒有返回值接收的狀況下必需要用close()和join()
由於若是沒有close()和join(),主進程執行完畢後會馬上把子進程回收了,至關於子進程還沒來得及開啓
因此要join,讓子進程結束後再結束父進程,可是進程池中要使用join就必須先進行close
    

import time
import os
from multiprocessing import Pool

def test(num):
    time.sleep(1)
    print('%s:%s' %(num,os.getpid()))
    return num*2

if __name__ == '__main__':
    p = Pool()
    res_lst = []
    for i in range(20):
        res = p.apply_async(test,args=(i,))   # 提交任務的方法 異步提交
        res_lst.append(res)
    for res in res_lst:
        print(res.get())

注意:
異步提交有返回值的狀況下,res是一個對象表明的是這個任務的編號,須要用res.get()方法讓任務執行且把返回值返回給你。
get有阻塞效果,拿到子進程的返回值後纔不阻塞,因此並不須要再使用close和join。



4-3、map
import time
import os
from multiprocessing import Pool

def test(num):
    time.sleep(1)
    print('%s:%s'%(num,os.getpid()))
    return num*2

if __name__ == '__main__':
    p = Pool()
    p.map(test,range(20))

注意:
map接收一個函數和一個可迭代對象,是異步提交的簡化版本,自帶close和join方法
可迭代對象的每個值就是函數接收的實參,可迭代對象的長度就是建立的任務數量
map能夠直接拿到返回值的可迭代對象(列表),循環就能夠獲取返回值
複製代碼
import time
from multiprocessing import Pool

def func(num):
    print('子進程:',num)
    # time.sleep(1)
    return num

if __name__ == '__main__':
    p = Pool()
    ret = p.map(func,range(10))   # ret是列表 for i in ret:
        print('返回值:',i)
複製代碼
 
  

 

 
複製代碼
相關文章
相關標籤/搜索