********進程間通訊-隊列和管道******** ****進程間通訊-隊列和管道 IPC(Inter-Process Communication) ****隊列 **概念介紹 建立貢獻的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。 Queue([maxsize]) 建立共享的進程隊列。 參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。 底層隊列使用管道和鎖定實現。 *********方法介紹: Queue([maxsize]) 建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使 道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 Queue的實例q具備如下方法: q.get( [ block [ ,timeout ] ] ) 返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲 默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選 超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。 q.get_nowait( ) 同q.get(False)方法。 q.put(item [, block [,timeout ] ] ) 將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。 若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等 待可用空間的時間長短。超時後將引起Queue.Full異常。 q.qsize() 返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間, 隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。 q.empty() 若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。 也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。 q.full() 若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。 **********其餘方法: q.close() 關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數 據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者 中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者 中的隊列不會致使get()方法返回錯誤。 q.cancel_join_thread() 不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。 q.join_thread() 鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方 由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。 例子: **單看隊列用法 ''' multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列 都是基於消息傳遞實現的 ''' from multiprocessing import Queue q = Queue(3) # put, get, put_nowait, get_nowait, full, empty q.put(1) q.put(2) q.put(3) # q.put(4) #若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列 # 若是隊列中的數據一致不被取走,程序就會永遠停在這裏 try: q.put_nowait(4) # 可使用put_nowait(),若是隊列滿了就不會阻塞,可是會由於隊列滿了而報錯 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下曲,可是會丟掉這個消息 print('隊列已滿') # 所以,咱們再放入數據以前,能夠看一下隊列的狀態,若是已經滿了,就不繼續put了 print(q.full()) # 滿了 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) #同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞 try: q.get_nowait() # 可使用get_nowait(),若是隊列滿了不會阻塞,可是會由於沒取到值而報錯 except: # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序就不會一致阻塞下去 print('空了') print(q.empty()) # 空了 **Queue實現進程間的通訊 from multiprocessing import Queue, Process def func(q): q.put('hello') # 調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。 if __name__ == '__main__': q = Queue() # 建立一個Queue對象 p = Process(target=func, args=(q,)) # 建立一個進程 p.start() print(q.get()) p.join() **批量生產數據放入對列再批量獲取 import os from multiprocessing import Queue, Process def inputQ(queue): info = str(os.getpid()) + '(put):' queue.put(info) def outputQ(queue): info = queue.get() print('%s%s\033[32m%s\033[0m' % (str(os.getpid()), '(get):', info)) if __name__ == '__main__': record1 = [] record2 = [] queue = Queue(3) # 輸入進程 for i in range(10): p = Process(target=inputQ, args=(queue,)) p.start() record1.append(p) # 輸出進程 for i in range(10): p = Process(target=outputQ, args=(queue,)) p.start() record2.append(p) [p.join() for p in record1] [p.join() for p in record2] ********管道******** ****介紹 #建立管道的類: Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示 道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道 #參數介紹: dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。 #主要方法: conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。 若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。 conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象 #其餘方法: conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法 conn1.fileno():返回鏈接使用的整數文件描述符 conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略 此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。 conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength 指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進 行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。 conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝 區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發 出,而後調用c.recv_bytes()函數進行接收 conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中, 該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節 位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。 ****Pipe初使用 from multiprocessing import Process, Pipe def f(conn): conn.send("Hello The_Third_Wave") conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) p.join() 注意: 應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將 它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執 行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在 全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費 也關閉了相同的管道端點。 **引起EOFError from multiprocessing import Pipe, Process def func(con): con1, con2 = con con1.close() # 子進程使用con2和父進程通訊,因此 while 1: try: print(con2.recv()) # 當主進程的con1發數據時,子進程要死循環的去接收。 except EOFError: # 若是主進程的con1發完數據並關閉con1,子進程的con2繼續接收時,就會報錯,使用try的方式,獲取錯誤 con2.close() # 獲取到錯誤,就是指子進程已經把管道中全部數據都接收完了,因此用這種方式去關閉管道 break if __name__ == '__main__': con1, con2 = Pipe() p = Process(target=func, args=((con1, con2),)) p.start() con2.close() # 在父進程中,使用con1去和子進程通訊,因此不須要con2,就提早關閉 for i in range(10): # 生產數據 con1.send(i) # 給子進程的con2發送數據 con1.close() # 生產完數據,關閉父進程這一端的管道 **多個消費者之間的競爭問題帶來的數據不安全問題 from multiprocessing import Process, Pipe, Lock def consumer(p, name, lock): produce, consume = p produce.close() while True: lock.acquire() baozi = consume.recv() lock.release() if baozi: print('%s 收到包子:%s' % (name, baozi)) else: consume.close() break def producer(p, n): produce, consume = p consume.close() for i in range(n): produce.send(i) produce.send(None) produce.send(None) produce.close() if __name__ == '__main__': produce, consume = Pipe() lock = Lock() c1 = Process(target=consumer, args=((produce, consume), 'c1', lock)) c2 = Process(target=consumer, args=((produce, consume), 'c2', lock)) p1 = Process(target=producer, args=((produce, consume), 10)) c1.start() c2.start() p1.start() produce.close() consume.close() c1.join() c2.join() p1.join() print('主進程')