源碼:https://github.com/ltoddy/rabbitmq-tutorialpython
在以前的教程中,咱們構建了一個簡單的日誌系統 咱們可以將日誌消息廣播給許多接收者。git
在本教程中,咱們將添加一個功能 - 咱們將只能訂閱一部分消息。例如,咱們只能將重要的錯誤消息引導到日誌文件(以節省磁盤空間),同時仍然可以在控制檯上打印全部日誌消息。github
在前面的例子中,咱們已經建立了綁定。您可能會回想一下代碼:算法
channel.queue_bind(exchange=EXCHANGE_NAME, queue=queue_name)
綁定是交換和隊列之間的關係。這能夠簡單地理解爲: the queue is interested in messages from this exchange.this
綁定可使用額外的routing_key參數。爲了不與basic_publish參數混淆,咱們將其稱爲綁定鍵。這就是咱們如何使用一個鍵建立一個綁定:spa
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
綁定鍵的含義取決於交換類型。咱們以前使用的 fanout 交換簡單地忽略了它的價值。rest
咱們以前教程的日誌記錄系統將全部消息廣播給全部消費者。咱們但願將其擴展爲容許根據其進行嚴格的過濾消息。
例如,咱們可能但願將嚴重錯誤的日誌消息寫入磁盤,而不會寫入警告或信息日誌消息。日誌
咱們正在使用fanout交換,這不會給咱們太多的靈活性 - 它只能無心識地播放。code
咱們將使用direct交換。direct交換背後的路由算法很簡單 - 消息進入隊列,其綁定密鑰與消息的路由密鑰徹底匹配。教程
爲了說明這一點,請考慮如下設置:
在這個設置中,咱們能夠看到有兩個隊列綁定的直接交換機X. 第一個隊列用綁定鍵orange綁定,第二個隊列有兩個綁定,一個綁定鍵爲black,另外一個爲green。
在這種設置中,使用路由鍵orange發佈到交換機的消息 將被路由到隊列Q1。帶有black或gree路由鍵的消息將進入Q2。全部其餘消息將被丟棄。
使用相同的綁定密鑰綁定多個隊列是徹底合法的。在咱們的例子中,咱們可使用綁定鍵black添加X和Q1之間的綁定。
在這種狀況下,direct交換就像fanout同樣,並將消息廣播到全部匹配的隊列。帶有路由鍵black的消息將傳送到Q1和Q2。
咱們將使用這個模型用於咱們的日誌系統。取而代之的fanout,咱們將消息發送到direct交換。咱們將提供嚴格的日誌做爲路由鍵(routing key)。
這樣接收腳本將可以選擇想要接收的消息。咱們先關注發出日誌的實現。
像往常同樣,咱們須要首先建立一個交換:
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
咱們準備發送一條消息:
channel.basic_publish(exchange='direct_logs', routing_key='', body=message)
爲了簡化事情,咱們將假設「severity」能夠是'info','warning','error'之一。
接收郵件的方式與上一個教程中的同樣,只有一個例外 - 咱們將爲每一個咱們感興趣的嚴重程度建立一個新綁定。
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
emit_log_direct.py的代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.args[1:] if len(sys.argv) > 2 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
receive_logs_direct.py的代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(cb, method, properities, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
若是隻想保存'warning'和'error'(而不是'info')將消息記錄到文件中,只需打開一個控制檯並輸入:
python receive_logs_direct.py warning error > logs_from_rabbit.log
若是您但願在屏幕上看到全部日誌消息,請打開一個新終端並執行如下操做:
python receive_logs_direct.py info warning error
例如,要輸出error日誌消息,只需輸入:
python emit_log_direct.py error "Run. Run. Or it will explode."