python使用Queue進行進程間通訊

1.Process之間有時須要通訊,操做系統提供了不少機制來實現進程間的通訊.python

可使用multiprocessing模塊的Queue實現多進程之間的數據傳遞,app

Queue自己是一個消息列隊程序:dom

from multiprocessing import Queue
q=Queue(3)  # 初始化一個Queue對象,最多可接收三條put消息
q.put("消息1")
q.put("消息2")
print(q.full())  # False
q.put("消息3")
print(q.full())  # True
# 由於消息列隊已滿下面的try都會拋出異常,
# 第一個try會等待2秒後再拋出異常,第二個Try會馬上拋出異
try:
    q.put("消息4",True,2)
except:
    print("消息列隊已滿,現有消息數量:%s"%q.qsize())

try:
    q.put_nowait("消息4")
except:
    print("消息列隊已滿,現有消息數量:%s"%q.qsize())
# 推薦的方式,先判斷消息列隊是否已滿,再寫入
if not q.full():
    q.put_nowait("消息4")
# 讀取消息時,先判斷消息列隊是否爲空,再讀取
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())

2.初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最大可接收的消息數量,spa

那麼就表明可接受的消息數量沒有上限(直到內存的盡頭);操作系統

Queue.qsize():返回當前隊列包含的消息數量;.net

Queue.empty():若是隊列爲空,返回True,反之False;code

Queue.full():若是隊列滿了,返回True,反之False;對象

Queue.get([block[, timeout]]):獲取隊列中的一條消息,而後將其從列隊中移除,block默認值爲True;blog

a.若是block使用默認值,且沒有設置timeout(單位秒),消息列隊若是爲空,
此時程序將被阻塞,停在讀取狀態,直到從消息列隊讀到消息爲止,若是設置了timeout,
則會等待timeout秒,若還沒讀取到任何消息,則拋出」Queue.Empty」異常;
b.若是block值爲False,消息列隊若是爲空,則會馬上拋出」Queue.Empty」異常;
Queue.get_nowait()--至關Queue.get(False);
若是block使用默認值,且沒有設置timeout(單位秒),消息列隊若是已經沒有空間可寫入,
此時程序將被阻塞(停在寫入狀態),直到從消息列隊騰出空間爲止,
若是設置了timeout,則會等待timeout秒,若還沒空間,則拋出」Queue.Full」異常;
若是block值爲False,消息列隊若是沒有空間可寫入,則會馬上拋出」Queue.Full」異常;
Queue.put_nowait(item)--至關Queue.put(item, False);

3.在父進程中建立兩個子進程,一個往Queue裏寫數據,一個從Queue裏讀數據:隊列

from multiprocessing import Process, Queue
import os, time, random
# 寫數據進程執行的代碼:
def write(q):
    for value in ['A', 'B', 'C']:
	print('Put %s to queue...' % value)
	q.put(value)
	time.sleep(random.random())
# 讀數據進程執行的代碼:
def read(q):
    while True:
	    if not q.empty():
		    value = q.get(True)
			print('Get %s from queue.' % value)
			time.sleep(random.random())
	    else:
		    break

if __name__=='__main__':
    # 父進程建立Queue,並傳給各個子進程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
	# 啓動子進程pw,寫入
    pw.start()    
    # 等待pw結束:
    pw.join()
    # 啓動子進程pr,讀取:
    pr.start()
    pr.join()
    # pr進程裏是死循環,沒法等待其結束,只能強行終止:
    print('')
    print('全部數據都寫入而且讀完')

4.進程池中的Queue

若是要使用Pool建立進程,就須要使用multiprocessing.Manager()中的Queue(),
而不是multiprocessing.Queue(),不然會獲得一條以下的錯誤信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.

5.下面的實例演示了進程池中的進程如何通訊:

# 修改import中的Queue爲Manager
from multiprocessing import Manager,Pool
import os,time,random

def reader(q):
    print("reader啓動(%s),父進程爲(%s)"%(os.getpid(),os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue獲取到消息:%s"%q.get(True))

def writer(q):
    print("writer啓動(%s),父進程爲(%s)"%(os.getpid(),os.getppid()))
    for i in "dongGe":
        q.put(i)

if __name__=="__main__":
    print("(%s) start"%os.getpid())
    q=Manager().Queue() #使用Manager中的Queue來初始化
    po=Pool()
    #使用阻塞模式建立進程,這樣就不須要在reader中使用死循環了,可讓writer徹底執行完成後,再用reader去讀取
    po.apply(writer,(q,))
    po.apply(reader,(q,))
    po.close()
    po.join()
    print("(%s) End"%os.getpid())

參考博客:https://blog.csdn.net/Duke10/article/details/79867656

相關文章
相關標籤/搜索