Python之路(第三十九篇)管道、進程間數據共享Manager

 

1、管道

概念

管道可用於具備親緣關係進程間的通訊,有名管道克服了管道沒有名字的限制,所以,除具備管道所具備的功能外,它還容許無親緣關係進程間的通訊.html

先畫一幅圖幫助你們理解下管道的基本原理python

 

  現有2個進程A和B,他們都在內存中開闢了空間,那麼咱們在內存中再開闢一個空間C,做用是鏈接這兩個進程的。對於進程來講內存空間是能夠共享的(任何一個進程均可以使用內存,內存當中的空間是用地址來標記的,咱們經過查找某一個地址就能找到這個內存)A進程能夠不斷的向C空間輸送東西,B進程能夠不斷的從C空間讀取東西,這就是進程間的通訊   .segmentfault

​ 管道在信息傳輸上是以流的方式傳輸, 也就是你從A進程不斷的寫入,B進程源源不斷的讀出,A進程先寫入的就會被B進程先讀出,後寫進來的就會被後讀出,安全

Pipe僅僅適用於只有兩個進程一讀一寫的半雙工狀況,也就是說信息是隻向一個方向流動。單項通訊叫作半雙工,雙向叫作全雙工.markdown

單工:簡單的說就是一方只能發信息,另外一方則只能收信息,通訊是單向的。數據結構

半雙工:比單工先進一點,就是雙方都能發信息,但同一時間則只能一方發信息。app

全雙工:比半雙工再先進一點,就是雙方不只都能發信息,並且可以同時發送。dom

實現機制:

​ 管道是由內核管理的一個緩衝區,至關於咱們放入內存中的一個紙條。管道的一端鏈接一個進程的輸出。這個進程會向管道中放入信息。管道的另外一端鏈接一個進程的輸入,這個進程取出被放入管道的信息。一個緩衝區不須要很大,它被設計成爲環形的數據結構,以便管道能夠被循環利用。當管道中沒有信息的話,從管道中讀取的進程會等待,直到另外一端的進程放入信息。當管道被放滿信息的時候,嘗試放入信息的進程會等待,直到另外一端的進程取出信息。當兩個進程都終結的時候,管道也自動消失。函數

管道特色

管道是單向的、先進先出的、無結構的字節流,它把一個進程的輸出和另外一個進程的輸入鏈接在一塊兒。ui

  • 寫進程在管道的尾端寫入數據,讀進程在管道的首端讀出數據。數據讀出後將從管道中移走,其它讀進程都不能再讀到這些數據。

  • 管道提供了簡單的流控制機制。進程試圖讀一個空管道時,在數據寫入管道前,進程將一直阻塞。一樣,管道已經滿時,進程再試圖寫管道,在其它進程從管道中讀走數據以前,寫進程將一直阻塞。

匿名管道具備的特色:

  • 只能用於具備親緣關係的進程之間的通訊(也就是父子進程或者兄弟進程之間)。

  • 一種半雙工的通訊模式,具備固定的讀端和寫端。

  • LINUX把管道看做是一種文件,採用文件管理的方法對管道進行管理,對於它的讀寫也可使用普通的read()和write()等函數。可是它不是普通的文件,並不屬於其餘任何文件系統,只存在於內核的內存空間中。

參數介紹

  #建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道
#參數介紹:
dumplex:默認管道是半雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。
#主要方法:
  conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
  conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象
#其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。

conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收    

conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。

建立管道過程的示意圖

 

例子

  
  ​
  # 主進程寫,子進程讀
  ​
  from multiprocessing import Pipe,Process
  ​
  def func(out_pipe, in_pipe):
      in_pipe.close()
      # 關閉複製過來的管道的輸入端
      while True:
          try :
              msg = out_pipe.recv() #子進程的管道端口接收主進程的數據
              print(msg)
          except EOFError:
              out_pipe.close()
              break
  if __name__ == '__main__':
      out_pipe, in_pipe = Pipe()
      Process(target=func,args = (out_pipe, in_pipe)).start() #啓動子進程
      out_pipe.close() #關閉主進程的輸出管道端口
      for i in range(20):
          in_pipe.send('hello world!') #經過管道的端口向子進程寫入
      in_pipe.close()

  

