工做隊列中,每一個任務之分發給一個工做者。若是須要分發一個消息給多個消費者,這種模式被稱爲「發佈/訂閱」html
RabbitMQ完整的消息模型python
發佈者(producer)是發佈消息的應用程序git
隊列(queue)用於消息存儲的緩衝服務器
消費者(consumer)是接收消息的應用程序spa
RabbitMQ消息模型的核心理念是:.net
發佈者(producer)不會直接發送任何消息給隊列。事實上,發佈者(producer)甚至不知道消息是否已經被投遞到隊列。日誌
發佈者(producer)只須要把消息發送給一個交換器(exchage),而後由它一邊從發佈者接收消息,一邊把消息推入隊列。交換器必須知道如何處理它接收到的消息,是應該推送到指定的隊列仍是多個隊列,或者直接忽略消息。這些規則經過exchange type來定義。code
一、directorm
處理路由鍵,須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。server
是完整的匹配,與routing_key對應。
二、topic
將路由鍵和某模式進行匹配。此時隊列須要綁定在一個模式上。
三、headers
heads類型的Exchange不依賴於routing key與bingding key的匹配規則來路由消息,而是根據發送的消息內容中的headers屬性進行匹配。
在綁定Queue與Exchange時指定一組鍵值對,當消息發送到Exchange時,RabbitMQ會取到該消息的headers,對比其中的鍵值對是否徹底匹配Queue與Exchange綁定時指定的鍵值對,若是徹底匹配,則消息路由到該隊列,不然不會路由。
四、fanout
不處理路由鍵,只須要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。相似子網廣播,每臺子網內的主機都得到一份複製的消息。
fanout交換機轉發消息是最快的。
exchange_declare(self, exchange=None, exchange_type='direct', passive=False, durable=False, auto_delete=False, internal=False, nowait=False, arguments=None, type=None) method of pika.adapters.blocking_connection.BlockingChannel instance This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class. If passive set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not and if the exchange does not already exist, the server MUST raise a channel exception with reply code 404 (not found). :param exchange: The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon. :type exchange: str or unicode :param str exchange_type: The exchange type to use :param bool passive: Perform a declare or just check to see if it exists :param bool durable: Survive a reboot of RabbitMQ :param bool auto_delete: Remove when no more queues are bound to it :param bool internal: Can only be published to by other exchanges :param bool nowait: Do not expect an Exchange.DeclareOk response :param dict arguments: Custom key/value pair arguments for the exchange :param str type: The deprecated exchange type parameter
guosong@guosong:~$ rabbitmqctl list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic ...done.
以前的列子中指定exchange=‘’,命名爲空字符串。
channel.basic_publish(exchange='', routing_key='hello', body=message)
exchange參數就是交換器的名稱,空字符串表明匿名交換器,消息將會根據指定的routing_key分發到指定的隊列。
對於共享同隊列的需求,指定隊列名稱比較重要。
對於日誌系統而言,打算接收全部的日誌消息,關心的是最新的消息。
爲了解決這個問題,須要作兩件事情。
一、隨機建立隊列名
RabbitMQ能夠爲咱們選擇一個隨機的隊列名(推薦),固然也能夠指定。
#經過不指定queue參數值,實現RabbitMQ分配隨機隊列名 result = channel.queue_declare()
#能夠經過以下方式獲取消息隊列名稱
result.method.queue
二、當與消費者(consumer)斷開鏈接的時候,這個隊列應當被刪除。可使用exclusive標識。
result = channel.queue_declare(exclusive=True)
已經建立一個fanout類型的交換器和一個隊列。須要告訴交換器如何發送消息給咱們的隊列。
交換器和隊列之間的關係稱之爲綁定(binding)
#logs交換器將會把消息添加到隊列中 #隊列是上面說的服務器隨機命名的 channel.queue_bind(exchange='logs', queue=result.method.queue)
綁定列表查看
guosong@guosong:~$ rabbitmqctl list_bindings Listing bindings ... exchange task_queue queue task_queue [] gs_test_exchange exchange task_queue queue task_queue [] ...done.
把消息發送給logs交換器,在發送的時候提供routing_key參數,可是它的值會被fanout交換器忽略
#!/usr/bin/env python #-*- coding:utf8 -*- import sys import pika import logging logging.basicConfig(format='%(levelname)s:$(message)s',level=logging.CRITICAL) def emit_log(): pika.connection.Parameters.DEFAULT_HOST = 'localhost' pika.connection.Parameters.DEFAULT_PORT = 5672 pika.connection.Parameters.DEFAULT_VIRTUAL_HOST = '/' pika.connection.Parameters.DEFAULT_USERNAME = 'guosong' pika.connection.Parameters.DEFAULT_PASSWORD = 'guosong' para = pika.connection.Parameters() connection = pika.BlockingConnection(para) channel = connection.channel() #聲明一個logs交換器,類型爲fanout,不容許發佈消息到不存在的交換器 channel.exchange_declare(exchange='logs',type='fanout') message = '.'.join(sys.argv[1:]) or "info:Hello World!" #發送的時候指定routing_key爲空,沒有綁定隊列到交換器上,消息將會丟失 #對於日誌類消息,若是沒有消費者監聽的話,這些消息就會忽略 channel.basic_publish(exchange='logs',routing_key='',body=message) #%r也是string類型 print "[x] Sent %r" % (message,) connection.close() if __name__ == '__main__': emit_log()
一、兩個臨時隊列
二、兩個consumer輸出
第二個consumer開啓晚一些,所以其收到消息少一些,由於在發佈的時候指定routing_key爲空,exchange不會保留。
三、在consumer退出的時候,兩個臨時產生的隊列也自動刪除。
logs交換器把數據發送給兩個系統命名的隊列,符合指望需求。
一、http://zhanghua.1199.blog.163.com/blog/static/4644980720128732417654/
二、http://adamlu.net/rabbitmq/tutorial-three-python
三、http://blog.csdn.net/puncha/article/details/8449273
四、http://www.ostest.cn/archives/497