RabbitMQ如何應對Server出現異常以及server分發任務的負載均衡問題

上一篇文章結尾我留了一個問題,就是在work2中設no_ack=Truepython

出現那樣的結果是由於server一次分發多個結果給work2,work2又在執行完了之後沒有發送ack確認執行結束,server端是根據connection是否存在來判斷work2是否stoped,因爲鏈接存在,因此server認爲work2正在執行,其實此時work2已經執行結束,正在等待server發送消息。。。fetch

一:spa

task是被加載到內存中的,要避免server崩潰致使的task丟失,固然想到的辦法就是持久化,將task保存到硬盤上code

在定義Queue的時候,設置屬性durable=Trueorm

channel.queue_declare(=,=)

與此同時須要告知生產者,queue是被持久化在硬盤上的server

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

二:內存


如何解決server一次分發多個任務的狀況,也能夠經過設置屬性it

channel.basic_qos(prefetch_count=1)

new_task.py:io

# -*- coding: UTF-8 -*-
import pika

if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="Kadima", durable=True)
    message = "You are awsome!"
    for i in range(0, 100):  # 循環100次發送消息
        channel.basic_publish(exchange="", routing_key='Kadima', body=message + " " + str(i),
                              properties=pika.BasicProperties(delivery_mode=2))
    print "sending ", message


work代碼:class

#-*- coding: UTF-8 -*-
import time
import pika
import sys

__author__ = 'Yue'

var=0

def callback(ch, method, properties, body):
#     <pika.adapters.blocking_connection.BlockingChannel object at 0x02973BF0> <Basic.
# Deliver(['consumer_tag=ctag1.8b367697d96c4579ba78914d8a4760a8', 'delivery_tag=50
# ', 'exchange=', 'redelivered=False', 'routing_key=Kadima'])> <BasicProperties> Y
# ou are awsome! 98

    # temp=var+1 #這裏有趣的是不能寫成var+=1或者var=var+1,要知道爲何,就須要清楚「Python全局變量和局部變量」
    # global var
    # var+=1
    # if var==20:
    #     print var , body
    #     sys.exit()
    print "1 received %r" % (body,)
    # time.sleep(0.1)
    print "Done"
    #設置返回ack的標誌,method.delivery_tag是MQ分發給Work時的一個標記
    ch.basic_ack(delivery_tag = method.delivery_tag)


if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.basic_qos(prefetch_count=1)
    channel.queue_declare(queue="Kadima",durable=True)
    channel.basic_consume(callback,queue="Kadima")
    print ' [1] Waiting for messages'
    channel.start_consuming()

work2.py

import time
import pika

__author__ = 'Yue'


def callback(ch, method, properties, body):
    print "2 received %r" % (body,)
    # time.sleep(0.1)
    print "Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)



if __name__ == '__main__':
    connection=pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel=connection.channel()
    channel.basic_qos(prefetch_count=1)
    channel.queue_declare(queue="Kadima",durable=True)
    channel.basic_consume(callback,queue="Kadima")
    print ' [2] Waiting for messages'
    channel.start_consuming()

3運行結果

能夠看出server對任務的分發已經變得隨機,而不是原來的依次分發

相關文章
相關標籤/搜索