queue
模塊提供適用於多線程編程的先進先出(FIFO)數據結構。由於它是線程安全的,因此多個線程很輕鬆地使用同一個實例。html
先從初始化的函數來看:編程
class Queue: def __init__(self, maxsize=0): # 設置隊列的最大容量 self.maxsize = maxsize self._init(maxsize) # 線程鎖,互斥變量 self.mutex = threading.Lock() # 由鎖衍生出三個條件變量 self.not_empty = threading.Condition(self.mutex) self.not_full = threading.Condition(self.mutex) self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 def _init(self, maxsize): # 初始化底層數據結構 self.queue = deque()
從這初始化函數能獲得哪些信息呢?首先,隊列是能夠設置其容量大小的,而且具體的底層存放元素的它使用了 collections.deque()
雙端列表的數據結構,這使得能很方便的作先進先出操做。這裏還特意抽象爲 _init
函數是爲了方便其子類進行覆蓋,容許子類使用其餘結構來存放元素(好比優先隊列使用了 list
)。安全
而後就是線程鎖 self.mutex
,對於底層數據結構 self.queue
的操做都要先得到這把鎖;再往下是三個條件變量,這三個 Condition
都以 self.mutex
做爲參數,也就是說它們共用一把鎖;從這能夠知道諸如 with self.mutex
與 with self.not_empty
等都是互斥的。數據結構
基於這些鎖而作的一些簡單的操做:多線程
class Queue: ... def qsize(self): # 返回隊列中的元素數 with self.mutex: return self._qsize() def empty(self): # 隊列是否爲空 with self.mutex: return not self._qsize() def full(self): # 隊列是否已滿 with self.mutex: return 0 < self.maxsize <= self._qsize() def _qsize(self): return len(self.queue)
這個代碼片斷挺好理解的,無需分析。app
做爲隊列,主要得完成入隊與出隊的操做,首先是入隊:函數
class Queue: ... def put(self, item, block=True, timeout=None): with self.not_full: # 獲取條件變量not_full if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: raise Full # 若是 block 是 False,而且隊列已滿,那麼拋出 Full 異常 elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() # 阻塞直到由剩餘空間 elif timeout < 0: # 不合格的參數值,拋出ValueError raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout # 計算等待的結束時間 while self._qsize() >= self.maxsize: remaining = endtime - time() if remaining <= 0.0: raise Full # 等待期間一直沒空間,拋出 Full 異常 self.not_full.wait(remaining) self._put(item) # 往底層數據結構中加入一個元素 self.unfinished_tasks += 1 self.not_empty.notify() def _put(self, item): self.queue.append(item)
儘管只有二十幾行的代碼,但這裏的邏輯仍是比較複雜的。它要處理超時與隊列剩餘空間不足的狀況,具體幾種狀況以下:源碼分析
若是 block
是 False,忽略timeout參數線程
若是 block
是 True設計
timeout
是 None
時,那麼put操做可能會阻塞,直到隊列中有空閒的空間(默認);timeout
是非負數,則會阻塞相應時間直到隊列中有剩餘空間,在這個期間,若是隊列中一直沒有空間,拋出 Full
異常;處理好參數邏輯後,,將元素保存到底層數據結構中,並遞增unfinished_tasks
,同時通知 not_empty
,喚醒在其中等待數據的線程。
出隊操做:
class Queue: ... def get(self, block=True, timeout=None): with self.not_empty: if not block: if not self._qsize(): raise Empty elif timeout is None: while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = time() + timeout while not self._qsize(): remaining = endtime - time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() self.not_full.notify() return item def _get(self): return self.queue.popleft()
get()
操做是 put()
相反的操做,代碼塊也及其類似,get()
是從隊列中移除最早插入的元素並將其返回。
若是 block
是 False,忽略timeout參數
若是 block
是 True
timeout
是 None
時,那麼get操做可能會阻塞,直到隊列中有元素(默認);timeout
是非負數,則會阻塞相應時間直到隊列中有元素,在這個期間,若是隊列中一直沒有元素,則拋出 Empty
異常;最後,經過 self.queue.popleft()
將最先放入隊列的元素移除,並通知 not_full
,喚醒在其中等待數據的線程。
這裏有個值得注意的地方,在 put()
操做中遞增了 self.unfinished_tasks
,而 get()
中卻沒有遞減,這是爲何?
這實際上是爲了留給用戶一個消費元素的時間,get()
僅僅是獲取元素,並不表明消費者線程處理的該元素,用戶須要調用 task_done()
來通知隊列該任務處理完成了:
class Queue: ... def task_done(self): with self.all_tasks_done: unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: # 也就是成功調用put()的次數小於調用task_done()的次數時,會拋出異常 raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() # 當unfinished爲0時,會通知all_tasks_done self.unfinished_tasks = unfinished def join(self): with self.all_tasks_done: while self.unfinished_tasks: # 若是有未完成的任務,將調用wait()方法等待 self.all_tasks_done.wait()
因爲 task_done()
使用方調用的,當 task_done()
次數大於 put()
次數時會拋出異常。
task_done()
操做的做用是喚醒正在阻塞的 join()
操做。join()
方法會一直阻塞,直到隊列中全部的元素都被取出,並被處理了(和線程的join方法相似)。也就是說 join()
方法必須配合 task_done()
來使用才行。
LifoQueue使用後進先出順序,與棧結構類似:
class LifoQueue(Queue): '''Variant of Queue that retrieves most recently added entries first.''' def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
這就是 LifoQueue
所有代碼了,這正是 Queue
設計很棒的一個緣由,它將底層的數據操做抽象成四個操做函數,自己來處理線程安全的問題,使得其子類只需關注底層的操做。
LifoQueue 底層數據結構改用 list
來存放,經過 self.queue.pop()
就能將 list 中最後一個元素移除,無需重置索引。
from heapq import heappush, heappop class PriorityQueue(Queue): '''Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). ''' def _init(self, maxsize): self.queue = [] def _qsize(self): return len(self.queue) def _put(self, item): heappush(self.queue, item) def _get(self): return heappop(self.queue)
優先隊列使用了 heapq
模塊的結構,也就是最小堆的結構。優先隊列更爲經常使用,隊列中項目的處理順序須要基於這些項目的特徵,一個簡單的例子:
import queue class A: def __init__(self, priority, value): self.priority = priority self.value = value def __lt__(self, other): return self.priority < other.priority q = queue.PriorityQueue() q.put(A(1, 'a')) q.put(A(0, 'b')) q.put(A(1, 'c')) print(q.get().value) # 'b'
使用優先隊列的時候,須要定義 __lt__
魔術方法,來定義它們之間如何比較大小。若元素的 priority
相同,依然使用先進先出的順序。