上一篇咱們使用C#語言講解了單個消費者從消息隊列中處理消息的模型,這一篇咱們使用Python語言來說解多個消費者同時工做從一個Queue處理消息的模型。網絡
工做隊列(又稱:任務隊列——Task Queues)是爲了不等待一些佔用大量資源、時間的操做。當咱們把任務(Task)看成消息發送到隊列中,一個運行在後臺的工做者(worker)進程就會取出任務而後處理。當你運行多個工做者(workers),任務就會在它們之間共享。這個概念在網絡應用中是很是有用的,它能夠在短暫的HTTP請求中處理一些複雜的任務,我麼能夠將耗時的請求放在任務隊列,而後立馬返回響應,接下來由多個worker去處理複雜的業務操做。(這種架構叫作"分佈式異步隊列",有時候用來方式D-DOS攻擊,12306網站就是採用這種模式)架構
用Python操做Python模塊首先要到如pika這個包,利用pip install pika去安裝。併發
咱們首先寫一個new_task.py用來向任務隊列中寫入任務,已備用。異步
import pika import sys with pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) as connection: channel = connection.channel() channel.queue_declare(queue = "hello") for index in range(0,10): channel.basic_publish(exchange="", routing_key="hello", body="["+str(index)+"]" + "Hello World") connection.close()
接下來編寫works.py程序,咱們須要在works.py中建立消費者,讓消費者從任務隊列中提取任務去執行。分佈式
import pika import sys import time connection = pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) channel = connection.channel() channel.queue_declare(queue = "hello") channel.basic_ack() def callback(ch, method, properties, body): print(" [x] Received %r" % (body.decode('utf-8'),)) time.sleep(3) # 咱們在這裏利用線程休息來模擬一個比較耗時的任務處理 print(" [x] Done") channel.basic_consume(callback, queue='hello', no_ack= True) # 咱們把no_ack標記爲true用來屏蔽消息確認 channel.start_consuming() connection.close()
在callback函數中讓當前線程休息5秒用來模擬一個耗時的任務。ide
接下來首先打開兩個Terminal窗口同時去運行works.py程序,而後運行new_task.py程序來查看效果。注意:在這裏爲了說明多個work可以同時分享任務隊列中的隊列,咱們必定要先運行works.py,後運行new_task.py程序。具體緣由後面在說明。函數
默認來講,RabbitMQ會按順序得把消息發送給兩個消費者(consumer),平均每一個消費者都會收到同等數量得消息,這種發送消息的方式叫作——輪詢(round-robin)。這樣作的好處就是咱們在處理相同數量的task所用的時間成倍的減小了。work越多,咱們處理任務隊列所用的時間就越少,這在高併發系統中會很是有用。高併發
當前的代碼中,當消息被RabbitMQ發送給消費者(consumers)以後,立刻就會在內存中移除。這種狀況之下,假如其中一個工做者掛掉了,那麼它正在處理的消息就會丟失,而且與此同時,後面全部發送到這個工做者的還沒來得及處理的消息也都會丟失。這顯然不是咱們想看到的結果。咱們不想丟失任何消息,若是一個工做者(worker)掛掉了,咱們但願任務會從新發送給其餘的工做者(worker)。fetch
爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ纔會釋放並刪除這條消息,而不是這條消息一發送出去立刻就從內存中刪除。網站
若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,及時工做者(workers)偶爾的掛掉,也不會丟失消息。消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。消息響應默認是開啓的。以前的例子中咱們可使用no_ack=True標識把它關閉。接下來咱們移除這個標識,當工做者(worker)完成了任務,就發送一個響應。
對咱們的workers.py稍微進行一下改動:
1 import pika 2 import sys 3 import time 4 5 connection = pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello") 8 channel.basic_ack() 9 10 def callback(ch, method, properties, body): 11 print(" [x] Received %r" % (body.decode('utf-8'),)) 12 time.sleep(3) 13 print(" [x] Done") 14 ch.basic_ack(delivery_tag = method.delivery_tag) # 2. channel.basic_ack()方法用來執行消息確認操做 15 16 channel.basic_consume(callback, 17 queue='hello', 18 no_ack= False) # 1. no_ack告訴RabbitMQ開啓消息確認機制,也就是說消息須要被確認 19 20 channel.start_consuming() 21 connection.close()
先開啓兩個Terinmal窗口執行workers.py而後執行new_task.py,當執行一半是利用ctrl+c關掉其中一個worker。能夠看到RabbitMQ將已經關掉的worker的沒來得及處理的消息,再一次發給worker2。以此保證消息不會丟失。
在回調方法中必定要記得調用channel.basic_ack()方法用來確認消息。緣由很容易理解,消息若是不確認,任務就算是被callback函數處理成功了,RabbitMQ在內存中也不會刪除這條任務,這條任務還會停留在內存中。這樣無疑會帶來一個比較大的bug。
RabbittMQ若是意外崩潰的話,就會丟失全部的「隊列」和「消息」。所以爲了確保信息不會丟失,有兩個事情是須要注意的:咱們必須把「隊列」和「消息」設爲持久化。下面的代碼分別演示瞭如何進行隊列持久化和消息持久化。
1 import pika 2 import sys 3 4 5 with pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) as connection: 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello",durable=True) # 1.Queue持久化 8 for index in range(0,10): 9 channel.basic_publish(exchange="", 10 routing_key="hello", 11 body="["+str(index)+"]" + "Hello World", 12 properties= pika.BasicProperties( # 2.消息持久化 13 delivery_mode= 2 14 )) 15 connection.close()
在實際生產中咱們不必定全部的任務處理時都消耗一樣多的時間,有的任務須要更長的時間,有的任務須要比較少的時間。這樣就形成有的工做者比較繁忙,有的工做者比較輕鬆。然而RabbitMQ並不知道這些,它仍然一如既往的派發消息。這樣無疑會形成資源的浪費。
這時由於RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有做出響應。它盲目的把第n-th條消息發給第n-th個消費者。
咱們可使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker)。這樣能保證消息是一個一個發出去的,而且是一個處理完成了再發另外一個,而不是一次性所有發分出去了。這樣儘量的保證了每一個worker的工做時間相同(公平調度),而且在相同時間執行效率高的worker會分享到更多的消息(多勞多得)。
channe.basic_qos(prefetch_count=1)
固然,若是全部的worker都長時間處於繁忙狀態,沒有時間接收下一條消息,那麼任務隊列就有可能滿了。咱們能夠增長worker的數量,或者想其餘辦法。
1 import pika 2 import sys 3 4 5 with pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) as connection: 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello",durable = True) # 1.Queue持久化 8 for index in range(0,10): 9 channel.basic_publish(exchange="", 10 routing_key="hello", 11 body="["+str(index)+"]" + "Hello World", 12 properties= pika.BasicProperties( # 2.消息持久化 13 delivery_mode= 2 14 )) 15 connection.close()
1 import pika 2 import sys 3 import time 4 5 connection = pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello",durable = True) # 隊列持久化 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % (body.decode('utf-8'),)) 11 time.sleep(5) 12 print(" [x] Done----%r" % time.strftime("%Y-%m-%d %X",time.localtime())) 13 ch.basic_ack(delivery_tag = method.delivery_tag) 14 15 channel.basic_qos(prefetch_count = 1) # 用來告訴每一個worker一次只能接受一條消息 16 channel.basic_consume(callback, 17 queue='hello', 18 no_ack = False) 19 channel.start_consuming() 20 connection.close()
1 import pika 2 import sys 3 import time 4 5 connection = pika.BlockingConnection(pika.connection.ConnectionParameters(host="localhost")) 6 channel = connection.channel() 7 channel.queue_declare(queue = "hello") 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % (body.decode('utf-8'),)) 11 time.sleep(1) 12 print(" [x] Done----%r" % time.strftime("%Y-%m-%d %X",time.localtime())) 13 ch.basic_ack(delivery_tag = method.delivery_tag) 14 15 channel.basic_qos(prefetch_count = 1) 16 channel.basic_consume(callback, 17 queue='hello', 18 no_ack = False) 19 channel.start_consuming() 20 connection.close()