puthon-進程間通訊-隊列和管道

********進程間通訊-隊列和管道********

****進程間通訊-隊列和管道

IPC(Inter-Process Communication)

****隊列

**概念介紹

建立貢獻的進程隊列,Queue是多進程安全的隊列,可使用Queue實現多進程之間的數據傳遞。

Queue([maxsize]) 建立共享的進程隊列。
參數 :maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。
底層隊列使用管道和鎖定實現。

*********方法介紹:

Queue([maxsize]) 
建立共享的進程隊列。maxsize是隊列中容許的最大項數。若是省略此參數,則無大小限制。底層隊列使
道和鎖定實現。另外,還須要運行支持線程以便隊列中的數據傳輸到底層管道中。 
Queue的實例q具備如下方法:

q.get( [ block [ ,timeout ] ] ) 
返回q中的一個項目。若是q爲空,此方法將阻塞,直到隊列中有項目可用爲止。block用於控制阻塞行爲
默認爲True. 若是設置爲False,將引起Queue.Empty異常(定義在Queue模塊中)。timeout是可選
超時時間,用在阻塞模式中。若是在制定的時間間隔內沒有項目變爲可用,將引起Queue.Empty異常。

q.get_nowait( ) 
同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) 
將item放入隊列。若是隊列已滿,此方法將阻塞至有空間可用爲止。block控制阻塞行爲,默認爲True。
若是設置爲False,將引起Queue.Empty異常(定義在Queue庫模塊中)。timeout指定在阻塞模式中等
待可用空間的時間長短。超時後將引起Queue.Full異常。

q.qsize() 
返回隊列中目前項目的正確數量。此函數的結果並不可靠,由於在返回結果和在稍後程序中使用結果之間,
隊列中可能添加或刪除了項目。在某些系統上,此方法可能引起NotImplementedError異常。


q.empty() 
若是調用此方法時 q爲空,返回True。若是其餘進程或線程正在往隊列中添加項目,結果是不可靠的。
也就是說,在返回和使用結果之間,隊列中可能已經加入新的項目。

q.full() 
若是q已滿,返回爲True. 因爲線程的存在,結果也多是不可靠的(參考q.empty()方法)。。

**********其餘方法:

q.close() 
關閉隊列,防止隊列中加入更多數據。調用此方法時,後臺線程將繼續寫入那些已入隊列但還沒有寫入的數
據,但將在此方法完成時立刻關閉。若是q被垃圾收集,將自動調用此方法。關閉隊列不會在隊列使用者
中生成任何類型的數據結束信號或異常。例如,若是某個使用者正被阻塞在get()操做上,關閉生產者
中的隊列不會致使get()方法返回錯誤。

q.cancel_join_thread() 
不會再進程退出時自動鏈接後臺線程。這能夠防止join_thread()方法阻塞。

q.join_thread() 
鏈接隊列的後臺線程。此方法用於在調用q.close()方法後,等待全部隊列項被消耗。默認狀況下,此方
由不是q的原始建立者的全部進程調用。調用q.cancel_join_thread()方法能夠禁止這種行爲。




例子:

**單看隊列用法

'''

multiprocessing模塊支持進程間通訊的兩種主要形式:管道和隊列
都是基於消息傳遞實現的

'''

from multiprocessing import Queue

q = Queue(3)

# put, get, put_nowait, get_nowait, full, empty

q.put(1)
q.put(2)
q.put(3)
# q.put(4) #若是隊列已經滿了,程序就會停在這裏,等待數據被別人取走,再將數據放入隊列
# 若是隊列中的數據一致不被取走,程序就會永遠停在這裏

try:
    q.put_nowait(4)  # 可使用put_nowait(),若是隊列滿了就不會阻塞,可是會由於隊列滿了而報錯

except:  # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序不會一直阻塞下曲,可是會丟掉這個消息
    print('隊列已滿')

# 所以,咱們再放入數據以前,能夠看一下隊列的狀態,若是已經滿了,就不繼續put了

print(q.full())  # 滿了

print(q.get())
print(q.get())
print(q.get())

# print(q.get())  #同put方法同樣,若是隊列已經空了,那麼繼續取就會出現阻塞

try:
    q.get_nowait()  # 可使用get_nowait(),若是隊列滿了不會阻塞,可是會由於沒取到值而報錯
except:  # 所以咱們能夠用一個try語句來處理這個錯誤。這樣程序就不會一致阻塞下去
    print('空了')

print(q.empty())  # 空了

**Queue實現進程間的通訊

from multiprocessing import Queue, Process


def func(q):
    q.put('hello')  # 調用主函數中p進程傳遞過來的進程參數 put函數爲向隊列中添加一條數據。


if __name__ == '__main__':
    q = Queue()  # 建立一個Queue對象
    p = Process(target=func, args=(q,))  # 建立一個進程
    p.start()
    print(q.get())
    p.join()

**批量生產數據放入對列再批量獲取

import os
from multiprocessing import Queue, Process


def inputQ(queue):
    info = str(os.getpid()) + '(put):'
    queue.put(info)


def outputQ(queue):
    info = queue.get()
    print('%s%s\033[32m%s\033[0m' % (str(os.getpid()), '(get):', info))


