在以前的文章RabbitMQ入門(二)工做隊列中,咱們建立了一個工做隊列。工做隊列背後的假設是每一項任務都被準確地傳送至一個worker。在本文中,咱們將會作一些不一樣的事情——咱們將會把一個消息發送至許多消費者中。這種模式被稱爲訂閱模式(publish/subscribe)
。
爲了解釋這種模式,咱們將會構建一個簡單的日誌系統。它包含兩個程序——第一個將會產生消息,第二個將會接收並輸出這些消息。
在咱們的日誌系統中,每個正在運行的接收程序都會收到消息。在這種方式下,咱們能夠運行一個接收程序來接收並將日誌保存至硬盤;同時,咱們還能運行另外一個接收程序,在屏幕上觀察到日誌的輸出。
特別地,發送的這些消息都會被廣播到全部的接收程序。python
在以前的文章中,咱們向隊列發送消息,從隊列中接受消息。如今是時候介紹RabbitMQ中的所有消息轉發模式。
讓咱們快速地瀏覽下以前文章中講了些什麼:緩存
RabbitMQ中消息傳輸模式的核心思想是生產者毫不會直接向隊列發送任何消息。實際上,一般狀況下生產者甚至都不會知道消息是否會被髮送至隊列。
生產者會將消息發送至交換(exchange)
。交換
並不複雜。一方面它從生產者中接受消息,另外一方面將消息推送至隊列。交換
必須知道,當它接受一個消息時,它該怎麼作。是否這個消息會附加至一個特殊的隊列?是否它會附加至許多隊列?或者它會被丟棄。這個規則用交換類型(exchange type)
來定義。bash
有一些可用的交換類型
:直接分發(direct)
,通配分發(topic)
,headers
和複製分發(fanout)
。咱們將會集中講最後一個——fanout。咱們建立一個交換
,類型爲fanout,並取名爲logs:spa
channel.exchange_declare(exchange='logs', exchange_type='fanout')
fanout交換很是簡單。顧名思義,它會將全部它知道的接收隊列的消息都廣播出去。而這也正是咱們的日誌系統所須要的。
如今,咱們能夠發佈已經命名好的隊列了:.net
channel.basic_publish(exchange='logs', routing_key='', body=message)
你也許還記得在以前的文章中,咱們須要給隊列取名。可是呢,給隊列命名太麻煩了——咱們須要將workers指定到同一個隊列。當你須要在生產者和消費者之間共享隊列的時候,給隊列命名又是很重要的。
這種情形並不適合咱們的日誌系統。咱們想要監聽全部的消息,而不是部分消息。同時,咱們僅對當前的流動消息感興趣,而不是以前的消息。爲了解決這個問題,咱們須要作兩件事情。
首先,不管什麼時候咱們鏈接到RabbitMQ,咱們須要一個新的空隊列。爲此,咱們建立一個隨機命名的隊列,或者更好的是,讓RabbitMQ Server來給咱們建立一個隨機命名的隊列。所以,咱們能夠利用queue_declare
命令,設置queuq
參數爲空:日誌
result = channel.queue_declare(queue='')
此時,result.method.queue
會包含一個隨機命名的隊列,好比說,它會和amq.gen-JzTY20BRgKO-HjmUJj0wLg
相似。
其次,一旦消息者的鏈接關閉,咱們須要刪除隊列。這能夠用exclusive
參數搞定:code
result = channel.queue_declare(queue='', exclusive=True)
咱們已經建立了一個fanout 交換和隊列。如今咱們須要告訴交換,將消息發送至隊列。交換與隊列之間的關係叫作綁定(Bindings)
。blog
channel.queue_bind(exchange='logs', queue=result.method.queue)
從如今開始,logs
交換將會在咱們的隊列後追加消息。rabbitmq
生產者代碼(emit_log.py):隊列
# -*- coding: utf-8 -*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
消費者代碼(receive_log.py):
# -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
開啓四個終端,其中一個用於保存日誌:
python3 receive_log.py > logs_from_rabbit.log
另外一個用於觀察日誌輸出:
python3 receive_log.py
日誌產生:
python3 emit_log.py
監聽綁定:
sudo rabbitmqctl list_bindings
運行截圖以下:
本次分享到此結束,感謝你們閱讀~