1、 管道數據庫
建立管道的類安全
Pipe([duplex]):在進程之間建立一條管道,並返回元祖(conn1,conn2),其中conn1,conn2表示管道兩端的鏈接對象,強調一點:必須在產生process對象以前產生管道併發
參數介紹app
dumplex函數
默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送工具
主要方法:ui
Conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。若是鏈接的另外一端已經關閉,那麼recv方法會拋出EOFError。spa
Conn1.send(obj):經過鏈接發送對象。Obj是與序列化兼容的任意對象操作系統
其餘方法:線程
Conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
Conn1.fileno():返回鏈接使用的整數文件描述符
Conn1.poll(【timeout】):若是鏈接上的數據可用,返回True。Timeout指定等待的最長時限。若是省略此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達
Conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。Maxlength指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起ioError異常,而且在鏈接上沒法進行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起eoferror異常
Conn.send_bytes(buffer[,offset[,size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝區接口的任意對象,offset是緩衝區中字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發出,而後調用c.recv_bytes()函數進行接收
Conn1.recv_bytes_info(buffer[,offset])接收一條完整的字節消息,並把它保存在buffer對象中,該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。Offset指定緩衝區中放置消息處的字節位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起bufferTooShort異常
from multiprocessing import Pipe,Process def func(conn1,): # conn2.close() msg = conn1.recv() print('>>>>>',msg) if __name__ == '__main__': conn1,conn2 = Pipe() p = Process(target=func,args=(conn1,)) p.start() # conn1.close() # conn2.close() conn2.send('小鬼!') print('主進程結束')
應該特別注意管道端點的正確管理問題。若是生產者或消費者都沒有使用管道的某個端點,就應將它關閉。這也說明了爲什麼生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記要執行這些步驟,程序可能在消費者中的recv()操做上掛起(就是阻塞)。管道是由操做系統進行引用計數的,必須在全部進程中關閉管道的相同一端就會能生成EOFError異常。所以,在生產者中關閉通道不會有任何效果,除非消費者也關閉了相同的管道端點。
主進程將管道的兩端都傳給子進程,子進程和主進程共用管道的兩種報錯狀況,都是在recv接收的時候報錯的:
1, 主進程和子進程的管道的相同一端都關閉了,出現EOFError
2, 若是你管道的一端在主進程和子進程中都關閉了,可是你還用這個關閉的一端去接收消息,那麼就會出現OSError
因此你關閉管道的時候,就會容易出現問題,須要將全部只有這個管道的進程中的兩端所有關閉才行,固然也能夠經過異常捕獲(try:except EOFerror)來處理。
雖然咱們在主進程和子進程中都打印了conn1一端的對象,發現兩個再也不同一個地址,可是子進程中的管道和主進程的管道仍是能夠通訊的,由於管道是同一套,系統可以記錄
咱們的目的就是關閉全部的通道,那麼主進程和子進程進行通訊的時候,能夠給子進程傳管道的一端就夠了,而且用咱們以前學到的,信息發送完後,再發送一個結束信號None,那麼你收到的消息爲None的時候直接結束接收或者說結束循環,就不用每次都關閉各個進程中管道了。
from multiprocessing import Process,Pipe def consumer(p,name): produce,consume = p produce.close() while 1: try: baozi = consume.recv() print('%s 收到包子:%s'%(name,baozi)) except EOFError: break def producer(seq,p): produce,consume = p consume.close() for i in seq: produce.send(i) if __name__ == '__main__': produce,consume = Pipe() c1 = Process(target=consumer,args=((produce,consume),'c1')) c1.start() seq = (i for i in range(10)) producer(seq,(produce,consume)) produce.close() consume.close() c1.join() print('主進程')
關於管道會形成數據不安全問題的官方解釋:
由Pipe方法返回的兩個鏈接對象表示管道的兩端。每一個鏈接對象都有send和recv方法(除其餘以外)。注意,若是兩個進程(或線程)試圖同時從管道的同一端讀取或寫入數據,那麼管道中的數據可能會損壞。固然,在使用管道的不一樣端部的過程當中不存在損壞風險
from multiprocessing import Process,Pipe,Lock def consumer(p,name,lock): produce, consume=p produce.close() while True: lock.acquire() baozi=consume.recv() lock.release() if baozi: print('%s 收到包子:%s' %(name,baozi)) else: consume.close() break def producer(p,n): produce, consume=p consume.close() for i in range(n): produce.send(i) produce.send(None) produce.send(None) produce.close() if __name__ == '__main__': produce,consume=Pipe() lock = Lock() c1=Process(target=consumer,args=((produce,consume),'c1',lock)) c2=Process(target=consumer,args=((produce,consume),'c2',lock)) p1=Process(target=producer,args=((produce,consume),10)) c1.start() c2.start() p1.start() produce.close() consume.close() c1.join() c2.join() p1.join() print('主進程')
from multiprocessing import Manager,Process,Lock
def work(d,lock):
with lock:
d['count'] -=1
if __name__ == '__main__':
lock = Lock()
with Manager() as m:
dic = m.dict({'count':100})
p_1 = []
for i in range(100):
p = Process(target=work,args=(dic,lock))
p_1.append(p)
p.start()
for p in p_1:
p.join()
print(dic)
1、 數據共享
進程間應該儘可能避免通訊,即使須要通訊,也應該選擇進程安全的工具來避免加鎖帶來的問題,應該儘可能避免共享數據的方式,之後會使用數據庫來解決進程之間的數據共享問題
總結一下,進程之間的通訊:隊列,管道,數據共享也算
1、 進程池和mutiprocess.poll
爲何要有進程池?進程池的概念
在程序實際處理問題過程當中,忙時會有成千上萬的任務須要被執行,閒時可能只有零星任務。那麼在成千上萬割任務須要被執行的時候,咱們就須要去船艦成千上萬個進程嗎?首先,建立進程須要消耗時間,銷燬進程(空間,變量,文件信息等等內容)也須要消耗時間。第二即使開啓了成千上萬的進程,操做系統也不能讓他們同時執行,維護一個很大的進程列表的同時,調度的時候,還須要進行切換而且記錄每一個進程的執行節點,也就是記錄上下文,這樣反而會影響程序的效率。所以咱們不能無限制的根據任務開啓或者開啓程序。
定義一個池子,有需求來了,就拿一個池中的進程來處理任務,等處理完畢,進程並不關閉,而是將進程再放回池中繼續等待任務。若是有不少任務須要執行,池中的數量不夠,任務就要等待以前的進程執行完畢歸來,拿到空閒進程才能繼續執行。也就是說,池中進程的數量是固定的,那麼同一時間最多有固定數量的進程再運行。這樣不會增長操做系統的調度難度,還節省了開閉進程的時間,也必定程度上可以實現併發效果