消息隊列」是在消息的傳輸過程當中保存消息的容器。
消息隊列最經典的用法就是消費者和生成者之間經過消息管道來傳遞消息,消費者和生成者是不通的進程。生產者往管道中寫消息,消費者從管道中讀消息。
操做系統提供了不少機制來實現進程間的通訊 ,multiprocessing模塊就提供了Queue和Pipe兩種方法來實現。多線程
使用multiprocessing裏面的Queue來實現消息隊列ide
from multiprocessing import Queue
q = Queue
q.put(data)
data = q.get(data)函數
經過Mutiprocess裏面的Pipe來實現消息隊列:
1, Pipe方法返回(conn1, conn2)表明一個管道的兩個端。Pipe方法有duplex參數,若是duplex參數爲True(默認值),那麼這個管道是全雙工模式,也就是說conn1和conn2都可收發。duplex爲False,conn1只負責接受消息,conn2只負責發送消息。
2, send和recv方法分別是發送和接受消息的方法。close方法表示關閉管道,當消息接受結束之後,關閉管道。spa
Python提供了Queue模塊來專門實現消息隊列
Queue對象 操作系統
Queue對象實現一個fifo隊列(其餘的還有lifo、priority隊列,這裏再也不介紹)。queue只有maxsize一個構造參數,用來指定隊列容量,指定爲0的時候表明容量無限。主要有如下成員函數:
Queue.qsize():返回消息隊列的當前空間。返回的值不必定可靠。
Queue.empty():判斷消息隊列是否爲空,返回True或False。一樣不可靠。
Queue.full():相似上邊,判斷消息隊列是否滿
Queue.put(item, block=True, timeout=None):往消息隊列中存放消息。block能夠控制是否阻塞,timeout指定阻塞時候的等待時間。若是不阻塞或者超時,會引發一個full exception。
Queue.put_nowait(item):至關於put(item, False).
Queue.get(block=True, timeout=None):獲取一個消息,其餘同put。線程
'''寫一個消費者和生產者,爲了練習多線程,咱們用多線程的方式來實現,並經過類的重寫的方法來實現''' from Queue import Queue from threading import Thread import time class Proceduer(Thread): def __init__(self, queue): super(Proceduer, self).__init__() self.queue = queue def run(self): try: for i in xrange(1, 10): print("put data is: {0} to queue".format(i)) self.queue.put(i) except Exception as e: print("put data error!") raise e class Consumer_odd(Thread): def __init__(self, queue): super(Consumer_odd, self).__init__() self.queue = queue def run(self): try: while not self.queue.empty(): number = self.queue.get(block=True, timeout=3) if number%2 != 0: print("get {0} from queue ODD".format(number)) else: self.queue.put(number) time.sleep(1) except Exception as e: raise e class Consumer_even(Thread): def __init__(self, queue): super(Consumer_even, self).__init__() self.queue = queue def run(self): # print(self.queue.empty) while not self.queue.empty(): try: number = self.queue.get(block=True, timeout=3) if number%2 == 0: print("get {0} from queue Even, thread name is: {1}".format(number, self.getName())) else: self.queue.put(number) time.sleep(1) except Exception as e: raise e def main(): queue = Queue() p = Proceduer(queue=queue) p.start() p.join() time.sleep(1) c1 = Consumer_odd(queue=queue) c2 = Consumer_even(queue=queue) c1.start() c2.start() c1.join() c2.join() print("All threads terminate!") if __name__ == '__main__': main()