Python 源碼分析:queue 隊列模塊

起步編程

queue 模塊提供適用於多線程編程的先進先出(FIFO)數據結構。由於它是線程安全的,因此多個線程很輕鬆地使用同一個實例。安全

源碼分析數據結構

先從初始化的函數來看:多線程

 

 

從這初始化函數能獲得哪些信息呢?首先,隊列是能夠設置其容量大小的,而且具體的底層存放元素的它使用了collections.deque()雙端列表的數據結構,這使得能很方便的作先進先出操做。這裏還特意抽象爲_init函數是爲了方便其子類進行覆蓋,容許子類使用其餘結構來存放元素(好比優先隊列使用了 list)。ide

而後就是線程鎖self.mutex,對於底層數據結構self.queue的操做都要先得到這把鎖;再往下是三個條件變量,這三個 Condition 都以self.mutex做爲參數,也就是說它們共用一把鎖;從這能夠知道諸如with self.mutex與with self.not_empty等都是互斥的。函數

基於這些鎖而作的一些簡單的操做:源碼分析

 

 

這個代碼片斷挺好理解的,無需分析。線程

做爲隊列,主要得完成入隊與出隊的操做,首先是入隊:設計

 

 

儘管只有二十幾行的代碼,但這裏的邏輯仍是比較複雜的。它要處理超時與隊列剩餘空間不足的狀況,具體幾種狀況以下:索引

一、若是 block 是 False,忽略timeout參數

若此時隊列已滿,則拋出 Full 異常;

若此時隊列未滿,則當即把元素保存到底層數據結構中;

二、若是 block 是 True

若 timeout 是 None 時,那麼put操做可能會阻塞,直到隊列中有空閒的空間(默認);

若 timeout 是非負數,則會阻塞相應時間直到隊列中有剩餘空間,在這個期間,若是隊列中一直沒有空間,拋出 Full 異常;

處理好參數邏輯後,,將元素保存到底層數據結構中,並遞增unfinished_tasks,同時通知not_empty,喚醒在其中等待數據的線程。

出隊操做:

 

 

get()操做是put()相反的操做,代碼塊也及其類似,get()是從隊列中移除最早插入的元素並將其返回。

一、若是 block 是 False,忽略timeout參數

若此時隊列沒有元素,則拋出 Empty 異常;

若此時隊列由元素,則當即把元素保存到底層數據結構中;

二、若是 block 是 True

若 timeout 是 None 時,那麼get操做可能會阻塞,直到隊列中有元素(默認);

若 timeout 是非負數,則會阻塞相應時間直到隊列中有元素,在這個期間,若是隊列中一直沒有元素,則拋出 Empty 異常;

最後,經過self.queue.popleft()將最先放入隊列的元素移除,並通知not_full,喚醒在其中等待數據的線程。

這裏有個值得注意的地方,在put()操做中遞增了self.unfinished_tasks,而get()中卻沒有遞減,這是爲何?

這實際上是爲了留給用戶一個消費元素的時間,get()僅僅是獲取元素,並不表明消費者線程處理的該元素,用戶須要調用task_done()來通知隊列該任務處理完成了:

 

 

因爲task_done()使用方調用的,當task_done()次數大於put()次數時會拋出異常。

task_done()操做的做用是喚醒正在阻塞的join()操做。join()方法會一直阻塞,直到隊列中全部的元素都被取出,並被處理了(和線程的join方法相似)。也就是說join()方法必須配合task_done()來使用才行。

LIFO 後進先出隊列

LifoQueue使用後進先出順序,與棧結構類似:

 

 

這就是 LifoQueue 所有代碼了,這正是 Queue 設計很棒的一個緣由,它將底層的數據操做抽象成四個操做函數,自己來處理線程安全的問題,使得其子類只需關注底層的操做。

LifoQueue 底層數據結構改用 list 來存放,經過 self.queue.pop() 就能將 list 中最後一個元素移除,無需重置索引。

PriorityQueue 優先隊列

 

 

優先隊列使用了 heapq 模塊的結構,也就是最小堆的結構。優先隊列更爲經常使用,隊列中項目的處理順序須要基於這些項目的特徵,一個簡單的例子:

 

 

使用優先隊列的時候,須要定義__lt__魔術方法,來定義它們之間如何比較大小。若元素的 priority 相同,依然使用先進先出的順序。--------------------- 

相關文章
相關標籤/搜索