rabbitmq中文教程python版 - 路由

源碼:https://github.com/ltoddy/rabbitmq-tutorialpython

路由

本章節教程重點介紹的內容

在以前的教程中,咱們構建了一個簡單的日誌系統 咱們可以將日誌消息廣播給許多接收者。git

在本教程中,咱們將添加一個功能 - 咱們將只能訂閱一部分消息。例如,咱們只能將重要的錯誤消息引導到日誌文件(以節省磁盤空間),同時仍然可以在控制檯上打印全部日誌消息。github

綁定

在前面的例子中,咱們已經建立了綁定。您可能會回想一下代碼:算法

channel.queue_bind(exchange=EXCHANGE_NAME,
                   queue=queue_name)

綁定是交換和隊列之間的關係。這能夠簡單地理解爲: the queue is interested in messages from this exchange.this

綁定可使用額外的routing_key參數。爲了不與basic_publish參數混淆,咱們將其稱爲綁定鍵。這就是咱們如何使用一個鍵建立一個綁定:spa

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

綁定鍵的含義取決於交換類型。咱們以前使用的 fanout 交換簡單地忽略了它的價值。rest

直接交換

咱們以前教程的日誌記錄系統將全部消息廣播給全部消費者。咱們但願將其擴展爲容許根據其進行嚴格的過濾消息。
例如,咱們可能但願將嚴重錯誤的日誌消息寫入磁盤,而不會寫入警告或信息日誌消息。日誌

咱們正在使用fanout交換,這不會給咱們太多的靈活性 - 它只能無心識地播放。code

咱們將使用direct交換。direct交換背後的路由算法很簡單 - 消息進入隊列,其綁定密鑰與消息的路由密鑰徹底匹配。教程

爲了說明這一點,請考慮如下設置:

image

在這個設置中,咱們能夠看到有兩個隊列綁定的直接交換機X. 第一個隊列用綁定鍵orange綁定,第二個隊列有兩個綁定,一個綁定鍵爲black,另外一個爲green

在這種設置中,使用路由鍵orange發佈到交換機的消息 將被路由到隊列Q1。帶有blackgree路由鍵的消息將進入Q2。全部其餘消息將被丟棄。

多個綁定

image

使用相同的綁定密鑰綁定多個隊列是徹底合法的。在咱們的例子中,咱們可使用綁定鍵black添加XQ1之間的綁定。
在這種狀況下,direct交換就像fanout同樣,並將消息廣播到全部匹配的隊列。帶有路由鍵black的消息將傳送到Q1Q2

發出日誌

咱們將使用這個模型用於咱們的日誌系統。取而代之的fanout,咱們將消息發送到direct交換。咱們將提供嚴格的日誌做爲路由鍵(routing key)。
這樣接收腳本將可以選擇想要接收的消息。咱們先關注發出日誌的實現。

像往常同樣,咱們須要首先建立一個交換:

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

咱們準備發送一條消息:

channel.basic_publish(exchange='direct_logs',
                      routing_key='',
                      body=message)

爲了簡化事情,咱們將假設「severity」能夠是'info','warning','error'之一。

訂閱

接收郵件的方式與上一個教程中的同樣,只有一個例外 - 咱們將爲每一個咱們感興趣的嚴重程度建立一個新綁定。

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

把它放在一塊兒

image

emit_log_direct.py的代碼:

#!/usr/bin/env python
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

severity = sys.args[1:] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

receive_logs_direct.py的代碼:

#!/usr/bin/env python
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(cb, method, properities, body):
    print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

若是隻想保存'warning'和'error'(而不是'info')將消息記錄到文件中,只需打開一個控制檯並輸入:

python receive_logs_direct.py warning error > logs_from_rabbit.log

若是您但願在屏幕上看到全部日誌消息,請打開一個新終端並執行如下操做:

python receive_logs_direct.py info warning error

例如,要輸出error日誌消息,只需輸入:

python emit_log_direct.py error "Run. Run. Or it will explode."
相關文章
相關標籤/搜索