1. 工做隊列:html
對於資源密集型任務,咱們等待其處理完成在不少狀況下是不現實的,好比沒法在http的短暫請求窗口中處理大量耗時任務,python
爲了達到主線程無需等待,任務異步執行的要求,咱們能夠將任務加入任務隊列,如圖,多個workers能夠共享安全
同一個任務隊列,同時對任務進行處理,主線程P將延後任務發送到隊列以後便可返回,延遲任務由C1和C2處理完成;異步
2. 輪詢調度:測試
隊列會將消息輪詢分發給worker,如上圖兩個worker,則首先發送消息到C1,而後發送消息到C2,而後在發送消息到C1,C2,C1...,fetch
隊列按順序發送,這樣保證了每一個worker收到的消息是均等的,默認設置狀況下,隊列並不會考慮worker當前的負載狀況。spa
3. 均衡調度:線程
如2中所說,好比如今有兩個隊列,奇數消息都須要隊列作大量繁重的處理,而偶數消息則須要處理的邏輯很是少,這樣就會形成某個隊列code
任務繁重,等待處理任務過多,從而使消息處理不均衡,處理能力降低。面對這樣的狀況,RabbitMQ提供了均衡調度機制,指定workerhtm
只能接收一條消息,當worker處理完畢,隊列收到消息確認(4中描述)的時候,纔會派發給該worker一條新消息。由此,達到對消息和隊列處理能力的均衡調度。
以下,咱們可使用basic_qos,並將perfetch_count設置爲1,來告訴隊列每次只發送一條消息給當前worker,直到收到完成確認才發下一條。
channel.basic_qos(prefetch_count=1)
4. 消息確認:
當不使用消息確認的狀況下,隊列將消息投遞給worker以後,會當即將消息從隊列內存中刪除;此時,若是woker被停掉或者崩潰,
那麼worker當前正在處理的消息和隊列已經派發給worker的消息都會丟失。
RabbitMQ提供了消息確認機制,worker完成處理消息以後發送ack,隊列確認消息已處理完畢,纔將其從內存中刪除。可是這個過程沒有
超時,哪怕woker處理了很長時間也是沒問題的。當worker掛掉,隊列沒有收到消息ack,若是有其餘worker在運行,那麼worker
會將未確認的消息派發給其餘運行中的worker。ack確認機制默認是開啓的,固然能夠在channel中關閉。
注意,必定要確保在消息處理完成以後發送ack,不然隊列內存將會隨消息的增長而不斷增長,甚至形成內存耗盡。
5. 消息持久化:
消息ack解決了worker掛掉時候消息的安全性,可是沒法針對整個服務的重啓或者掛掉,當RabbitMQ重啓或者掛掉的時候,隊列和消息都會消失,
爲了不這種狀況發生,咱們須要設置隊列和消息持久化。
(1) 設置隊列持久化:durable=True
channel.queue_declare(queue='task_queue', durable=True)
(2) 設置消息持久化:delivery_mode=2
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
上述設置雖然必定程度上保證了消息持久化,可是在收到消息和持久化消息之間仍然有時間窗口存在,且並非每條消息都會寫一次磁盤,
因此這個時間窗口內仍然可能丟失消息,若是要確保持久化足夠健壯,請參考 https://www.rabbitmq.com/confirms.html
6. 測試代碼:
new_task.py--用於發送消息到隊列
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 # 設置隊列持久化 10 channel.queue_declare(queue='task_queue', durable=True) 11 12 message = ' '.join(sys.argv[1:]) or "Hello World!" 13 channel.basic_publish(exchange='', 14 routing_key='task_queue', 15 body=message, 16 properties=pika.BasicProperties( 17 delivery_mode = 2, # 設置消息持久化 18 )) 19 print(" [x] Sent %r" % message) 20 connection.close()
worker.py--用於接收隊列消息並完成消息處理
1 #!/usr/bin/env python 2 import pika 3 import time 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 # 設置隊列持久化 10 channel.queue_declare(queue='task_queue', durable=True) 11 print(' [*] Waiting for messages. To exit press CTRL+C') 12 13 def callback(ch, method, properties, body): 14 print(" [x] Received %r" % body) 15 time.sleep(body.count(b'.')) 16 print(" [x] Done") 17 # 完成消息處理,發送ack確認消息 18 ch.basic_ack(delivery_tag = method.delivery_tag) 19 20 # 最多同時接受一條消息 21 channel.basic_qos(prefetch_count=1) 22 channel.basic_consume(callback, 23 queue='task_queue') 24 25 channel.start_consuming()