RabbitMQ之發佈訂閱

工做隊列中,每一個任務之分發給一個工做者。若是須要分發一個消息給多個消費者,這種模式被稱爲「發佈/訂閱」html

交換器(Exchanges)

RabbitMQ完整的消息模型python

發佈者(producer)是發佈消息的應用程序git

隊列(queue)用於消息存儲的緩衝服務器

消費者(consumer)是接收消息的應用程序spa

RabbitMQ消息模型的核心理念是:.net

發佈者(producer)不會直接發送任何消息給隊列。事實上,發佈者(producer)甚至不知道消息是否已經被投遞到隊列。日誌

 

發佈者(producer)只須要把消息發送給一個交換器(exchage),而後由它一邊從發佈者接收消息,一邊把消息推入隊列。交換器必須知道如何處理它接收到的消息,是應該推送到指定的隊列仍是多個隊列,或者直接忽略消息。這些規則經過exchange type來定義。code

交換器類型

一、directorm

處理路由鍵,須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。server

是完整的匹配,與routing_key對應。

Rabbitmq Exchange 類型 - zhanghua.1199 - 鬱,加沒

二、topic

將路由鍵和某模式進行匹配。此時隊列須要綁定在一個模式上。

符號#匹配一個或多個詞,符號*匹配很少很多一個詞。
例如audit.#可以匹配到audit.irs.corportate,可是audit.*只會匹配audit.irs
Rabbitmq Exchange 類型 - zhanghua.1199 - 鬱,加沒
相似消息歸類
注:多臺服務器訪問同一個隊列時,rabbitmq會到達該隊列的消息,而後平均發送。

三、headers

heads類型的Exchange不依賴於routing key與bingding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。

在綁定Queue與Exchange時指定一組鍵值對,當消息發送到Exchange時,RabbitMQ會取到該消息的headers,對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對,若是徹底匹配,則消息路由到該隊列,不然不會路由。

四、fanout

不處理路由鍵,只須要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。相似子網廣播,每臺子網內的主機都得到一份複製的消息。

fanout交換機轉發消息是最快的。

Rabbitmq Exchange 類型 - zhanghua.1199 - 鬱,加沒

Exchange聲明方法

exchange_declare(self, exchange=None, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None, type=None) method of pika.adapters.blocking_connection.BlockingChannel instance
    This method creates an exchange if it does not already exist, and if
    the exchange exists, verifies that it is of the correct and expected
    class.
    
    If passive set, the server will reply with Declare-Ok if the exchange
    already exists with the same name, and raise an error if not and if the
    exchange does not already exist, the server MUST raise a channel
    exception with reply code 404 (not found).
    
    :param exchange: The exchange name consists of a non-empty sequence of
                      these characters: letters, digits, hyphen, underscore,
                      period, or colon.
    :type exchange: str or unicode
    :param str exchange_type: The exchange type to use
    :param bool passive: Perform a declare or just check to see if it exists
    :param bool durable: Survive a reboot of RabbitMQ
    :param bool auto_delete: Remove when no more queues are bound to it
    :param bool internal: Can only be published to by other exchanges
    :param bool nowait: Do not expect an Exchange.DeclareOk response
    :param dict arguments: Custom key/value pair arguments for the exchange
    :param str type: The deprecated exchange type parameter

  

列出服務器上全部的交換器

guosong@guosong:~$ rabbitmqctl list_exchanges
Listing exchanges ...
	direct
amq.direct	direct
amq.fanout	fanout
amq.headers	headers
amq.match	headers
amq.rabbitmq.log	topic
amq.rabbitmq.trace	topic
amq.topic	topic
...done.

 匿名的交換器

以前的列子中指定exchange=‘’,命名爲空字符串。

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

exchange參數就是交換器的名稱,空字符串表明匿名交換器,消息將會根據指定的routing_key分發到指定的隊列。

臨時隊列

對於共享同隊列的需求,指定隊列名稱比較重要。

對於日誌系統而言,打算接收全部的日誌消息,關心的是最新的消息。

爲了解決這個問題,須要作兩件事情。

一、隨機建立隊列名

RabbitMQ能夠爲咱們選擇一個隨機的隊列名(推薦),固然也能夠指定。

#經過不指定queue參數值,實現RabbitMQ分配隨機隊列名
result = channel.queue_declare()

#能夠經過以下方式獲取消息隊列名稱
result.method.queue

二、當與消費者(consumer)斷開鏈接的時候,這個隊列應當被刪除。可使用exclusive標識。

result = channel.queue_declare(exclusive=True)

 

綁定(Bindings,隊列與交換器之間綁定)

 

已經建立一個fanout類型的交換器和一個隊列。須要告訴交換器如何發送消息給咱們的隊列。

交換器和隊列之間的關係稱之爲綁定(binding)

#logs交換器將會把消息添加到隊列中
#隊列是上面說的服務器隨機命名的
channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

綁定列表查看

guosong@guosong:~$ rabbitmqctl list_bindings
Listing bindings ...
	exchange	task_queue	queue	task_queue	[]
gs_test_exchange	exchange	task_queue	queue	task_queue	[]
...done.

 

例子

把消息發送給logs交換器,在發送的時候提供routing_key參數,可是它的值會被fanout交換器忽略

 

#!/usr/bin/env python
#-*- coding:utf8 -*-
import sys 
import pika
import logging

logging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL)

def emit_log():

    pika.connection.Parameters.DEFAULT_HOST = 'localhost'
    pika.connection.Parameters.DEFAULT_PORT = 5672
    pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/' 
    pika.connection.Parameters.DEFAULT_USERNAME = 'guosong'
    pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong'

    para = pika.connection.Parameters()

    connection = pika.BlockingConnection(para)

    channel = connection.channel()
    #聲明一個logs交換器,類型爲fanout,不容許發佈消息到不存在的交換器
    channel.exchange_declare(exchange='logs',type='fanout')

    message = '.'.join(sys.argv[1:]) or "info:Hello World!"

    #發送的時候指定routing_key爲空,沒有綁定隊列到交換器上,消息將會丟失
    #對於日誌類消息,若是沒有消費者監聽的話,這些消息就會忽略
    channel.basic_publish(exchange='logs',routing_key='',body=message)

    #%r也是string類型
    print "[x] Sent %r" % (message,)                                                         

    connection.close()

if __name__ == '__main__':
    emit_log()

演示結果

一、兩個臨時隊列

 二、兩個consumer輸出

第二個consumer開啓晚一些,所以其收到消息少一些,由於在發佈的時候指定routing_key爲空,exchange不會保留。

三、在consumer退出的時候,兩個臨時產生的隊列也自動刪除。

logs交換器把數據發送給兩個系統命名的隊列,符合指望需求。

參考連接

一、http://zhanghua.1199.blog.163.com/blog/static/4644980720128732417654/

二、http://adamlu.net/rabbitmq/tutorial-three-python

三、http://blog.csdn.net/puncha/article/details/8449273

四、http://www.ostest.cn/archives/497

相關文章
相關標籤/搜索