Python進程專題5:進程間通訊

上一篇文章: Python進程專題4:進程池Pool
下一篇文章: Python進程專題6:共享數據與同步

multiprocessing模塊支持的進程間通訊主要有兩種:管道和隊列。通常來講,發送較少的大對象比發送大量的小對象要好。segmentfault

Queue隊列

底層使用管道和鎖,同時運行支持線程講隊列中的數據傳輸到底層管道中,來實習進程間通訊。spa

  • 語法:
Queue([maxsize])
建立共享隊列。使用multiprocessing模塊的Queue實現多進程之間的數據傳遞。Queue自己是一個消息隊列,
maxsize是隊列運行的最大項數,若是不指定,則不限制大小。
  • 經常使用方法
q.close():關閉隊列,再也不向隊列中添加數據,那些已經進入隊列的數據會繼續處理。q被回收時將自動調用此方法。

q.empty():若是調用此方法時,隊列爲null返回True,單因爲此時其餘進程或者線程正在添加或刪除項,
因此結果不可靠,並且有些平臺運行該方法會直接報錯,個人mac系統運行該方法,直接報錯。

q.full():若是調用此方法時,隊列已滿,返回True,同q.empty()方法,結果不可靠。

q.get([block,timeout]):返回q中的一個項,block若是設置爲True,若是q隊列爲空,該方法會阻塞(就是不往下運行了,處於等待狀態),
直到隊列中有項可用爲止,若是同時頁設置了timeout,那麼在改時間間隔內,都沒有等到有用的項,就會引起Queue.Empty異常。
若是block設置爲false,timeout沒有意義,若是隊列爲空,將引起Queue.Empt異常。

q.get_nowait():等同於q.get(False)

q.put(item,block,timeout):將item放入隊列,若是此時隊列已滿:
若是block=True,timeout沒有設置,就會阻塞,直到有可用空間爲止。
若是block=True,timeout也設置,就會阻塞到timeout,超過這個時間會報Queue.Full異常。
若是block=False,timeout設置無效,直接報Queue.Full異常。

q.put_nowait(item):等同於q.put(item,False)

q.qsize():返回當前隊列項的數量,結果不可靠,並且mac會直接報錯:NotImplementedError。
  • 實例1:驗證:put方法會阻塞

實例:線程

#驗證:put方法會阻塞
from multiprocessing import Queue
queue=Queue(3)#初始化一個Queue隊列,能夠接受3個消息
queue.put("我是第1條信息")
queue.put("我是第2條信息")
queue.put("我是第3條信息")

print("插入第4條信息以前")
queue.put("我是第4條信息")
print("插入第4條信息以後")

效果:程序會一直阻塞,最後一句輸永遠也不會輸出。code

圖片描述

  • 實例2:closse方法、get方法、put方法簡單使用:多進程訪問同一個Queue

代碼:server

#closse方法、get方法、put方法簡單使用:多進程訪問同一個Queue
from multiprocessing import Queue,Process
import time,os
#參數q就是Queue實例
def mark(q,interval):
    time.sleep(interval)
    # 打印信息
    print("進程%d取出數據:"%os.getpid()+queue.get(True))


if __name__=="__main__":
    queue = Queue(3)  # 初始化一個Queue隊列,能夠接受3個消息
    queue.put("我是第1條信息")
    queue.put("我是第2條信息")
    queue.put("我是第3條信息")

    p1=Process(target=mark,args=(queue,1))
    p2=Process(target=mark,args=(queue,2))
    p3=Process(target=mark,args=(queue,3))

    p1.start()
    p2.start()
    p3.start()
    # 關閉隊列,再也不插入信息
    queue.close()
    # 下面插入會致使異常
    # queue.put("我是第4條信息")

    # 打印第1條信息
    print("程序語句執行完成")

效果對象

圖片描述

JoinableQueue隊列

建立可鏈接的共享進程隊列,能夠看作仍是一個Queue,只不過這個Queue除了Queue特有功能外,容許項的消費者通知項的生產者,項已經處理成功。該通知進程時使用共享的信號和條件變量來實現的。blog

JoinableQueue實例除了與Queue對象相同的方法外,還具備下列方法:接口

q.task_done():消費者使用此方法發送信號,表示q.get()返回的項已經被處理。
注意⚠️:若是調用此方法的次數大於隊列中刪除的項的數量,將引起ValueError異常。

q.join():生產者使用此方法進行阻塞,直到隊列中全部的項都被處理完成,即阻塞將持續到隊列中的每一項均調用q.task_done()方法爲止。

代碼實例:隊列

#利用JoinableQueue實現生產者與消費者,而且加入了哨兵,來監聽生產者的要求
from multiprocessing import JoinableQueue,Process
import time

#參數q爲JoinableQueue隊列實例
def mark(q):
    #循環接受信息,一直運行,這也下面爲何要將它設爲後臺進程的緣由,必須保證當主線程退出時,它能夠退出
    while True:
        value = q.get()
        print(value)  # 實際開發過程當中,此處通常用來進行有用的處理
        # 消費者發送信號:任務完成(此處實例的任務就是打印一下下)
        q.task_done()
        #我來方便看出效果,特地停留1s
        time.sleep(1)

        #使用哨兵,監聽生產者的消息,此處經過判斷value是否爲None來判斷傳遞的消息
        if value==None:
            #執行哨兵計劃後,後面的語句都不會輸出
            break




