管道可用於具備親緣關係進程間的通訊,有名管道克服了管道沒有名字的限制,所以,除具備管道所具備的功能外,它還容許無親緣關係進程間的通訊.html
先畫一幅圖幫助你們理解下管道的基本原理python
現有2個進程A和B,他們都在內存中開闢了空間,那麼咱們在內存中再開闢一個空間C,做用是鏈接這兩個進程的。對於進程來講內存空間是能夠共享的(任何一個進程均可以使用內存,內存當中的空間是用地址來標記的,咱們經過查找某一個地址就能找到這個內存)A進程能夠不斷的向C空間輸送東西,B進程能夠不斷的從C空間讀取東西,這就是進程間的通訊 .segmentfault
管道在信息傳輸上是以流的方式傳輸, 也就是你從A進程不斷的寫入,B進程源源不斷的讀出,A進程先寫入的就會被B進程先讀出,後寫進來的就會被後讀出,安全
Pipe僅僅適用於只有兩個進程一讀一寫的半雙工狀況,也就是說信息是隻向一個方向流動。單項通訊叫作半雙工,雙向叫作全雙工.markdown
單工:簡單的說就是一方只能發信息,另外一方則只能收信息,通訊是單向的。數據結構
半雙工:比單工先進一點,就是雙方都能發信息,但同一時間則只能一方發信息。app
全雙工:比半雙工再先進一點,就是雙方不只都能發信息,並且可以同時發送。dom
管道是由內核管理的一個緩衝區,至關於咱們放入內存中的一個紙條。管道的一端鏈接一個進程的輸出。這個進程會向管道中放入信息。管道的另外一端鏈接一個進程的輸入,這個進程取出被放入管道的信息。一個緩衝區不須要很大,它被設計成爲環形的數據結構,以便管道能夠被循環利用。當管道中沒有信息的話,從管道中讀取的進程會等待,直到另外一端的進程放入信息。當管道被放滿信息的時候,嘗試放入信息的進程會等待,直到另外一端的進程取出信息。當兩個進程都終結的時候,管道也自動消失。函數
管道是單向的、先進先出的、無結構的字節流,它把一個進程的輸出和另外一個進程的輸入鏈接在一塊兒。ui
寫進程在管道的尾端寫入數據,讀進程在管道的首端讀出數據。數據讀出後將從管道中移走,其它讀進程都不能再讀到這些數據。
管道提供了簡單的流控制機制。進程試圖讀一個空管道時,在數據寫入管道前,進程將一直阻塞。一樣,管道已經滿時,進程再試圖寫管道,在其它進程從管道中讀走數據以前,寫進程將一直阻塞。
只能用於具備親緣關係的進程之間的通訊(也就是父子進程或者兄弟進程之間)。
一種半雙工的通訊模式,具備固定的讀端和寫端。
LINUX把管道看做是一種文件,採用文件管理的方法對管道進行管理,對於它的讀寫也可使用普通的read()和write()等函數。可是它不是普通的文件,並不屬於其餘任何文件系統,只存在於內核的內存空間中。
#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹:
dumplex:默認管道是半雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
#其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。
例子
# 主進程寫,子進程讀 from multiprocessing import Pipe,Process def func(out_pipe, in_pipe): in_pipe.close() # 關閉複製過來的管道的輸入端 while True: try : msg = out_pipe.recv() #子進程的管道端口接收主進程的數據 print(msg) except EOFError: out_pipe.close() break if __name__ == '__main__': out_pipe, in_pipe = Pipe() Process(target=func,args = (out_pipe, in_pipe)).start() #啓動子進程 out_pipe.close() #關閉主進程的輸出管道端口 for i in range(20): in_pipe.send('hello world!') #經過管道的端口向子進程寫入 in_pipe.close()
例子2
# 出現EOF錯誤的狀況 # 當pipe的輸入端被關閉,且沒法接收到輸入的值,那麼就會拋出EOFError。 from multiprocessing import Pipe, Process def func(out_pipe, in_pipe): in_pipe.close() # 關閉複製過來的管道的輸入端 while True: msg = out_pipe.recv() # 子進程的管道端口接收主進程的數據 print(msg) if __name__ == '__main__': out_pipe, in_pipe = Pipe() Process(target=func, args=(out_pipe, in_pipe)).start() # 啓動子進程 out_pipe.close() # 關閉主進程的輸出管道端口 for i in range(20): in_pipe.send('hello world!') # 經過管道的端口向子進程寫入 in_pipe.close()
from multiprocessing import Process,Pipe import time,random def consumer(p,name): in_pipe,out_pipe=p out_pipe.close() while True: try: # time.sleep(random.uniform(0,1)) baozi=in_pipe.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: in_pipe.close() break def producer(p,name): in_pipe,out_pipe=p in_pipe.close() for i in range(10): # print(i) str ='%s生產的包子%s'%(name,i) out_pipe.send(str) # time.sleep(1) else: out_pipe.close() if __name__ == '__main__': in_pipe,out_pipe=Pipe() p = Process(target=producer,args=((in_pipe,out_pipe),'jack')) c1=Process(target=consumer,args=((in_pipe,out_pipe),'c1')) c2=Process(target=consumer,args=((in_pipe,out_pipe),'c2')) c1.start() c2.start() p.start() in_pipe.close() out_pipe.close() c1.join() c2.join() print('主進程') # 基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的) ## 加鎖來控制操做管道的行爲,來避免進程之間爭搶數據形成的數據不安全現象
這裏須要加鎖來解決數據不安全的狀況
from multiprocessing import Process,Pipe,Lock def consumer(produce, consume,name,lock): produce.close() while True: lock.acquire() baozi=consume.recv() lock.release() if baozi: print('%s 收到包子:%s' %(name,baozi)) else: consume.close() break def producer(produce, consume,n): consume.close() for i in range(n): produce.send(i) produce.send(None) produce.send(None) produce.close() if __name__ == '__main__': produce,consume=Pipe() lock = Lock() c1=Process(target=consumer,args=(produce,consume,'c1',lock)) c2=Process(target=consumer,args=(produce,consume,'c2',lock)) p1=Process(target=producer,args=(produce,consume,30)) c1.start() c2.start() p1.start() produce.close() consume.close()
使用Manager能夠方便的進行多進程數據共享,事實上Manager的功能遠不止於此 。Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
但與管道相似,這裏的數據也是不安全的。須要用鎖來解決。
from multiprocessing import Manager,Process def main(dic): dic['count'] -= 1 # print(dic) if __name__ == '__main__': m = Manager()#爲這個manager類註冊存儲容器,也就是經過這個manager類實現的共享的變量 dic=m.dict({'count':100}) p_lst = [] for i in range(50): p = Process(target=main, args=(dic,)) p_lst.append(p) p.start() for p in p_lst: p.join() print("主進程",dic['count'])
分析:多運行幾回能夠看到,每次輸出的結果都基本是不一樣的,所以這裏仍是須要用鎖來解決。
from multiprocessing import Manager,Process,Lock def main(dic,lock): # with lock:能夠這樣寫,也能夠寫成下面的樣子 lock.acquire() dic['count'] -= 1 lock.release() if __name__ == '__main__': m = Manager() l = Lock() dic=m.dict({'count':100}) p_lst = [] for i in range(50): p = Process(target=main,args=(dic,l)) p.start() p_lst.append(p) for i in p_lst: i.join() print('主進程',dic)
[1]http://www.javashuo.com/article/p-xgtlqpws-gh.html
[2]http://www.th7.cn/system/lin/201605/165994.shtml
[3]https://blog.csdn.net/weixin_39859512/article/details/80898340