RabbitMQ消息隊列(二): 工做隊列

1. 工做隊列:html

對於資源密集型任務,咱們等待其處理完成在不少狀況下是不現實的,好比沒法在http的短暫請求窗口中處理大量耗時任務,python

爲了達到主線程無需等待,任務異步執行的要求,咱們能夠將任務加入任務隊列,如圖,多個workers能夠共享安全

同一個任務隊列,同時對任務進行處理,主線程P將延後任務發送到隊列以後便可返回,延遲任務由C1和C2處理完成;異步

 

2. 輪詢調度:測試

隊列會將消息輪詢分發給worker,如上圖兩個worker,則首先發送消息到C1,而後發送消息到C2,而後在發送消息到C1,C2,C1...,fetch

隊列按順序發送,這樣保證了每一個worker收到的消息是均等的,默認設置狀況下,隊列並不會考慮worker當前的負載狀況。spa

 

3. 均衡調度:線程

如2中所說,好比如今有兩個隊列,奇數消息都須要隊列作大量繁重的處理,而偶數消息則須要處理的邏輯很是少,這樣就會形成某個隊列code

任務繁重,等待處理任務過多,從而使消息處理不均衡,處理能力降低。面對這樣的狀況,RabbitMQ提供了均衡調度機制,指定workerhtm

只能接收一條消息,當worker處理完畢,隊列收到消息確認(4中描述)的時候,纔會派發給該worker一條新消息。由此,達到對消息和隊列處理能力的均衡調度。

以下,咱們可使用basic_qos,並將perfetch_count設置爲1,來告訴隊列每次只發送一條消息給當前worker,直到收到完成確認才發下一條。

channel.basic_qos(prefetch_count=1)

 

4. 消息確認:

當不使用消息確認的狀況下,隊列將消息投遞給worker以後,會當即將消息從隊列內存中刪除;此時,若是woker被停掉或者崩潰,

那麼worker當前正在處理的消息和隊列已經派發給worker的消息都會丟失。

RabbitMQ提供了消息確認機制,worker完成處理消息以後發送ack,隊列確認消息已處理完畢,纔將其從內存中刪除。可是這個過程沒有

超時,哪怕woker處理了很長時間也是沒問題的。當worker掛掉,隊列沒有收到消息ack,若是有其餘worker在運行,那麼worker

會將未確認的消息派發給其餘運行中的worker。ack確認機制默認是開啓的,固然能夠在channel中關閉。

注意,必定要確保在消息處理完成以後發送ack,不然隊列內存將會隨消息的增長而不斷增長,甚至形成內存耗盡。

 

5. 消息持久化:

消息ack解決了worker掛掉時候消息的安全性,可是沒法針對整個服務的重啓或者掛掉,當RabbitMQ重啓或者掛掉的時候,隊列和消息都會消失,

爲了不這種狀況發生,咱們須要設置隊列和消息持久化。

(1) 設置隊列持久化:durable=True

channel.queue_declare(queue='task_queue', durable=True)

(2) 設置消息持久化:delivery_mode=2

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

上述設置雖然必定程度上保證了消息持久化,可是在收到消息和持久化消息之間仍然有時間窗口存在,且並非每條消息都會寫一次磁盤,

因此這個時間窗口內仍然可能丟失消息,若是要確保持久化足夠健壯,請參考 https://www.rabbitmq.com/confirms.html

 

6. 測試代碼:

new_task.py--用於發送消息到隊列

 1 #!/usr/bin/env python
 2 import pika
 3 import sys
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8 
 9 # 設置隊列持久化
10 channel.queue_declare(queue='task_queue', durable=True)
11 
12 message = ' '.join(sys.argv[1:]) or "Hello World!"
13 channel.basic_publish(exchange='',
14                       routing_key='task_queue',
15                       body=message,
16                       properties=pika.BasicProperties(
17                          delivery_mode = 2, # 設置消息持久化
18                       ))
19 print(" [x] Sent %r" % message)
20 connection.close()

 

worker.py--用於接收隊列消息並完成消息處理

 1 #!/usr/bin/env python
 2 import pika
 3 import time
 4 
 5 connection = pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8 
 9 # 設置隊列持久化
10 channel.queue_declare(queue='task_queue', durable=True)
11 print(' [*] Waiting for messages. To exit press CTRL+C')
12 
13 def callback(ch, method, properties, body):
14     print(" [x] Received %r" % body)
15     time.sleep(body.count(b'.'))
16     print(" [x] Done")
17     # 完成消息處理,發送ack確認消息    
18     ch.basic_ack(delivery_tag = method.delivery_tag)
19 
20 # 最多同時接受一條消息
21 channel.basic_qos(prefetch_count=1)
22 channel.basic_consume(callback,
23                       queue='task_queue')
24 
25 channel.start_consuming()
相關文章
相關標籤/搜索