python學習筆記——multiprocessing 多進程組件-隊列Queue

1 消息隊列

1.1 基本語法

消息隊列:multiprocessing.Queue,Queue是對進程安全的隊列,可使用Queue實現對進程之間的數據傳輸;還有一個重要做用是做爲緩存使用。html

Queue(maxsize = 0) method of multiprocessing, returns a queue obiectpython

Queue(maxzize = 0)建立一個隊列對象,maxsize 表示隊列中最多存放消息的數量。編程

返回一個隊列對象緩存

 

1.1 隊列對象操做方法:

1.1.1 put方法

put(obj [, block=True[, timeout]])

調用隊列對象的put()方法將obj插入到隊列中,安全

第一個obj爲必需參數,爲插入項目的值;存入消息的種類不限制。多線程

第二個block爲可選參數,默認爲True,app

當block爲True,timeout爲空時,q.put([1,2,3])、q.put([1,2,3],True) 表示將序列插入到隊尾,阻塞調用,若是q隊列滿時,一直等待(無超時限制的阻塞調用)。dom

當block爲True,timeout爲正整數時,q.put([1,],True,2) 表示阻塞調用進程最多timeout秒,若是超過該時間仍無空間可用,則拋出Queue.Full異常(帶超時的阻塞調用)。函數

當block爲False,q.put([1,], False) 表示調用進程時若是有空閒空間則將數據放入隊列中,不然當即拋出Queue.Full異常。post

 

簡而言之,timeout表示超時等待時間,當隊列滿時,再存入消息就會發生阻塞(True條件下有效),阻塞時間超過timeout等待時間則拋出異常。 

1.1.2 get方法

get([block=True[, timeout]])

get方法能夠將隊列中讀取並刪除一個元素。

實際上,get()方法的使用與put()函數相似

第一個block可選參數,默認爲True。

當block爲True,timeout爲空時,阻塞等待取值,直到取到爲止。

當block爲True,timeout爲正整數時,在timeout時間內沒有取到任何元素,則會拋出Queue.Empty異常;

當block爲False時,若是能夠取到至時,則會馬上返回該值,若是沒有取到元素則會當即拋出Queue.Empty異常。

1.1.3 full()

 q.full() 判斷隊列是否爲滿;若滿,返回True,若不滿,返回False

1.1.4 empty()

 q.empty() 判斷隊列是否爲空,若爲空,則返回True

1.1.5 qsize()

 q.qsize() 獲取隊列中消息數量

2 基礎示例

示例1

from multiprocessing import Queue

#建立隊列對象,隊列中最大消息數量爲3
q = Queue(maxsize = 3) 

#存入消息
q.put(1)
q.put('hello')#以字符串存入隊列中
q.put([1,2,3,4])
q.put('hello world',True,5)

運行

Traceback (most recent call last):
  File "a.py", line 10, in <module>
    q.put('hello world',True,5)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full

因最後存入時已經超過隊列的最大數量,阻塞5秒後報錯

 

示例2

from multiprocessing import Queue

#建立隊列對象,隊列中最大消息數量爲3
q = Queue(maxsize = 3) 

#存入消息
q.put(1)
q.put('hello')#以字符串存入隊列中
q.put([1,2,3,4])

# 判斷隊列是否爲滿
print(q.full())# True
# 隊列中消息數量
print(q.qsize())# 3
# 判斷隊列是否爲空
print(q.empty()) # False
# 取出消息
print(q.get())# 1

print(q.qsize())# 2
print(q.get()) # hello
print(q.get()) # [1,2,3,4]
#判斷隊列是否爲空
print(q.empty()) #True
print(q.get(True,3)) #報錯queue.Empty

運行

True
3
False
1
2
hello
[1, 2, 3, 4]
True
Traceback (most recent call last):
  File "a.py", line 25, in <module>
    print(q.get(True,3)) #報錯
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 105, in get
    raise Empty
queue.Empty

3 綜合應用

多個進程間的通訊 示例1

import multiprocessing

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):
    try:
        print(q.get(block = False))
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()

運行結果爲 1 或 在終端上沒有任何顯示

實際上,這時因爲多線程代碼各自運行所致,若是取出先運行,同時又有報錯處理設置,因此不會報錯;而是沒有任何輸出。

爲了達到目的,能夠在取出函數中增長sleep()函數。

import multiprocessing
import time

def writer_proc(q):      
    try:         
        q.put(1, block = False) 
    except:         
        pass   

def reader_proc(q):
    time.sleep(1)
    try:
        print(q.get(block = False))
    except:         
        pass

if __name__ == "__main__":
    q = multiprocessing.Queue()
    writer = multiprocessing.Process(target=writer_proc, args=(q,))  
    writer.start()   

    reader = multiprocessing.Process(target=reader_proc, args=(q,))  
    reader.start()  

    reader.join()  
    writer.join()

通過改良後,每一次運行代碼都可達到輸入輸出的目的。

 

多個子進程間的通訊示例2

多個子進程間的通訊就要採用第一步中的隊列Queue,好比,有如下需求,一個子進程向隊列中寫數據,另外一個進程從隊列中取數據,

# _*_ encoding:utf-8 _*_

from multiprocessing import Process,Queue,Pool,Pipe
import os,time,random

#寫數據進程執行的代碼:
def write(p):
    for value in ['A','B','C']:
        print ('Write---Before Put value---Put %s to queue...' % value)
        p.put(value)
        print ('Write---After Put value')
        time.sleep(random.random())
        print ('Write---After sleep')

#讀數據進程執行的代碼:
def read(p):
    while True:
        print ('Read---Before get value')
        value = p.get(True)
        print ('Read---After get value---Get %s from queue.' % value)

if __name__ == '__main__':
    #父進程建立Queue,並傳給各個子進程:
    p = Queue()
    pw = Process(target=write,args=(p,))
    pr = Process(target=read,args=(p,))
    #啓動子進程pw,寫入:
    pw.start()
    #啓動子進程pr,讀取:
    pr.start()
    #等待pw結束:
    pw.join()
    #pr進程裏是死循環,沒法等待其結束,只能強行終止:
    pr.terminate()

運行

Write---Before Put value---Put A to queue...
Read---Before get value
Write---After Put value
Read---After get value---Get A from queue.
Read---Before get value
Write---After sleep
Write---Before Put value---Put B to queue...
Write---After Put value
Read---After get value---Get B from queue.
Read---Before get value
Write---After sleep
Write---Before Put value---Put C to queue...
Write---After Put value
Read---After get value---Get C from queue.
Read---Before get value
Write---After sleep

關於鎖的應用,在不一樣程序間若是有同時對同一個隊列操做的時候,爲了不錯誤,能夠在某個函數操做隊列的時候給它加把鎖,這樣在同一個時間內則只能有一個子進程對隊列進行操做,鎖也要在manager對象中的鎖

示例3

from multiprocessing import  Process,Queue
import time

process_list = []
q = Queue()

def fun(name):
    time.sleep(1)
    q.put('hello' + str(name))

for i in range(10):
    p = Process(target = fun,args = (i,))
    p.start()
    process_list.append(p)

for i in process_list:
    i.join()

while not q.empty():
    print(q.get())

運行

hello0
hello1
hello3
hello2
hello4
hello6
hello5
hello9
hello8
hello7

 

參考:

python隊列Queue

Python多進程編程

Python的multiprocessing,Queue,Process

相關文章
相關標籤/搜索