RabbitMQ如何應對消費出現異常的狀況

1,生產者python

new_task.py函數

import pika

if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    message="You are awsome!"
    for i in range(0,100):#循環100次發送消息
        channel.basic_publish(exchange="",routing_key='Kadima',body=message+" "+str(i))
    print "sending ",message

2,多個消費者spa

消費者1,work.pycode

#-*- coding: UTF-8 -*-
import time
import pika
import sys

__author__ = 'Yue'

var=0

def callback(ch, method, properties, body):
    # temp=var+1 #這裏有趣的是不能寫成var+=1或者var=var+1,要知道爲何,就須要清楚「Python全局變量和局部變量」
    #global var
    #var+=1
    #if var==20:
        #sys.exit()
    print "1 received %r" % (body,)
    time.sleep(body.count("."))
    print "Done"


if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima",no_ack=True)
    print ' [1] Waiting for messages'
    channel.start_consuming()

work2.pyit

import time
import pika

__author__ = 'Yue'


def callback(ch, method, properties, body):
    print "2 received %r" % (body,)
    time.sleep(body.count("."))
    print "Done"


if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima",no_ack=True)
    print ' [2] Waiting for messages'
    channel.start_consuming()

3,執行work,work2,new_taskio

個人啓動順序是work,work2,從執行結果能夠看出,RabbitMQ是將task分別依次分發給按照時間順序註冊的work上的,class

也就是,task1,task2,task3,task4,它會將task1,task3分發給work,另外兩個分發給task3,task4import


接下來,有趣的事情就要發生了:變量

當把work.py中的callback函數的註釋內容打開後(起做用是讓work處理19個task,便退出程序),MQ並無將本該分發給work的task分發給work2,那到底去哪裏了呢?我暫時假設爲work退出時並無告訴MQ他不幹了(他出現異常啦),MQ仍是會將task分發給workobject

4,那沒有執行完的任務怎麼辦呢?

Message acknowledgment :ack默認是打開的

修改work代碼以下

#-*- coding: UTF-8 -*-
import time
import pika
import sys

__author__ = 'Yue'

var=0

def callback(ch, method, properties, body):
    print ch, method, properties, body
#     <pika.adapters.blocking_connection.BlockingChannel object at 0x02973BF0> <Basic.
# Deliver(['consumer_tag=ctag1.8b367697d96c4579ba78914d8a4760a8', 'delivery_tag=50
# ', 'exchange=', 'redelivered=False', 'routing_key=Kadima'])> <BasicProperties> Y
# ou are awsome! 98

    temp=var+1 #這裏有趣的是不能寫成var+=1或者var=var+1,要知道爲何,就須要清楚「Python全局變量和局部變量」
    global var
    var+=1
    if var==20:
        print var , body
        sys.exit()
    print "1 received %r" % (body,)
    # time.sleep(0.1)
    print "Done"
    #設置返回ack的標誌,method.delivery_tag是MQ分發給Work時的一個標記
    ch.basic_ack(delivery_tag = method.delivery_tag)


if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima")
    print ' [1] Waiting for messages'
    channel.start_consuming()

work2

def callback(ch, method, properties, body):
    print "2 received %r" % (body,)
    # time.sleep(0.1)
    print "Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)



if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.queue_declare("Kadima")
    channel.basic_consume(callback,queue="Kadima")
    print ' [2] Waiting for messages'
    channel.start_consuming()

因而可知,MQ會從新將沒有執行,或執行失敗的任務從新分發給存活的work2,並且,他的分發順序也頗有趣,是在本來應該分發給work2的task執行結束後再去分發未執行的任務。‘


5,思考,若是在work2中

channel.basic_consume(callback,queue="Kadima",no_ack=True)

會出現什麼狀況。。。

相關文章
相關標籤/搜索