Task distribution in RabbitMQ

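A direct exchange routes each task by its routing key (routing_key) to the queues bound with that key, so a worker only needs to listen on the matching queue. In code, this means setting the appropriate routing_key both when publishing and when binding the queue.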

A fanout exchange does the opposite: it ignores the routing key and delivers every task to all of the queues bound to it.
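The difference between the two routing rules can be sketched without a broker. The `ToyExchange` class below is a hypothetical in-memory stand-in written only for illustration; it is not part of pika or RabbitMQ, and the queue names are made up.

```python
from collections import defaultdict


class ToyExchange:
    """In-memory stand-in for an exchange; not part of pika or RabbitMQ."""

    def __init__(self, exchange_type):
        if exchange_type not in ("direct", "fanout"):
            raise ValueError("unsupported exchange type: " + exchange_type)
        self.exchange_type = exchange_type
        self.bindings = []               # list of (queue_name, routing_key)
        self.queues = defaultdict(list)  # queue_name -> delivered bodies

    def bind(self, queue_name, routing_key=""):
        self.bindings.append((queue_name, routing_key))
        self.queues[queue_name]  # make the queue visible even while it is empty

    def publish(self, routing_key, body):
        targets = set()  # a queue gets at most one copy of each message
        for queue_name, bound_key in self.bindings:
            # fanout ignores the key; direct requires an exact match
            if self.exchange_type == "fanout" or bound_key == routing_key:
                targets.add(queue_name)
        for queue_name in targets:
            self.queues[queue_name].append(body)


direct = ToyExchange("direct")
direct.bind("q_even", "even")
direct.bind("q_old", "old")
direct.publish("even", "task 0")

fanout = ToyExchange("fanout")
fanout.bind("q1")
fanout.bind("q2")
fanout.publish("even", "task 0")

print(dict(direct.queues))
print(dict(fanout.queues))
```

With the direct exchange only `q_even` holds the task, while with the fanout exchange both queues hold a copy even though the message was published with the key `even`.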

fanout mode:

emit_log.py

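# -*- coding: UTF-8 -*-
import pika

if __name__ == '__main__':

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # fanout: the routing_key is ignored and every bound queue gets a copy.
    # The exchange name "logs" is an assumption here; pika < 1.0 spells the keyword type=.
    channel.exchange_declare(exchange="logs", exchange_type="fanout")
    message = "You are awesome!"
    for i in range(0, 100):  # send 100 messages
        if i % 2 == 0:
            channel.basic_publish(exchange="logs", routing_key='even', body=message + " " + str(i))
        else:
            # 'old' is the key this post uses for odd numbers
            channel.basic_publish(exchange="logs", routing_key='old', body=message + " " + str(i))

    print("sending ", message)
    connection.close()

    # both receive_log workers will receive every task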

receive_log.py

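# -*- coding: UTF-8 -*-
import pika


def callback(ch, method, properties, body):
    print(body)


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # The exchange name "logs" is an assumption; the original literal was lost.
    channel.exchange_declare(exchange="logs", exchange_type="fanout")
    # exclusive queue with a broker-generated name, removed when this worker disconnects
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    channel.queue_bind(exchange="logs", queue=queue_name)
    # pika >= 1.0 signature; older releases used basic_consume(callback, queue=..., no_ack=True)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()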

receive_log2.py

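# -*- coding: UTF-8 -*-
import pika


def callback(ch, method, properties, body):
    print(body)


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # The exchange name "logs" is an assumption; the original literal was lost.
    channel.exchange_declare(exchange="logs", exchange_type="fanout")
    # each worker declares its own exclusive queue, so each receives its own copy
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    channel.queue_bind(exchange="logs", queue=queue_name)
    # pika >= 1.0 signature; older releases used basic_consume(callback, queue=..., no_ack=True)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()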

As the output shows, both workers receive all of the messages.

direct mode:

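The worker code only needs the exchange type in the code above changed to "direct" and its queue bound to a different exchange. The emitter is changed in the same way: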


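# -*- coding: UTF-8 -*-
import pika

if __name__ == '__main__':

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    # direct: a message reaches only the queues bound with its routing_key
    channel.exchange_declare(exchange="logs2", exchange_type="direct")
    message = "You are awesome!"
    for i in range(0, 100):  # send 100 messages
        if i % 2 == 0:
            channel.basic_publish(exchange="logs2", routing_key='even', body=message + " " + str(i))
        else:
            # 'old' is the key this post uses for odd numbers
            channel.basic_publish(exchange="logs2", routing_key='old', body=message + " " + str(i))

    print("sending ", message)
    connection.close()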

receive_even_log.py

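# -*- coding: UTF-8 -*-
import pika


def callback(ch, method, properties, body):
    print(body)


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.exchange_declare(exchange="logs2", exchange_type="direct")
    # exclusive queue with a broker-generated name, removed when this worker disconnects
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    # only messages published with routing_key 'even' reach this queue
    channel.queue_bind(exchange="logs2", queue=queue_name, routing_key="even")
    # pika >= 1.0 signature; older releases used basic_consume(callback, queue=..., no_ack=True)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()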

receive_old_log.py

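# -*- coding: UTF-8 -*-
import pika


def callback(ch, method, properties, body):
    print(body)


if __name__ == '__main__':
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.exchange_declare(exchange="logs2", exchange_type="direct")
    # exclusive queue with a broker-generated name, removed when this worker disconnects
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    # only messages published with routing_key 'old' (odd numbers) reach this queue
    channel.queue_bind(exchange="logs2", queue=queue_name, routing_key="old")
    # pika >= 1.0 signature; older releases used basic_consume(callback, queue=..., no_ack=True)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()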
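Since receive_even_log.py and receive_old_log.py differ only in the routing key they bind with, the shared setup could be factored into one function. The following is a minimal sketch, not the author's code: `bind_worker` and `RecordingChannel` are hypothetical names, the keyword arguments assume the pika 1.x channel API, and the recording channel exists only so the call sequence can be inspected without a running broker.

```python
from types import SimpleNamespace


def bind_worker(channel, exchange, routing_key, callback):
    """Declare a direct exchange, bind a private queue to it and register a consumer.

    `channel` is expected to behave like a pika >= 1.0 channel (assumption).
    Returns the broker-generated queue name.
    """
    channel.exchange_declare(exchange=exchange, exchange_type="direct")
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue
    channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    return queue_name


class RecordingChannel:
    """Hypothetical stand-in for a pika channel that only records the calls made."""

    def __init__(self):
        self.calls = []

    def exchange_declare(self, **kwargs):
        self.calls.append(("exchange_declare", kwargs))

    def queue_declare(self, **kwargs):
        self.calls.append(("queue_declare", kwargs))
        # mimic the result.method.queue attribute path used by bind_worker
        return SimpleNamespace(method=SimpleNamespace(queue="generated-queue"))

    def queue_bind(self, **kwargs):
        self.calls.append(("queue_bind", kwargs))

    def basic_consume(self, **kwargs):
        self.calls.append(("basic_consume", kwargs))


def print_body(ch, method, properties, body):
    print(body)


# Dry run: show the binding each of the two workers would request.
for key in ("even", "old"):
    dry_run = RecordingChannel()
    name = bind_worker(dry_run, "logs2", key, print_body)
    print(key, "->", name, dry_run.calls[2])
```

Against a real broker, the same function would be given `connection.channel()` from a `pika.BlockingConnection`, followed by `channel.start_consuming()`.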

The output shows that each task is delivered only to the queue bound with the matching routing key.
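As a quick sanity check on how many messages each worker should print, the key rule of the publishing loop (even numbers get `even`, odd numbers get `old`) can be tallied offline. This is only an arithmetic sketch; the variable names are illustrative and nothing here talks to RabbitMQ.

```python
from collections import Counter

# Same rule as the publishing loop: even i -> 'even', odd i -> 'old'
keys = ["even" if i % 2 == 0 else "old" for i in range(0, 100)]

direct_counts = Counter(keys)  # a direct worker sees only messages with its own key
fanout_count = len(keys)       # a fanout worker sees every message

print(direct_counts["even"], direct_counts["old"], fanout_count)  # prints: 50 50 100
```

If a direct worker prints noticeably more or fewer than 50 messages, the routing key used in its queue_bind call is the first thing to compare against the emitter.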
