源碼:https://github.com/ltoddy/rabbitmq-tutorialpython
(using the Pika Python client)git
在以前的教程中,咱們改進了日誌記錄系統。咱們沒有使用只有虛擬廣播的fanout交換,而是使用了direct交換,並讓選擇性接收日誌成爲了可能。github
儘管使用direct交換改進了咱們的系統,但它仍然有侷限性 - 它不能根據多個標準進行路由。併發
在咱們的日誌系統中,咱們可能不只須要根據嚴重性來訂閱日誌,還要根據發佈日誌的來源進行訂閱。您可能從syslog unix工具知道這個概念,
該工具根據嚴重性(info / warning / crit...)和工具(auth / cron / kern ...)來路由日誌。工具
這會給咱們很大的靈活性 - 由於咱們可能想聽取來自'cron'的error日誌,並且還聽取來自'kern'的全部日誌。ui
爲了在咱們的日誌系統中實現這一點,咱們須要瞭解更復雜的topic交換。spa
發送到topic交換的消息必須有規範的routing_key - 它必須是由點分隔的單詞列表。單詞能夠是任何東西,但一般它們指定了與該消息相關的一些功能。
一些有效的routing_key例子: "stock.usd.nyse","nyse.vmw","quick.orange.rabbit"。只要您願意,路由鍵中能夠有任意的單詞,但最多255個字節。unix
綁定鍵也必須是相同的形式。topic交換背後的邏輯與direct topic交換背後的邏輯相似 - 使用特定路由鍵發送的消息將被傳遞到與匹配綁定鍵綁定的全部隊列。
可是綁定鍵有兩個重要的特殊狀況:日誌
在這個例子中解釋這個很簡單:code
在這個例子中,咱們將發送全部描述動物的消息。消息將使用由三個字(兩個點)組成的路由鍵發送。
路由關鍵字中的第一個單詞將描述速度,第二個顏色和第三個物種:" <celerity> <color> <species> "。
咱們建立了三個綁定:Q1綁定了綁定鍵" *.orange.* ",Q2綁定了" *.*.rabbit "和" lazy.#"。
這些綁定能夠歸納爲:
將路由鍵設置爲"quick.orange.rabbit"的消息將傳遞到兩個隊列。消息"lazy.orange.elephant"也會去他們兩個。
另外一方面,"quick.orange.fox"只會進入第一個隊列,而"lazy.brown.fox"只會進入第二個隊列。
"lazy.pink.rabbit"只會傳遞到第二個隊列一次,即便它匹配了兩個綁定。
"quick.brown.fox"不匹配任何綁定,所以將被丟棄。
若是咱們違反咱們的合同併發送帶有一個或四個單詞的消息,如"orange"或"quick.orange.male.rabbit",
會發生什麼狀況?那麼,這些消息將不匹配任何綁定,並會丟失。
另外一方面,"lazy.orange.male.rabbit"即便有四個單詞,也會匹配最後一個綁定,並將傳遞到第二個隊列。
direct change 話題交換功能強大,能夠像其餘交流同樣行事。 當使用" \# "(散列)綁定鍵綁定隊列時,它將接收全部消息, 而無論路由密鑰如何 - 就像在*fanout*交換中同樣。 當在綁定中沒有使用特殊字符"\*"(星號)和"\#"(散列)時,主題交換將像*direct*交換同樣。
咱們將在咱們的日誌系統中使用topic交換。咱們首先假定日誌的路由鍵有兩個單詞:" <facility>.<severity> "。
代碼幾乎與前一個教程中的代碼相同 。
emit_log_topic.py的代碼:
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') routing_key = sys.argv[1:] if len(sys.argv) > 2 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
receive_logs_topic.py的代碼:
#!/usr/bin/env python import sys import pika # connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) connection = pika.BlockingConnection(pika.ConnectionParameters('172.17.0.2')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', exchange_type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
要接收全部日誌運行:
python receive_logs_topic.py "#"
要從設施「 kern 」 接收全部日誌:
python receive_logs_topic.py "kern.*"
或者,若是您只想聽到關於「 critical 」日誌的信息:
python receive_logs_topic.py "*.critical"
您能夠建立多個綁定:
python receive_logs_topic.py "kern. " ".critical"
發佈帶有路由鍵「 kern.critical 」類型的日誌:
python emit_log_topic.py "kern.critical" "A critical kernel error"