例子2

  
  # 出現EOF錯誤的狀況
  # 當pipe的輸入端被關閉,且沒法接收到輸入的值,那麼就會拋出EOFError。
  ​
  from multiprocessing import Pipe, Process
  ​
  ​
  def func(out_pipe, in_pipe):
      in_pipe.close()
      # 關閉複製過來的管道的輸入端
      while True:
  ​
          msg = out_pipe.recv()  # 子進程的管道端口接收主進程的數據
          print(msg)
  ​
  ​
  if __name__ == '__main__':
      out_pipe, in_pipe = Pipe()
      Process(target=func, args=(out_pipe, in_pipe)).start()  # 啓動子進程
      out_pipe.close()  # 關閉主進程的輸出管道端口
      for i in range(20):
          in_pipe.send('hello world!')  # 經過管道的端口向子進程寫入
      in_pipe.close()
 

  

基於管道實現生產者消費者模型

  
  from multiprocessing import Process,Pipe
  ​
  import time,random
  ​
  ​
  def consumer(p,name):
      in_pipe,out_pipe=p
      out_pipe.close()
      while True:
          try:
              # time.sleep(random.uniform(0,1))
              baozi=in_pipe.recv()
              print('%s 收到包子:%s' %(name,baozi))
          except EOFError:
              in_pipe.close()
              break
  def producer(p,name):
      in_pipe,out_pipe=p
      in_pipe.close()
      for i in range(10):
          # print(i)
          str ='%s生產的包子%s'%(name,i)
          out_pipe.send(str)
          # time.sleep(1)
      else:
          out_pipe.close()
  if __name__ == '__main__':
      in_pipe,out_pipe=Pipe()
      p = Process(target=producer,args=((in_pipe,out_pipe),'jack'))
  ​
      c1=Process(target=consumer,args=((in_pipe,out_pipe),'c1'))
      c2=Process(target=consumer,args=((in_pipe,out_pipe),'c2'))
      c1.start()
      c2.start()
      p.start()
  ​
      in_pipe.close()
      out_pipe.close()
  ​
      c1.join()
      c2.join()
      print('主進程')
  ​
  # 基於管道實現進程間通訊(與隊列的方式是相似的,隊列就是管道加鎖實現的)
  ## 加鎖來控制操做管道的行爲,來避免進程之間爭搶數據形成的數據不安全現象

  

這裏須要加鎖來解決數據不安全的狀況

  
  from multiprocessing import Process,Pipe,Lock
  ​
  def consumer(produce, consume,name,lock):
      produce.close()
      while True:
          lock.acquire()
          baozi=consume.recv()
          lock.release()
          if baozi:
              print('%s 收到包子:%s' %(name,baozi))
          else:
              consume.close()
              break
  ​
  def producer(produce, consume,n):
      consume.close()
      for i in range(n):
          produce.send(i)
      produce.send(None)
      produce.send(None)
      produce.close()
  ​
  if __name__ == '__main__':
      produce,consume=Pipe()
      lock = Lock()
      c1=Process(target=consumer,args=(produce,consume,'c1',lock))
      c2=Process(target=consumer,args=(produce,consume,'c2',lock))
      p1=Process(target=producer,args=(produce,consume,30))
      c1.start()
      c2.start()
      p1.start()
      produce.close()
      consume.close()

  

2、進程間的數據共享manager

使用Manager能夠方便的進行多進程數據共享,事實上Manager的功能遠不止於此 。Manager支持的類型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

但與管道相似,這裏的數據也是不安全的。須要用鎖來解決。

  
  from multiprocessing import Manager,Process
  ​
  def main(dic):
      dic['count'] -= 1
      # print(dic)
  ​
  if __name__ == '__main__':
      m = Manager()#爲這個manager類註冊存儲容器,也就是經過這個manager類實現的共享的變量
      dic=m.dict({'count':100})
      p_lst = []
      for i in range(50):
          p = Process(target=main, args=(dic,))
          p_lst.append(p)
          p.start()
      for p in p_lst:
          p.join()
      print("主進程",dic['count'])

  

分析:多運行幾回能夠看到,每次輸出的結果都基本是不一樣的,所以這裏仍是須要用鎖來解決。

  
  from multiprocessing import Manager,Process,Lock
  ​
  ​
  def main(dic,lock):
  ​
      #     with lock:能夠這樣寫,也能夠寫成下面的樣子
      lock.acquire()
      dic['count'] -= 1
      lock.release()
  ​
  if __name__ == '__main__':
      m = Manager()
      l = Lock()
      dic=m.dict({'count':100})
      p_lst = []
      for i in range(50):
          p = Process(target=main,args=(dic,l))
          p.start()
          p_lst.append(p)
      for i in p_lst: i.join()
      print('主進程',dic)
 

  

 

 

參考資料

[1]http://www.javashuo.com/article/p-xgtlqpws-gh.html

[2]http://www.th7.cn/system/lin/201605/165994.shtml

[3]https://blog.csdn.net/weixin_39859512/article/details/80898340

相關文章
相關標籤/搜索