消息隊列:multiprocessing.Queue,Queue是對進程安全的隊列,可使用Queue實現對進程之間的數據傳輸;還有一個重要做用是做爲緩存使用。html
Queue(maxsize = 0) method of multiprocessing, returns a queue obiectpython
Queue(maxzize = 0)建立一個隊列對象,maxsize 表示隊列中最多存放消息的數量。編程
返回一個隊列對象緩存
put(obj [, block=True[, timeout]])
調用隊列對象的put()方法將obj插入到隊列中,安全
第一個obj爲必需參數,爲插入項目的值;存入消息的種類不限制。多線程
第二個block爲可選參數,默認爲True,app
當block爲True,timeout爲空時,q.put([1,2,3])、q.put([1,2,3],True) 表示將序列插入到隊尾,阻塞調用,若是q隊列滿時,一直等待(無超時限制的阻塞調用)。dom
當block爲True,timeout爲正整數時,q.put([1,],True,2) 表示阻塞調用進程最多timeout秒,若是超過該時間仍無空間可用,則拋出Queue.Full異常(帶超時的阻塞調用)。函數
當block爲False,q.put([1,], False) 表示調用進程時若是有空閒空間則將數據放入隊列中,不然當即拋出Queue.Full異常。post
簡而言之,timeout表示超時等待時間,當隊列滿時,再存入消息就會發生阻塞(True條件下有效),阻塞時間超過timeout等待時間則拋出異常。
get([block=True[, timeout]])
get方法能夠將隊列中讀取並刪除一個元素。
實際上,get()方法的使用與put()函數相似
第一個block可選參數,默認爲True。
當block爲True,timeout爲空時,阻塞等待取值,直到取到爲止。
當block爲True,timeout爲正整數時,在timeout時間內沒有取到任何元素,則會拋出Queue.Empty異常;
當block爲False時,若是能夠取到至時,則會馬上返回該值,若是沒有取到元素則會當即拋出Queue.Empty異常。
q.full() 判斷隊列是否爲滿;若滿,返回True,若不滿,返回False
q.empty() 判斷隊列是否爲空,若爲空,則返回True
q.qsize() 獲取隊列中消息數量
示例1
from multiprocessing import Queue #建立隊列對象,隊列中最大消息數量爲3 q = Queue(maxsize = 3) #存入消息 q.put(1) q.put('hello')#以字符串存入隊列中 q.put([1,2,3,4]) q.put('hello world',True,5)
運行
Traceback (most recent call last): File "a.py", line 10, in <module> q.put('hello world',True,5) File "/usr/lib/python3.5/multiprocessing/queues.py", line 83, in put raise Full queue.Full
因最後存入時已經超過隊列的最大數量,阻塞5秒後報錯
示例2
from multiprocessing import Queue #建立隊列對象,隊列中最大消息數量爲3 q = Queue(maxsize = 3) #存入消息 q.put(1) q.put('hello')#以字符串存入隊列中 q.put([1,2,3,4]) # 判斷隊列是否爲滿 print(q.full())# True # 隊列中消息數量 print(q.qsize())# 3 # 判斷隊列是否爲空 print(q.empty()) # False # 取出消息 print(q.get())# 1 print(q.qsize())# 2 print(q.get()) # hello print(q.get()) # [1,2,3,4] #判斷隊列是否爲空 print(q.empty()) #True print(q.get(True,3)) #報錯queue.Empty
運行
True 3 False 1 2 hello [1, 2, 3, 4] True Traceback (most recent call last): File "a.py", line 25, in <module> print(q.get(True,3)) #報錯 File "/usr/lib/python3.5/multiprocessing/queues.py", line 105, in get raise Empty queue.Empty
多個進程間的通訊 示例1
import multiprocessing def writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): try: print(q.get(block = False)) except: pass if __name__ == "__main__": q = multiprocessing.Queue() writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()
運行結果爲 1 或 在終端上沒有任何顯示
實際上,這時因爲多線程代碼各自運行所致,若是取出先運行,同時又有報錯處理設置,因此不會報錯;而是沒有任何輸出。
爲了達到目的,能夠在取出函數中增長sleep()函數。
import multiprocessing import time def writer_proc(q): try: q.put(1, block = False) except: pass def reader_proc(q): time.sleep(1) try: print(q.get(block = False)) except: pass if __name__ == "__main__": q = multiprocessing.Queue() writer = multiprocessing.Process(target=writer_proc, args=(q,)) writer.start() reader = multiprocessing.Process(target=reader_proc, args=(q,)) reader.start() reader.join() writer.join()
通過改良後,每一次運行代碼都可達到輸入輸出的目的。
多個子進程間的通訊示例2
多個子進程間的通訊就要採用第一步中的隊列Queue,好比,有如下需求,一個子進程向隊列中寫數據,另外一個進程從隊列中取數據,
# _*_ encoding:utf-8 _*_ from multiprocessing import Process,Queue,Pool,Pipe import os,time,random #寫數據進程執行的代碼: def write(p): for value in ['A','B','C']: print ('Write---Before Put value---Put %s to queue...' % value) p.put(value) print ('Write---After Put value') time.sleep(random.random()) print ('Write---After sleep') #讀數據進程執行的代碼: def read(p): while True: print ('Read---Before get value') value = p.get(True) print ('Read---After get value---Get %s from queue.' % value) if __name__ == '__main__': #父進程建立Queue,並傳給各個子進程: p = Queue() pw = Process(target=write,args=(p,)) pr = Process(target=read,args=(p,)) #啓動子進程pw,寫入: pw.start() #啓動子進程pr,讀取: pr.start() #等待pw結束: pw.join() #pr進程裏是死循環,沒法等待其結束,只能強行終止: pr.terminate()
運行
Write---Before Put value---Put A to queue... Read---Before get value Write---After Put value Read---After get value---Get A from queue. Read---Before get value Write---After sleep Write---Before Put value---Put B to queue... Write---After Put value Read---After get value---Get B from queue. Read---Before get value Write---After sleep Write---Before Put value---Put C to queue... Write---After Put value Read---After get value---Get C from queue. Read---Before get value Write---After sleep
關於鎖的應用,在不一樣程序間若是有同時對同一個隊列操做的時候,爲了不錯誤,能夠在某個函數操做隊列的時候給它加把鎖,這樣在同一個時間內則只能有一個子進程對隊列進行操做,鎖也要在manager對象中的鎖
示例3
from multiprocessing import Process,Queue import time process_list = [] q = Queue() def fun(name): time.sleep(1) q.put('hello' + str(name)) for i in range(10): p = Process(target = fun,args = (i,)) p.start() process_list.append(p) for i in process_list: i.join() while not q.empty(): print(q.get())
運行
hello0
hello1
hello3
hello2
hello4
hello6
hello5
hello9
hello8
hello7
參考: