源碼:https://github.com/ltoddy/rabbitmq-tutorialpython
(using the Pika Python client)git
在上一篇教程中,咱們建立了工做隊列。工做隊列背後的假設是每一個任務只能傳遞給一個工做人員。
在這一部分,咱們將作一些徹底不一樣的事情 - 咱們會向多個消費者傳遞信息。這種模式被稱爲「發佈/訂閱」。github
爲了說明這種模式,咱們將創建一個簡單的日誌系統。它將包含兩個程序 - 第一個將發送日誌消息,第二個將接收並打印它們。服務器
在咱們的日誌系統中,接收程序的每一個運行副本都會收到消息。這樣咱們就能夠運行一個接收器並將日誌指向磁盤; 同時咱們將可以運行另外一個接收器並在屏幕上查看日誌。spa
通常來講,發佈的日誌消息將以廣播的形式發給全部的接收者。日誌
在本教程的前幾部分中,咱們發送消息並從隊列中接收消息。如今是時候在rabbitmq中引入完整的消息傳遞模型。code
讓咱們快速回顧一下前面教程中的內容:教程
RabbitMQ中的消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列中。實際上,生產者一般甚至不知道郵件是否會被傳送到任何隊列中。rabbitmq
相反,生產者只能發送消息給交易所。交換是一件很是簡單的事情。一方面它接收來自生產者的消息,另外一方則推進他們排隊。
交易所必須知道如何處理收到的消息。是否應該附加到特定隊列?它應該附加到許多隊列中嗎?或者它應該被丟棄。這些規則由交換類型定義 (exchange type)。隊列
有幾種可用的交換類型: direct, topic, header 和 fanout。咱們將關注最後一個 - fanout。讓咱們建立該類型的交換,並將其稱爲logs:
channel.exchange_declare(exchange='logs', exchange_type='fanout')
fanout交換很是簡單。正如你可能從名字中猜出的那樣,它只是將收到的全部消息廣播到它所知道的全部隊列中。這正是咱們logger所須要的。
如今,咱們能夠發佈到咱們的指定交易所:
channel.basic_publish(exchange='logs', routing_key='', body=message)
正如你之前可能記得咱們正在使用具備指定名稱的隊列(還記得hello和task_queue嗎?)。可以命名隊列對咱們相當重要 - 咱們須要將工做人員指向同一隊列。
當你想在生產者和消費者之間分享隊列時,給隊列一個名字是很重要的。
可是,咱們的記錄器並不是如此。咱們但願聽到全部日誌消息,而不單單是其中的一部分。咱們也只對目前流動的消息感興趣,而不是舊消息。要解決這個問題,咱們須要作兩件事。
首先,每當咱們鏈接到rabbitmq,咱們須要一個新的,空的隊列。要作到這一點,咱們能夠建立一個隨機名稱的隊列,或者甚至更好 - 讓服務器爲咱們選擇一個隨機隊列名稱。
咱們能夠經過不將隊列參數提供給queue_declare來實現這一點:
result = channel.queue_declare()
此時,result.method.queue包含一個隨機隊列名稱。例如,它可能看起來像amq.gen-i94oCE_tj3LyWsy-94KXHg。
其次,一旦消費者鏈接關閉,隊列應該被刪除。這是一個專有標誌:
result = channel.queue_declare(exclusive=True)
咱們已經建立了一個fanout交換和一個隊列。如今咱們須要告訴交換所將消息發送到咱們的隊列。交換和隊列之間的關係稱爲綁定。
channel.queue_bind(exchange='logs', queue=result.method.queue)
從如今起,logs 交易所會將消息附加到咱們的隊列中。
發出日誌消息的生產者程序與以前的教程沒有多大區別。最重要的變化是咱們如今想發佈消息到咱們的logs交易所,而不是無名字的消息。發送時咱們須要提供一個routing_key,可是對於fanout交換,它的值將被忽略。這裏是emit_log.py腳本的代碼 :
#!/usr/bin/env python import sys import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello world!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
如你所見,創建鏈接後,咱們宣佈交易所。這一步是必要的,由於發佈到不存在的交易所是被禁止的。
若是沒有隊列綁定到交換機上,這些消息將會丟失,但這對咱們來講沒問題; 若是沒有消費者正在收聽,咱們能夠放心地丟棄消息。
receive_logs.py的代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % body) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
咱們完成了。若是您想將日誌保存到文件中,只需打開一個控制檯並輸入:
python receive_logs.py > logs_from_rabbit.log
若是你想在屏幕上看到日誌,打開一個新的終端並運行:
python receive_logs.py
固然,
python emit_log.py
使用rabbitmqctl list_bindings,你能夠驗證代碼是否真正建立了綁定和隊列。當有兩個receive_logs.py程序正在運行,你應該看到以下所示:
root@921edcb46341:/# rabbitmqctl list_bindings Listing bindings for vhost /... exchange amq.gen-6YXn7BycIwtI7kFuUrTbaA queue amq.gen-6YXn7BycIwtI7kFuUrTbaA [] exchange amq.gen-JhFL-rbMAoricMu5Dyo-hA queue amq.gen-JhFL-rbMAoricMu5Dyo-hA [] logs exchange amq.gen-6YXn7BycIwtI7kFuUrTbaA queue amq.gen-6YXn7BycIwtI7kFuUrTbaA [] logs exchange amq.gen-JhFL-rbMAoricMu5Dyo-hA queue amq.gen-JhFL-rbMAoricMu5Dyo-hA []