python中RabbitMQ的使用(工做隊列)

消息能夠理解爲任務,消息發送者能夠當作任務派送者(sender),消息接收者能夠當作工做者(worker)。dom

當工做者接收到一個任務,還沒完任務時分配者又發一個任務,此時須要多個工做者來共同處理這些任務。post

任務分派結構圖以下:fetch

注:此時有一個任務派送人P,兩個工做接收者C1和C2。this

如今咱們來模擬該狀況:spa

1.首先打開三個終端:code

2.分別在前兩個終端運行receive1.pyblog

 3.在第三個終端屢次運行send1.pyrabbitmq

 此時將會輪流向worker1和worker2分派任務。it

問題:io

在以上任務分配和完成狀況中,有幾個問題將會產生:

1.工做者任務是否完成?

2.工做者掛掉後,如何防止未完成的任務丟失,而且如何處理這些任務?

3.RabbitMQ自身出現問題,此時如何防止任務丟失?

4.任務有輕重之分,如何實現公平調度?

方案:

1.消息確認(Message acknowledgment)

當任務完成後,工做者(receiver)將消息反饋給RabbitMQ:

複製代碼
1 def callback(ch, method, properties, body):
2     print " [x] Received %r" % (body,)
3     #停頓5秒,方便ctrl+c退出
4     time.sleep(5)
5     print " [x] Done"
6     #當工做者完成任務後,會反饋給rabbitmq
7     ch.basic_ack(delivery_tag=method.delivery_tag)
複製代碼

2.保留任務(no_ack=False)

當工做者掛掉後,防止任務丟失:

# 去除no_ack=True參數或者設置爲False後能夠實現
# 一個工做者ctrl+c退出後,正在執行的任務也不會丟失,rabbitmq會將任務從新分配給其餘工做者。
channel.basic_consume(callback, queue='task_queue', no_ack=False)

3.消息持久化存儲(Message durability)

聲明持久化存儲:

# durable=True即聲明持久化存儲
channel.queue_declare(queue='task_queue', durable=True)

在發送任務時,用delivery_mode=2來標記任務爲持久化存儲:

複製代碼
1 # 用delivery_mode=2來標記任務爲持久化存儲:
2 channel.basic_publish(exchange='',
3                       routing_key='task_queue',
4                       body=message,
5                       properties=pika.BasicProperties(
6                           delivery_mode=2,
7                       ))
複製代碼

4.公平調度(Fair dispatch)

使用basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工做者分配多個任務,即只有工做者完成任務以後,纔會再次接收到任務

channel.basic_qos(prefetch_count=1)

完整代碼以下:

receive1.py

import time

import pika
# 建立鏈接
hostname="localhost"
parameters=pika.ConnectionParameters(hostname)
connection=pika.BlockingConnection(parameters)

# 建立通道
channel=connection.channel()

'''
queue_declare(self, 
                queue, 
                passive=False, 
                durable=False(是否將任務持久化存儲), 
                exclusive=False, 
                auto_delete=False, 
                arguments=None
                )
'''
channel.queue_declare(queue="task_queue",durable=True)

def callback(ch,method,properties,body):
    print (" [x] Received %r" % body)
    time.sleep(5)
    print(" [x] Done")
    # ch.basic_ack爲當工做者完成任務後,會反饋給rabbitmq
    ch.basic_ack(delivery_tag=method.delivery_tag)

# basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工做者分配多個任務,
# 即只有工做者完成任務以後,纔會再次接收到任務。
channel.basic_qos(prefetch_count=1)

# 去除no_ack=True參數或者設置爲False後能夠實現   ack:acknowledgment(確認)
# 一個工做者ctrl+c退出後,正在執行的任務也不會丟失,rabbitmq會將任務從新分配給其餘工做者。
channel.basic_consume('task_queue',callback,auto_ack=False)

# 開始接收信息,按ctrl+c退出
print (' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

send1.py

import pika
import random
connector=pika.BlockingConnection(pika.ConnectionParameters("localhost"))

channel=connector.channel()
# 若是rabbitmq自身掛掉的話,那麼任務會丟失。因此須要將任務持久化存儲起來,聲明持久化存儲:
channel.queue_declare(queue="task_queue",durable=True)

number=random.randint(1,1000)
messge="hello word:%s" %number
# 在發送任務的時候,用delivery_mode=2來標記任務爲持久化存儲:
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=messge,
    properties=pika.BasicProperties(
        delivery_mode=2
    )
)
print( " [x] Sent %s" % messge)
connector.close()

示例以下:

首先啓動三個終端,兩個先執行receive1.py,第三個屢次執行send1.py:

終端3:

此時分配三個任務,33分配給worker1,170分配給worker2,262分配給worker1

 

終端1:

worker1完成任務33後,開始任務262,咱們在任務完成前使用(CRTL+C)使worker1掛掉

 

終端2:

worker2完成任務170,原本沒有任務,可是worker1掛掉,此時接收他的任務262

相關文章
相關標籤/搜索