if __name__=="__main__":
    #實例化JoinableQueue
    q=JoinableQueue()

    #定義消費者進程
    p=Process(target=mark,args=(q,))
    #將消費者線程設置爲後臺進程,隨建立它的進程(此處是主進程)的終止而終止
    #也就是當它的建立進程(此處是主現場)意外退出時,它也會跟隨一塊兒退出。
    #而且後臺進程沒法建立新的進程
    p.daemon=True

    #啓動消費者進程
    p.start()

    #模擬生產者,生產多個項
    for xx in range(5):
        print(xx)
        #當xx==3時去執行哨兵計劃
        if xx==3:
            print("我用哨兵計劃了")
            q.put(None)
            print("哨兵計完美執行")
        q.put("第%d條消息"%xx)


    #等待全部項都處理完成再退出,因爲使用了哨兵計劃,隊列沒有徹底執行,因此會一直卡在這個位置
    q.join()

    print("程序真正退出了")

效果:進程

圖片描述

管道

除了使用隊列來進行進程間通訊,還可使用管道來進行消息傳遞。

  • 語法:
(connection1,connection2)=Pipe([duplex])
在進程間建立一條管道,並返回元祖(connection1,connection2),其中connection一、connection2表示兩端的Connection對象。
默認狀況下,duplex=True,此時管道是雙向的,若是設置duplex=false,connection1只能用於接收,connection2只能用於發送。
注意:必須在多進程建立以前建立管道。
  • 經常使用方法:
connection.close() :關閉鏈接,當connection被垃圾回收時,默認會調用該方法。

connection.fileno() :返回鏈接使用的整數文件描述符

connection.poll([timeout]):若是鏈接上的數據可用,返回True,timeout爲等待的最長時間,若是不指定,該方法將馬上返回結果。
若是指定爲None,該方法將會無限等待直到數據到達。

connection.send(obj):經過鏈接發送對象,obj是與序列號兼容的任意對象。

connection.send_bytes(buffer[,offset,size]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區的任何對象。
offset是緩衝區的字節偏移量,而size是要發送的字節數。

connection.recv():接收connection.send()方法返回的對象。若是鏈接的另外一端已經關閉,不再會存在任何數據,
該方法將引發EOFError異常。

connection.recv_bytes([maxlength]):接收connection.send_bytes()方法發送的一條完整字節信息,maxlength爲能夠接受的
最大字節數。若是消息超過這個最大數,將引起IOError異常,而且在鏈接上沒法進一步讀取。若是鏈接的另外一端已經關閉,
不再會有任何數據,該方法將引起EOFError異常。

connection.recv_bytes_into(buffer[,offset]):接收一條完整的字節信息,兵把它保存在buffer對象中,
該對象支持可寫入的緩衝區接口(就是bytearray對象或相似對象)。
offset指定緩衝區放置消息的字節偏移量。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
  • 實例1:理解管道的生產者與消費者

示意圖:

圖片描述

代碼:

#理解管道的生產者與消費者
from multiprocessing import Pipe, Process
import time

def mark(pipe):
    #接受參數
    output_p, input_p = pipe
    print("mark方法內部調用input_p.close()")
    #消費者(子進程)此實例只接收,因此把輸入關閉
    input_p.close()

    while True:
        try:
            item = output_p.recv()
        except EOFError:
            print("報錯了")
            break
        print(item)
        time.sleep(1)
    print("mark執行完成")


if __name__ == "__main__":
    #必須在多進程建立以前,建立管道,該管道是雙向的
    (output_p, input_p) = Pipe()#建立管道
    #建立一個進程,並把管道兩端都做爲參數傳遞過去
    p = Process(target=mark, args=((output_p, input_p),))
    #啓動進程
    p.start()
    #生產者(主進程)此實例只輸入,因此關閉輸出(接收端)
    output_p.close()

    for item in list(range(5)):
        input_p.send(item)
    print("主方法內部調用input_p.close()()")
    #關閉生產者(主進程)的輸入端
    input_p.close()

效果圖:

圖片描述

  • 實例2:利用管道實現多進程協做:子線程計算結果,返回給主線程

代碼:

#利用管道實現多進程協做:子線程計算結果,返回給主線程
from multiprocessing import Pipe, Process

def mark(pipe):
    #接受參數
    server_p, client_p = pipe
    #消費者(子進程)此實例只接收,因此把輸入關閉
    client_p.close()

    while True:
        try:
            x,y = server_p.recv()
        except EOFError:
            print("報錯了")
            break
        result=x+y
        server_p.send(result)
    print("mark執行完成")


if __name__ == "__main__":
    #必須在多進程建立以前,建立管道,該管道是雙向的
    (server_p, client_p) = Pipe()#建立管道
    #建立一個進程,並把管道兩端都做爲參數傳遞過去
    p = Process(target=mark, args=((server_p, client_p),))
    #啓動進程
    p.start()
    #生產者(主進程)此實例只輸入,因此關閉輸出(接收端)
    server_p.close()

    #發送數據
    client_p.send((4,5))
    #打印接受到的數據
    print(client_p.recv())

    client_p.send(("Mark", "大帥哥"))
    # 打印接受到的數據
    print(client_p.recv())





    #關閉生產者(主進程)的輸入端
    client_p.close()

結果:

9
Mark大帥哥
報錯了
mark執行完成
相關文章
相關標籤/搜索