if __name__ == '__main__':
    record1 = []
    record2 = []

    queue = Queue(3)

    # 輸入進程
    for i in range(10):
        p = Process(target=inputQ, args=(queue,))
        p.start()
        record1.append(p)

    # 輸出進程
    for i in range(10):
        p = Process(target=outputQ, args=(queue,))
        p.start()
        record2.append(p)

    [p.join() for p in record1]
    [p.join() for p in record2]



********管道********

****介紹

#建立管道的類:
Pipe([duplex]):在進程之間建立一條管道,並返回元組(conn1,conn2),其中conn1,conn2表示
道兩端的鏈接對象,強調一點:必須在產生Process對象以前產生管道

#參數介紹:
dumplex:默認管道是全雙工的,若是將duplex射成False,conn1只能用於接收,conn2只能用於發送。

#主要方法:
conn1.recv():接收conn2.send(obj)發送的對象。若是沒有消息可接收,recv方法會一直阻塞。
若是鏈接的另一端已經關閉,那麼recv方法會拋出EOFError。
conn1.send(obj):經過鏈接發送對象。obj是與序列化兼容的任意對象

#其餘方法:
conn1.close():關閉鏈接。若是conn1被垃圾回收,將自動調用此方法
conn1.fileno():返回鏈接使用的整數文件描述符
conn1.poll([timeout]):若是鏈接上的數據可用,返回True。timeout指定等待的最長時限。若是省略
此參數,方法將當即返回結果。若是將timeout射成None,操做將無限期地等待數據到達。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法發送的一條完整的字節消息。maxlength
指定要接收的最大字節數。若是進入的消息,超過了這個最大值,將引起IOError異常,而且在鏈接上沒法進
行進一步讀取。若是鏈接的另一端已經關閉,不再存在任何數據,將引起EOFError異常。
conn.send_bytes(buffer [, offset [, size]]):經過鏈接發送字節數據緩衝區,buffer是支持緩衝
區接口的任意對象,offset是緩衝區中的字節偏移量,而size是要發送字節數。結果數據以單條消息的形式發
出,而後調用c.recv_bytes()函數進行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一條完整的字節消息,並把它保存在buffer對象中,
該對象支持可寫入的緩衝區接口(即bytearray對象或相似的對象)。offset指定緩衝區中放置消息處的字節
位移。返回值是收到的字節數。若是消息長度大於可用的緩衝區空間,將引起BufferTooShort異常。

****Pipe初使用

from multiprocessing import Process, Pipe


def f(conn):
    conn.send("Hello The_Third_Wave")
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()


注意:
應該特別注意管道端點的正確管理問題。若是是生產者或消費者中都沒有使用管道的某個端點,就應將
它關閉。這也說明了爲什麼在生產者中關閉了管道的輸出端,在消費者中關閉管道的輸入端。若是忘記執
行這些步驟,程序可能在消費者中的recv()操做上掛起。管道是由操做系統進行引用計數的,必須在
全部進程中關閉管道後才能生成EOFError異常。所以,在生產者中關閉管道不會有任何效果,除非消費
也關閉了相同的管道端點。


**引起EOFError

from multiprocessing import Pipe, Process


def func(con):
    con1, con2 = con
    con1.close()  # 子進程使用con2和父進程通訊,因此
    while 1:
        try:
            print(con2.recv())  # 當主進程的con1發數據時,子進程要死循環的去接收。
        except EOFError:  # 若是主進程的con1發完數據並關閉con1,子進程的con2繼續接收時,就會報錯,使用try的方式,獲取錯誤
            con2.close()  # 獲取到錯誤,就是指子進程已經把管道中全部數據都接收完了,因此用這種方式去關閉管道
            break


if __name__ == '__main__':
    con1, con2 = Pipe()
    p = Process(target=func, args=((con1, con2),))
    p.start()
    con2.close()  # 在父進程中,使用con1去和子進程通訊,因此不須要con2,就提早關閉
    for i in range(10):  # 生產數據
        con1.send(i)  # 給子進程的con2發送數據
    con1.close()  # 生產完數據,關閉父進程這一端的管道

**多個消費者之間的競爭問題帶來的數據不安全問題

from multiprocessing import Process, Pipe, Lock


def consumer(p, name, lock):
    produce, consume = p
    produce.close()
    while True:
        lock.acquire()
        baozi = consume.recv()
        lock.release()
        if baozi:
            print('%s 收到包子:%s' % (name, baozi))
        else:
            consume.close()
            break


def producer(p, n):
    produce, consume = p
    consume.close()
    for i in range(n):
        produce.send(i)
    produce.send(None)
    produce.send(None)
    produce.close()


if __name__ == '__main__':
    produce, consume = Pipe()
    lock = Lock()
    c1 = Process(target=consumer, args=((produce, consume), 'c1', lock))
    c2 = Process(target=consumer, args=((produce, consume), 'c2', lock))
    p1 = Process(target=producer, args=((produce, consume), 10))
    c1.start()
    c2.start()
    p1.start()

    produce.close()
    consume.close()

    c1.join()
    c2.join()
    p1.join()
    print('主進程')
相關文章
相關標籤/搜索