Python 模塊源碼分析:queue 隊列

起步

queue 模塊提供適用於多線程編程的先進先出(FIFO)數據結構。由於它是線程安全的,因此多個線程很輕鬆地使用同一個實例。html

源碼分析

先從初始化的函數來看:編程

class Queue:
    def __init__(self, maxsize=0):
        # 設置隊列的最大容量
        self.maxsize = maxsize
        self._init(maxsize)

        # 線程鎖,互斥變量
        self.mutex = threading.Lock()
        # 由鎖衍生出三個條件變量
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)

        self.unfinished_tasks = 0

    def _init(self, maxsize):
        # 初始化底層數據結構
        self.queue = deque()

從這初始化函數能獲得哪些信息呢?首先,隊列是能夠設置其容量大小的,而且具體的底層存放元素的它使用了 collections.deque() 雙端列表的數據結構,這使得能很方便的作先進先出操做。這裏還特意抽象爲 _init 函數是爲了方便其子類進行覆蓋,容許子類使用其餘結構來存放元素(好比優先隊列使用了 list)。安全

而後就是線程鎖 self.mutex ,對於底層數據結構 self.queue 的操做都要先得到這把鎖;再往下是三個條件變量,這三個 Condition 都以 self.mutex 做爲參數,也就是說它們共用一把鎖;從這能夠知道諸如 with self.mutexwith self.not_empty 等都是互斥的。數據結構

基於這些鎖而作的一些簡單的操做:多線程

class Queue:
    ...
    def qsize(self):
        # 返回隊列中的元素數
        with self.mutex:
            return self._qsize()

    def empty(self):
        # 隊列是否爲空
        with self.mutex:
            return not self._qsize()

    def full(self):
        # 隊列是否已滿
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def _qsize(self):
        return len(self.queue)

這個代碼片斷挺好理解的,無需分析。app

做爲隊列,主要得完成入隊與出隊的操做,首先是入隊:函數

class Queue:
    ...
    def put(self, item, block=True, timeout=None):
        with self.not_full: # 獲取條件變量not_full
            if self.maxsize > 0:
                if not block:
                    if self._qsize() >= self.maxsize:
                        raise Full # 若是 block 是 False,而且隊列已滿,那麼拋出 Full 異常
                elif timeout is None:
                    while self._qsize() >= self.maxsize:
                        self.not_full.wait() # 阻塞直到由剩餘空間
                elif timeout < 0: # 不合格的參數值,拋出ValueError
                    raise ValueError("'timeout' must be a non-negative number")
                else:
                    endtime = time() + timeout  # 計算等待的結束時間
                    while self._qsize() >= self.maxsize:
                        remaining = endtime - time()
                        if remaining <= 0.0:
                            raise Full # 等待期間一直沒空間,拋出 Full 異常
                        self.not_full.wait(remaining)
            self._put(item) # 往底層數據結構中加入一個元素
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def _put(self, item):
        self.queue.append(item)

儘管只有二十幾行的代碼,但這裏的邏輯仍是比較複雜的。它要處理超時與隊列剩餘空間不足的狀況,具體幾種狀況以下:源碼分析

  1. 若是 block 是 False,忽略timeout參數線程

    • 若此時隊列已滿,則拋出 Full 異常;
    • 若此時隊列未滿,則當即把元素保存到底層數據結構中;
  2. 若是 block 是 True設計

    • timeoutNone 時,那麼put操做可能會阻塞,直到隊列中有空閒的空間(默認);
    • timeout 是非負數,則會阻塞相應時間直到隊列中有剩餘空間,在這個期間,若是隊列中一直沒有空間,拋出 Full 異常;

處理好參數邏輯後,,將元素保存到底層數據結構中,並遞增unfinished_tasks,同時通知 not_empty ,喚醒在其中等待數據的線程。

出隊操做:

class Queue:
    ...
    def get(self, block=True, timeout=None):
        with self.not_empty:
            if not block:
                if not self._qsize():
                    raise Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                endtime = time() + timeout
                while not self._qsize():
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def _get(self):     
        return self.queue.popleft()

get() 操做是 put() 相反的操做,代碼塊也及其類似,get() 是從隊列中移除最早插入的元素並將其返回。

  1. 若是 block 是 False,忽略timeout參數

    • 若此時隊列沒有元素,則拋出 Empty 異常;
    • 若此時隊列由元素,則當即把元素保存到底層數據結構中;
  2. 若是 block 是 True

    • timeoutNone 時,那麼get操做可能會阻塞,直到隊列中有元素(默認);
    • timeout 是非負數,則會阻塞相應時間直到隊列中有元素,在這個期間,若是隊列中一直沒有元素,則拋出 Empty 異常;

最後,經過 self.queue.popleft() 將最先放入隊列的元素移除,並通知 not_full ,喚醒在其中等待數據的線程。

這裏有個值得注意的地方,在 put() 操做中遞增了 self.unfinished_tasks ,而 get() 中卻沒有遞減,這是爲何?

這實際上是爲了留給用戶一個消費元素的時間,get() 僅僅是獲取元素,並不表明消費者線程處理的該元素,用戶須要調用 task_done() 來通知隊列該任務處理完成了:

class Queue:
    ...
    def task_done(self):
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0: # 也就是成功調用put()的次數小於調用task_done()的次數時,會拋出異常
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all() # 當unfinished爲0時,會通知all_tasks_done
            self.unfinished_tasks = unfinished

    def join(self):
        with self.all_tasks_done:
            while self.unfinished_tasks: # 若是有未完成的任務,將調用wait()方法等待
                self.all_tasks_done.wait()

因爲 task_done() 使用方調用的,當 task_done() 次數大於 put() 次數時會拋出異常。

task_done() 操做的做用是喚醒正在阻塞的 join() 操做。join() 方法會一直阻塞,直到隊列中全部的元素都被取出,並被處理了(和線程的join方法相似)。也就是說 join() 方法必須配合 task_done() 來使用才行。

LIFO 後進先出隊列

LifoQueue使用後進先出順序,與棧結構類似:

class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.pop()

這就是 LifoQueue 所有代碼了,這正是 Queue 設計很棒的一個緣由,它將底層的數據操做抽象成四個操做函數,自己來處理線程安全的問題,使得其子類只需關注底層的操做。

LifoQueue 底層數據結構改用 list 來存放,經過 self.queue.pop() 就能將 list 中最後一個元素移除,無需重置索引。

PriorityQueue 優先隊列

from heapq import heappush, heappop

class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).

    Entries are typically tuples of the form:  (priority number, data).
    '''

    def _init(self, maxsize):
        self.queue = []

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        heappush(self.queue, item)

    def _get(self):
        return heappop(self.queue)

優先隊列使用了 heapq 模塊的結構,也就是最小堆的結構。優先隊列更爲經常使用,隊列中項目的處理順序須要基於這些項目的特徵,一個簡單的例子:

import queue

class A:
    def __init__(self, priority, value):
        self.priority = priority
        self.value = value

    def __lt__(self, other):
        return self.priority < other.priority


q = queue.PriorityQueue()

q.put(A(1, 'a'))
q.put(A(0, 'b'))
q.put(A(1, 'c'))

print(q.get().value)  # 'b'

使用優先隊列的時候,須要定義 __lt__ 魔術方法,來定義它們之間如何比較大小。若元素的 priority 相同,依然使用先進先出的順序。

參考

相關文章
相關標籤/搜索