上一篇文章: Python進程專題4:進程池Pool
下一篇文章: Python進程專題6:共享數據與同步
multiprocessing模塊支持的進程間通訊主要有兩種:管道和隊列。通常來講,發送較少的大對象比發送大量的小對象要好。segmentfault
底層使用管道和鎖,同時運行支持線程講隊列中的數據傳輸到底層管道中,來實習進程間通訊。spa
Queue([maxsize]) 建立共享隊列。使用multiprocessing模塊的Queue實現多進程之間的數據傳遞。Queue自己是一個消息隊列, maxsize是隊列運行的最大項數,若是不指定,則不限制大小。
q.close():關閉隊列,再也不向隊列中添加數據,那些已經進入隊列的數據會繼續處理。q被回收時將自動調用此方法。 q.empty():若是調用此方法時,隊列爲null返回True,單因爲此時其餘進程或者線程正在添加或刪除項, 因此結果不可靠,並且有些平臺運行該方法會直接報錯,個人mac系統運行該方法,直接報錯。 q.full():若是調用此方法時,隊列已滿,返回True,同q.empty()方法,結果不可靠。 q.get([block,timeout]):返回q中的一個項,block若是設置爲True,若是q隊列爲空,該方法會阻塞(就是不往下運行了,處於等待狀態), 直到隊列中有項可用爲止,若是同時頁設置了timeout,那麼在改時間間隔內,都沒有等到有用的項,就會引起Queue.Empty異常。 若是block設置爲false,timeout沒有意義,若是隊列爲空,將引起Queue.Empt異常。 q.get_nowait():等同於q.get(False) q.put(item,block,timeout):將item放入隊列,若是此時隊列已滿: 若是block=True,timeout沒有設置,就會阻塞,直到有可用空間爲止。 若是block=True,timeout也設置,就會阻塞到timeout,超過這個時間會報Queue.Full異常。 若是block=False,timeout設置無效,直接報Queue.Full異常。 q.put_nowait(item):等同於q.put(item,False) q.qsize():返回當前隊列項的數量,結果不可靠,並且mac會直接報錯:NotImplementedError。
實例:線程
#驗證:put方法會阻塞 from multiprocessing import Queue queue=Queue(3)#初始化一個Queue隊列,能夠接受3個消息 queue.put("我是第1條信息") queue.put("我是第2條信息") queue.put("我是第3條信息") print("插入第4條信息以前") queue.put("我是第4條信息") print("插入第4條信息以後")
效果:程序會一直阻塞,最後一句輸永遠也不會輸出。code
代碼:server
#closse方法、get方法、put方法簡單使用:多進程訪問同一個Queue from multiprocessing import Queue,Process import time,os #參數q就是Queue實例 def mark(q,interval): time.sleep(interval) # 打印信息 print("進程%d取出數據:"%os.getpid()+queue.get(True)) if __name__=="__main__": queue = Queue(3) # 初始化一個Queue隊列,能夠接受3個消息 queue.put("我是第1條信息") queue.put("我是第2條信息") queue.put("我是第3條信息") p1=Process(target=mark,args=(queue,1)) p2=Process(target=mark,args=(queue,2)) p3=Process(target=mark,args=(queue,3)) p1.start() p2.start() p3.start() # 關閉隊列,再也不插入信息 queue.close() # 下面插入會致使異常 # queue.put("我是第4條信息") # 打印第1條信息 print("程序語句執行完成")
效果對象
建立可鏈接的共享進程隊列,能夠看作仍是一個Queue,只不過這個Queue除了Queue特有功能外,容許項的消費者通知項的生產者,項已經處理成功。該通知進程時使用共享的信號和條件變量來實現的。blog
JoinableQueue實例除了與Queue對象相同的方法外,還具備下列方法:接口
q.task_done():消費者使用此方法發送信號,表示q.get()返回的項已經被處理。 注意⚠️:若是調用此方法的次數大於隊列中刪除的項的數量,將引起ValueError異常。 q.join():生產者使用此方法進行阻塞,直到隊列中全部的項都被處理完成,即阻塞將持續到隊列中的每一項均調用q.task_done()方法爲止。
代碼實例:隊列
#利用JoinableQueue實現生產者與消費者,而且加入了哨兵,來監聽生產者的要求 from multiprocessing import JoinableQueue,Process import time #參數q爲JoinableQueue隊列實例 def mark(q): #循環接受信息,一直運行,這也下面爲何要將它設爲後臺進程的緣由,必須保證當主線程退出時,它能夠退出 while True: value = q.get() print(value) # 實際開發過程當中,此處通常用來進行有用的處理 # 消費者發送信號:任務完成(此處實例的任務就是打印一下下) q.task_done() #我來方便看出效果,特地停留1s time.sleep(1) #使用哨兵,監聽生產者的消息,此處經過判斷value是否爲None來判斷傳遞的消息 if value==None: #執行哨兵計劃後,後面的語句都不會輸出 break if __name__=="__main__": #實例化JoinableQueue q=JoinableQueue() #定義消費者進程 p=Process(target=mark,args=(q,)) #將消費者線程設置爲後臺進程,隨建立它的進程(此處是主進程)的終止而終止 #也就是當它的建立進程(此處是主現場)意外退出時,它也會跟隨一塊兒退出。 #而且後臺進程沒法建立新的進程 p.daemon=True #啓動消費者進程 p.start() #模擬生產者,生產多個項 for xx in range(5): print(xx) #當xx==3時去執行哨兵計劃 if xx==3: print("我用哨兵計劃了") q.put(None) print("哨兵計完美執行") q.put("第%d條消息"%xx) #等待全部項都處理完成再退出,因爲使用了哨兵計劃,隊列沒有徹底執行,因此會一直卡在這個位置 q.join() print("程序真正退出了")
效果:進程
除了使用隊列來進行進程間通訊,還可使用管道來進行消息傳遞。
(connection1,connection2)=Pipe([duplex]) 在進程間建立一條管道,並返回元祖(connection1,connection2),其中connection一、connection2表示兩端的Connection對象。 默認狀況下,duplex=True,此時管道是雙向的,若是設置duplex=false,connection1只能用於接收,connection2只能用於發送。 注意:必須在多進程建立以前建立管道。
connection.close() :關閉鏈接,當connection被垃圾回收時,默認會調用該方法。 connection.fileno() :返回鏈接使用的整數文件描述符 connection.poll([timeout]):若是鏈接上的數據可用,返回True,timeout爲等待的最長時間,若是不指定,該方法將馬上返回結果。 若是指定爲None,該方法將會無限等待直到數據到達。 connection.send(obj):經過鏈接發送對象,obj是與序列號兼容的任意對象。 connection.send_bytes(buffer[,offset,size]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區的任何對象。 offset是緩衝區的字節偏移量,而size是要發送的字節數。 connection.recv():接收connection.send()方法返回的對象。若是鏈接的另外一端已經關閉,不再會存在任何數據, 該方法將引發EOFError異常。 connection.recv_bytes([maxlength]):接收connection.send_bytes()方法發送的一條完整字節信息,maxlength爲能夠接受的 最大字節數。若是消息超過這個最大數,將引起IOError異常,而且在鏈接上沒法進一步讀取。若是鏈接的另外一端已經關閉, 不再會有任何數據,該方法將引起EOFError異常。 connection.recv_bytes_into(buffer[,offset]):接收一條完整的字節信息,兵把它保存在buffer對象中, 該對象支持可寫入的緩衝區接口(就是bytearray對象或相似對象)。 offset指定緩衝區放置消息的字節偏移量。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
示意圖:
代碼:
#理解管道的生產者與消費者 from multiprocessing import Pipe, Process import time def mark(pipe): #接受參數 output_p, input_p = pipe print("mark方法內部調用input_p.close()") #消費者(子進程)此實例只接收,因此把輸入關閉 input_p.close() while True: try: item = output_p.recv() except EOFError: print("報錯了") break print(item) time.sleep(1) print("mark執行完成") if __name__ == "__main__": #必須在多進程建立以前,建立管道,該管道是雙向的 (output_p, input_p) = Pipe()#建立管道 #建立一個進程,並把管道兩端都做爲參數傳遞過去 p = Process(target=mark, args=((output_p, input_p),)) #啓動進程 p.start() #生產者(主進程)此實例只輸入,因此關閉輸出(接收端) output_p.close() for item in list(range(5)): input_p.send(item) print("主方法內部調用input_p.close()()") #關閉生產者(主進程)的輸入端 input_p.close()
效果圖:
代碼:
#利用管道實現多進程協做:子線程計算結果,返回給主線程 from multiprocessing import Pipe, Process def mark(pipe): #接受參數 server_p, client_p = pipe #消費者(子進程)此實例只接收,因此把輸入關閉 client_p.close() while True: try: x,y = server_p.recv() except EOFError: print("報錯了") break result=x+y server_p.send(result) print("mark執行完成") if __name__ == "__main__": #必須在多進程建立以前,建立管道,該管道是雙向的 (server_p, client_p) = Pipe()#建立管道 #建立一個進程,並把管道兩端都做爲參數傳遞過去 p = Process(target=mark, args=((server_p, client_p),)) #啓動進程 p.start() #生產者(主進程)此實例只輸入,因此關閉輸出(接收端) server_p.close() #發送數據 client_p.send((4,5)) #打印接受到的數據 print(client_p.recv()) client_p.send(("Mark", "大帥哥")) # 打印接受到的數據 print(client_p.recv()) #關閉生產者(主進程)的輸入端 client_p.close()
結果:
9 Mark大帥哥 報錯了 mark執行完成