隊列--線程安全的FIFO實現

隊列 — 線程安全的FIFO實現

queue 模塊提供了一個適合多線程編程的先入、先出(FIFO)數據結構。它能夠用來安全地傳遞生產者和消費者線程之間的消息或其餘數據。因爲線程安全,多線程能夠安全地處理同一個Queue實例。python

基本的FIFO隊列

Queue類實現基本的先進先出容器
put() -- 從隊尾添加元素
get() -- 從隊首刪除元素,並返回該元素編程

import queue

q = queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

這個例子使用一個線程來講明元素添加與刪除的順序相同,即先進先出。 結果:安全

0 1 2 3 4

一個小例子:擊鼓傳炸彈數據結構

image
代碼實現:多線程

import queue
import random
q = queue.Queue()
nameList = ['A', 'B', 'C', 'D', 'E', 'F']
for name in nameList:
    q.put(name)

while not q.qsize() == 1:
    num = int(random.uniform(0, 20))
    print(f'num={num}', end=' ')
    while num:   # 隨機轉動num次
        q.put(q.get())
        num -= 1
    outName = q.get()
    print(f'outName={outName}')
print(f'winner={q.get()}')

運行結果:dom

num=2 outName=C
num=6 outName=E
num=7 outName=D
num=0 outName=F
num=11 outName=B
winner=A

LIFO隊列

與隊列的標準FIFO實現不一樣,LifoQueue使用後進先出順序(一般與棧相關聯)。spa

import queue

q = queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

結果:線程

4 3 2 1 0

優先級隊列 Priority Queue

有時候,隊列中的項目的處理順序須要基於這些項的特徵,而不單單是它們被建立或添加到隊列中的順序。例如,來自工資部門的打印做業可能優先於開發人員想要打印的代碼清單。PriorityQueue使用隊列內容的排序順序來決定要檢索哪一項。code

import functools
import queue
import threading


@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('New job:', description)
        return

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented


q = queue.PriorityQueue()

q.put(Job(3, 'Mid-level job'))
q.put(Job(10, 'Low-level job'))
q.put(Job(1, 'Important job'))


def process_job(q):
    while True:
        next_job = q.get()
        print('Processing job:', next_job.description)
        q.task_done()


workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.setDaemon(True)
    w.start()

q.join()

結果:orm

New job: Mid-level job
New job: Low-level job
New job: Important job
Processing job: Important job
Processing job: Mid-level job
Processing job: Low-level job

一點解釋 :

task_done()
意味着以前入隊的一個任務已經完成。由隊列的消費者線程調用。每個get()調用獲得一個任務,接下來的task_done()調用告訴隊列該任務已經處理完畢。 即每個由put()調用入隊的任務都有一個對應的task_done()調用。

join()
阻塞調用線程,直到隊列中的全部任務被處理掉。

只要有數據被加入隊列,未完成的任務數就會增長。當消費者線程調用task_done()(意味着有消費者取得任務並完成任務),未完成的任務數就會減小。當未完成的任務數降到0,join()解除阻塞。

put(item[, block[, timeout]])
將item放入隊列中。

  • 若是可選的參數block爲True且timeout爲空對象(默認的狀況,阻塞調用,無超時)。
  • 若是timeout是個正整數,阻塞調用進程最多timeout秒,若是一直無空空間可用,拋出Full異常(帶超時的阻塞調用)。
  • 若是block爲False,若是有空閒空間可用將數據放入隊列,不然當即拋出Full異常

其非阻塞版本爲put_nowait等同於put(item, False)

get([block[, timeout]])
從隊列中移除並返回一個數據。block跟timeout參數同put方法
其非阻塞方法爲get_nowait()至關與get(False)

empty() 若是隊列爲空,返回True,反之返回False

相關文章
相關標籤/搜索