線程隊列數據共享與生產者消費者模型

線程隊列數據共享與生產者消費者模型

queue模塊簡介

  queue模塊提供了多種隊列,那麼它主要是用於多線程編程中的數據共享。html

  咱們都知道同一進程下的數據是能被多個線程共享的,那麼爲何這些線程在同一進程下還去使用隊列呢?python

 

  由於隊列是: 管道 + 鎖編程

 

  因此使用隊列來存放多個線程中用於共享的數據仍是爲了保證其數據的安全性。安全

  queue模塊中的隊列主要是用於本地測試中使用,正式上線時還得要使用Redis,這個是後話,咱們先來看關於queue模塊的使用。多線程

       官方中文文檔 ide

 

方法大全

 

queue模塊中的隊列方法大全 
方法名稱 功能描述
Queue.qsize() 返回當前隊列的大小
Queue.empty() 判斷當前隊列是否爲空
Queue.full() 判斷當前隊列是否已滿
Queue.put(item, block=True, timeout=None) item放入隊列中,block參數爲若是要操做的隊列目前已盡是否阻塞,timeout爲超時時間。
Queue.put_nowait(item) 至關於 put(item, False),若是操做的隊列已滿則不進行阻塞,而是拋出Full異常。
Queue.get(block=True, timeout=None) 將項目從隊列中取出,block參數爲若是要操做的隊列目前爲空是否阻塞,timeout爲超時時間。
Queue.get_nowait() 至關於 get(False),若是要操做的隊列爲空則不進行阻塞,而是拋出Empty異常。
Queue.task_done() 當消費者線程被join()阻塞後,生產者線程調用此方法會中止消費者線程的阻塞狀態。詳情請見下面補充
Queue.join() 當消費者線程被join()阻塞後,需等待消費者線程調用task_done()方法後方可中止阻塞。詳情請見下面的補充
注意:如遇到異常狀況請查看官方文檔,這個模塊的方法比較特殊 

 

三種不一樣的隊列

  queue模塊中,提供了三種隊列。以下:測試

 

  queue.Queue:先進先出隊列spa

  queue.LifoQueue:先進後出隊列線程

  queue.PriorityQueue:優先級隊列3d

 

import queue

# ==== 先進先出隊列 ====

q1 = queue.Queue(maxsize=5)  # 該隊列最大可存放5個項目
q1.put(1)   # 入隊操做
q1.put(2)
q1.put(3)

print(q1.get())  # 出隊操做
print(q1.get())
print(q1.get())

# ==== 執行結果  ====

"""
1
2
3
"""


# ==== 先進後出隊列 ====

q2 = queue.LifoQueue(maxsize=5)  # 該隊列最大可存放5個項目
q2.put(1)   # 入隊操做
q2.put(2)
q2.put(3)

print(q2.get())  # 出隊操做
print(q2.get())
print(q2.get())

# ==== 執行結果  ====

"""
3
2
1
"""

# ==== 優先級隊列 ====

q3 = queue.PriorityQueue(maxsize=5)
q2.put([10,"優先級爲10"])   # 入隊操做
q2.put([20,"優先級爲20"])
q2.put([30,"優先級爲30"])

print(q2.get())  # 出隊操做,若是想取出具體元素,加個 索引[1] 便可
print(q2.get())
print(q2.get())

# ==== 執行結果  ====

"""
[30, '優先級爲30']
[20, '優先級爲20']
[10, '優先級爲10']
"""
三種隊列的簡單實用

 

生產者消費者模型

  生產者消費者模型是一種思想,大概意思就是咱們不讓生成者與消費者之間進行直接接觸,而是經過某種緩衝的方式讓彼此之間的耦合度下降。生產者生產出了產品,消費者就能夠進行購買,若是沒有產品就等着。

  那麼這個時候咱們就可使用隊列了,由於隊列裏有阻塞的機制,我畫一幅圖,來看一眼。

image-20200702151514469

來,咱們嘗試用代碼實現一下:

import threading
import queue
import time


def producer():
    """生產者"""
name = "食神周樹人" count = 1 # 計數器 while count < 11: # 天天只作10個包子 produce = "包子" print("包子作好了,這是今天的第{0}個包子".format(count)) q.put(produce + str(count)) # 將產品放在管道中 count += 1 time.sleep(0.4) # 廚師0.4秒作一個包子 def consumer(): """消費者"""
name = threading.current_thread().getName() print("{0}正在等包子".format(name)) while 1: try: food = q.get(timeout=2) # 等拿包子,最多等2秒 time.sleep(0.2) # 消費者0.2秒吃一個包子 print("{0}把{1}吃了,真好吃!".format(name, food)) except queue.Empty: # 有的人等的不耐煩就不等了 print("{0}說:等了這麼久尚未,不等了".format(name)) return if __name__ == '__main__': q = queue.Queue(maxsize=10) # 一次最多放10個產品 name = ["大耳朵", "二狗", "三蛋子"] # 消費者姓名列表 for i in range(3): t1 = threading.Thread(target=consumer, name=name[i]) t1.start() producer() # 生產者啓動,目前只有一個生產者,也能夠多個 # ==== 執行結果 ==== """ 大耳朵正在等包子 二狗正在等包子 三蛋子正在等包子包子作好了,這是今天的第1個包子 三蛋子把包子1吃了,真好吃! 包子作好了,這是今天的第2個包子 二狗把包子2吃了,真好吃! 包子作好了,這是今天的第3個包子 大耳朵把包子3吃了,真好吃! 包子作好了,這是今天的第4個包子 三蛋子把包子4吃了,真好吃! 包子作好了,這是今天的第5個包子 二狗把包子5吃了,真好吃! 包子作好了,這是今天的第6個包子 大耳朵把包子6吃了,真好吃! 包子作好了,這是今天的第7個包子 三蛋子把包子7吃了,真好吃! 包子作好了,這是今天的第8個包子 二狗把包子8吃了,真好吃! 包子作好了,這是今天的第9個包子 大耳朵把包子9吃了,真好吃! 包子作好了,這是今天的第10個包子 三蛋子把包子10吃了,真好吃! 二狗說:等了這麼久尚未,不等了 大耳朵說:等了這麼久尚未,不等了 三蛋子說:等了這麼久尚未,不等了 """

 

補充:join()與task_done()

import threading
import queue
import time

def task_1():
    print("正在裝東西..")
    time.sleep(3)
    q.put("玫瑰花")  # 正在裝東西
    q.task_done()  # 通知對方能夠取了


def task_2():
    q.join() # 阻塞等待通知,接到通知說明隊列裏裏有東西了。
    print("取到了",q.get())  # 取東西


if __name__ == '__main__':

    q = queue.Queue(maxsize=5)

    t1 = threading.Thread(target=task_1,name="小明")
    t2 = threading.Thread(target=task_2,name="小花")

    t1.start()
    t2.start()

# ==== 執行結果 ====

"""
正在裝東西..
取到了 玫瑰花
"""
相關文章
相關標籤/搜索