RabbitMQ入門(三)訂閱模式

  在以前的文章RabbitMQ入門(二)工做隊列中,咱們建立了一個工做隊列。工做隊列背後的假設是每一項任務都被準確地傳送至一個worker。在本文中,咱們將會作一些不一樣的事情——咱們將會把一個消息發送至許多消費者中。這種模式被稱爲訂閱模式(publish/subscribe)
  爲了解釋這種模式,咱們將會構建一個簡單的日誌系統。它包含兩個程序——第一個將會產生消息,第二個將會接收並輸出這些消息。
  在咱們的日誌系統中,每個正在運行的接收程序都會收到消息。在這種方式下,咱們能夠運行一個接收程序來接收並將日誌保存至硬盤;同時,咱們還能運行另外一個接收程序,在屏幕上觀察到日誌的輸出。
  特別地,發送的這些消息都會被廣播到全部的接收程序。python

交換(Exchanges)

  在以前的文章中,咱們向隊列發送消息,從隊列中接受消息。如今是時候介紹RabbitMQ中的所有消息轉發模式。
  讓咱們快速地瀏覽下以前文章中講了些什麼:緩存

  • 一個生產者(Producer)是用於產生消息的用戶應用程序;
  • 一個隊列(Queue)是緩存區,用於儲存消息;
  • 一個消費者(Consumer)是用於接收消息的用戶應用程序。

RabbitMQ中消息傳輸模式的核心思想是生產者毫不會直接向隊列發送任何消息。實際上,一般狀況下生產者甚至都不會知道消息是否會被髮送至隊列。
  生產者會將消息發送至交換(exchange)交換並不複雜。一方面它從生產者中接受消息,另外一方面將消息推送至隊列。交換必須知道,當它接受一個消息時,它該怎麼作。是否這個消息會附加至一個特殊的隊列?是否它會附加至許多隊列?或者它會被丟棄。這個規則用交換類型(exchange type)來定義。bash

有一些可用的交換類型直接分發(direct)通配分發(topic)headers複製分發(fanout)。咱們將會集中講最後一個——fanout。咱們建立一個交換,類型爲fanout,並取名爲logs:spa

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

fanout交換很是簡單。顧名思義,它會將全部它知道的接收隊列的消息都廣播出去。而這也正是咱們的日誌系統所須要的。
  如今,咱們能夠發佈已經命名好的隊列了:.net

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

臨時隊列

  你也許還記得在以前的文章中,咱們須要給隊列取名。可是呢,給隊列命名太麻煩了——咱們須要將workers指定到同一個隊列。當你須要在生產者和消費者之間共享隊列的時候,給隊列命名又是很重要的。
  這種情形並不適合咱們的日誌系統。咱們想要監聽全部的消息,而不是部分消息。同時,咱們僅對當前的流動消息感興趣,而不是以前的消息。爲了解決這個問題,咱們須要作兩件事情。
  首先,不管什麼時候咱們鏈接到RabbitMQ,咱們須要一個新的空隊列。爲此,咱們建立一個隨機命名的隊列,或者更好的是,讓RabbitMQ Server來給咱們建立一個隨機命名的隊列。所以,咱們能夠利用queue_declare命令,設置queuq參數爲空:日誌

result = channel.queue_declare(queue='')

此時,result.method.queue會包含一個隨機命名的隊列,好比說,它會和amq.gen-JzTY20BRgKO-HjmUJj0wLg相似。
  其次,一旦消息者的鏈接關閉,咱們須要刪除隊列。這能夠用exclusive參數搞定:code

result = channel.queue_declare(queue='', exclusive=True)

綁定(Bindings)


  咱們已經建立了一個fanout 交換和隊列。如今咱們須要告訴交換,將消息發送至隊列。交換與隊列之間的關係叫作綁定(Bindings)blog

channel.queue_bind(exchange='logs',
                                    queue=result.method.queue)

  從如今開始,logs交換將會在咱們的隊列後追加消息。rabbitmq

代碼


  生產者代碼(emit_log.py):隊列

# -*- coding: utf-8 -*-
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

  消費者代碼(receive_log.py):

# -*- coding: utf-8 -*-
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()

  開啓四個終端,其中一個用於保存日誌:

python3 receive_log.py > logs_from_rabbit.log

另外一個用於觀察日誌輸出:

python3 receive_log.py

日誌產生:

python3 emit_log.py

監聽綁定:

sudo rabbitmqctl list_bindings

運行截圖以下:

  本次分享到此結束,感謝你們閱讀~

相關文章
相關標籤/搜索