上篇文章中,咱們把每一個Message都是deliver到某個Consumer。在這篇文章中,咱們將會將同一個Message deliver到多個Consumer中。這個模式也被成爲 「publish / subscribe」。html
這篇文章中,咱們將建立一個日誌系統,它包含兩個部分:第一個部分是發出log(Producer),第二個部分接收到並打印(Consumer)。 咱們將構建兩個Consumer,第一個將log寫到物理磁盤上;第二個將log輸出的屏幕。python
關於exchange的概念在《RabbitMQ消息隊列(一): Detailed Introduction 詳細介紹》中有詳細介紹。如今作一下簡單的回顧。bash
RabbitMQ 的Messaging Model就是Producer並不會直接發送Message到queue。實際上,Producer並不知道它發送的Message是否已經到達queue。ui
Producer發送的Message其實是發到了Exchange中。它的功能也很簡單:從Producer接收Message,而後投遞到queue中。Exchange須要知道如何處理Message,是把它放到那個queue中,仍是放到多個queue中?這個rule是經過Exchange 的類型定義的。spa
咱們知道有三種類型的Exchange:direct, topic 和fanout。fanout就是廣播模式,會將全部的Message都放到它所知道的queue中。建立一個名字爲logs,類型爲fanout的Exchange:.net
channel.exchange_declare(exchange='logs',
type='fanout')複製代碼
Listing exchanges3d
經過rabbitmqctl能夠列出當前全部的Exchange:日誌
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs fanout
amq.direct direct
amq.topic topic
amq.fanout fanout
amq.headers headers
...done.複製代碼
注意 amq.* exchanges 和the default (unnamed)exchange是RabbitMQ默認建立的。code
如今咱們能夠經過exchange,而不是routing_key來publish Message了:cdn
channel.basic_publish(exchange='logs',
routing_key='',
body=message)複製代碼
截至如今,咱們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer以前共享queue成爲可能。
可是對於咱們將要構建的日誌系統,並不須要有名字的queue。咱們但願獲得全部的log,而不是它們中間的一部分。並且咱們只對當前的log感興趣。爲了實現這個目標,咱們須要兩件事情:
1) 每當Consumer鏈接時,咱們須要一個新的,空的queue。由於咱們不對老的log感興趣。幸運的是,若是在聲明queue時不指定名字,那麼RabbitMQ會隨機爲咱們選擇這個名字。方法:
result = channel.queue_declare()複製代碼
經過result.method.queue 能夠取得queue的名字。基本上都是這個樣子:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
2)當Consumer關閉鏈接時,這個queue要被deleted。能夠加個exclusive的參數。方法:
result = channel.queue_declare(exclusive=True)複製代碼
複製代碼
方法:
channel.queue_bind(exchange='logs',
queue=result.method.queue)複製代碼
如今logs的exchange就將它的Message附加到咱們建立的queue了。
Listing bindings
使用命令rabbitmqctl list_bindings。
咱們最終實現的數據流圖以下:
Producer,在這裏就是產生log的program,基本上和前幾個都差很少。最主要的區別就是publish經過了exchange而不是routing_key。
emit_log.py script:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print " [x] Sent %r" % (message,)
connection.close()複製代碼
還有一點要注意的是咱們聲明瞭exchange。publish到一個不存在的exchange是被禁止的。若是沒有queue bindings exchange的話,log是被丟棄的。
Consumer:receive_logs.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs',
queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] %r" % (body,)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()複製代碼
咱們開始不是說須要兩個Consumer嗎?一個負責記錄到文件;一個負責打印到屏幕?
其實用重定向就能夠了,固然你想修改callback本身寫文件也行。咱們使用重定向的方法:
We’re done. If you want to save logs to a file, just open a console and type:
$ python receive_logs.py > logs_from_rabbit.log複製代碼
Consumer2:打印到屏幕:
$ python receive_logs.py複製代碼
接下來,Producer:
$ python emit_log.py複製代碼
使用命令rabbitmqctl list_bindings你能夠看咱們建立的queue。
一個output:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.複製代碼
這個結果仍是很好理解的。
參考資料:
1. http://www.rabbitmq.com/tutorials/tutorial-three-python.html
2. http://blog.csdn.net/anzhsoft/article/details/19617305