官方文檔:https://www.rabbitmq.com/tutorials/tutorial-one-python.htmlhtml
RabbitMQ是一個基於AMQP協議的消息代理。它的工做就是接收和轉發消息。你能夠把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ就扮演着郵箱、郵局以及郵遞員的角色。python
瞭解AMQP:https://www.cnblogs.com/Xuuuuuu/p/10880640.html算法
RabbitMQ和郵局的主要區別在於,它處理紙張,而是接收、存儲和發送消息(message)這種二進制數據。shell
(使用pika 1.0.1 Python客戶端)小程序
接下來咱們用Python寫兩個小程序。一個發送單條消息的生產者(producer)和一個接收消息並將其輸出的消費者(consumer)。傳遞的消息是"Hello World"。緩存
下圖中,「P」表明生產者,「C」表明消費者,中間的盒子表明爲消費者保留的消息緩衝區,也就是咱們的隊列。安全
生產者(producer)把消息發送到一個名爲「hello」的隊列中。消費者(consumer)從這個隊列中獲取消息。bash
咱們第一個程序send.py
會發送一個消息到隊列中。首先要作的事情就是創建一個到RabbitMQ服務器的鏈接。服務器
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.1.233')) channel = connection.channel()
如今咱們已經跟本地機器的代理創建了鏈接。若是你想鏈接到其餘機器的代理上,須要把表明本地的localhost
改成指定的名字或IP地址。網絡
接下來,在發送消息以前,咱們須要確認服務於消費者的隊列已經存在。若是將消息發送給一個不存在的隊列,RabbitMQ會將消息丟棄掉。下面咱們建立一個名爲"hello"的隊列用來將消息投遞進去。
channel.queue_declare(queue='hello')
在RabbitMQ中,消息是不能直接發送到隊列中的,這個過程須要經過交換機(exchange)來進行。可是爲了避免讓細節拖累咱們的進度,這裏咱們只須要知道如何使用由空字符串表示的默認交換機便可。默認交換機比較特別,它容許咱們指定消息究竟須要投遞到哪一個具體的隊列中,隊列名字須要在routing_key
參數中指定。
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'")
在退出程序以前,咱們須要確認網絡緩衝已經被刷寫、消息已經投遞到RabbitMQ。經過安全關閉鏈接能夠作到這一點。
connection.close()
咱們的第二個程序receive.py
,將會從隊列中獲取消息並將其打印到屏幕上。
此次咱們仍是須要要先鏈接到RabbitMQ服務器。鏈接服務器的代碼和以前是同樣的。
下一步也和以前同樣,咱們須要確認隊列是存在的。咱們能夠屢次使用queue_declare
命令來建立同一個隊列,可是隻有一個隊列會被真正的建立。
channel.queue_declare(queue='hello')
你也許要問: 爲何要重複聲明隊列呢 —— 咱們已經在前面的代碼中聲明過它了。若是咱們肯定了隊列是已經存在的,那麼咱們能夠不這麼作,好比此前預先運行了send.py程序。但是咱們並不肯定哪一個程序會首先運行。這種狀況下,在程序中重複將隊列重複聲明一下是種值得推薦的作法。
列出全部隊列
你也許但願查看RabbitMQ中有哪些隊列、有多少消息在隊列中。此時你可使用rabbitmqctl工具(使用有權限的用戶):
sudo rabbitmqctl list_queues(在Windows中不須要sudo命令)
rabbitmqctl list_queues
從隊列中獲取消息相對來講稍顯複雜。須要爲隊列定義一個回調(callback)函數。當咱們獲取到消息的時候,Pika庫就會調用此回調函數。這個回調函數會將接收到的消息內容輸出到屏幕上。
def callback(ch, method, properties, body): print(" [x] Received %r" % body)
下一步,咱們須要告訴RabbitMQ這個回調函數將會從名爲"hello"的隊列中接收消息:
channel.basic_consume(queue='hello',
auto_ack=True, on_message_callback=callback)
要成功運行這些命令,咱們必須保證隊列是存在的,咱們的確能夠確保它的存在——由於咱們以前已經使用queue_declare
將其聲明過了。
no_ack
參數稍後會進行介紹。
最後,咱們運行一個用來等待消息數據而且在須要的時候運行回調函數的無限循環。
print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
輸出:
python receive.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Hello World!'
在第一篇教程中,咱們已經寫了一個從已知隊列中發送和獲取消息的程序。在這篇教程中,咱們將建立一個工做隊列(work queue),它會發送一些耗時的任務給多個工做者(Worker)
工做隊列(又稱:任務隊列)是爲了不等待一些佔用大量資源、時間的操做。當咱們把任務發送到隊列中,一個運行在後臺的工做者進程就會取出任務而後處理。當你運行多個工做者,任務就會在它們之間共享。
以前的教程中,咱們發送了一個包含「Hello World!」的字符串消息。如今,咱們將發送一些字符串,把這些字符串看成複雜的任務。咱們沒有真實的例子,例如圖片縮放、pdf文件轉換。因此使用time.sleep()函數來模擬這種狀況。咱們在字符串中加上點號(.)來表示任務的複雜程度,一個點(.)將會耗時1秒鐘。好比"Hello..."就會耗時3秒鐘。
咱們對以前教程的send.py作些簡單的調整,以即可以發送隨意的消息。這個程序會按照計劃發送任務到咱們的工做隊列中。咱們把它命名爲new_task.py:
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
咱們的舊腳本(receive.py)一樣須要作一些改動:它須要爲消息體中每個點號(.)模擬1秒鐘的操做。它會從隊列中獲取消息並執行,咱們把它命名爲worker.py:
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.decode("utf-8").count('.') ) print " [x] Done"
使用工做隊列的一個好處就是它可以並行的處理隊列。若是堆積了不少任務,咱們只須要添加更多的工做者(workers)就能夠了,擴展很簡單。
首先,咱們先同時運行兩個worker.py腳本,它們都會從隊列中獲取消息,究竟是不是這樣呢?咱們看看。
你須要打開三個終端,兩個用來運行worker.py腳本,這兩個終端就是咱們的兩個消費者(consumers)—— C1 和 C2。
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C
第三個終端,咱們用來發布新任務。你能夠發送一些消息給消費者(consumers):
shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....
看看到底發送了什麼給咱們的工做者(workers):
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默認來講,RabbitMQ會按順序得把消息發送給每一個消費者(consumer)。平均每一個消費者都會收到同等數量得消息。這種發送消息得方式叫作——輪詢(round-robin)。試着添加三個或更多得工做者(workers)。
當處理一個比較耗時得任務的時候,你也許想知道消費者(consumers)是否運行到一半就掛掉。當前的代碼中,當消息被RabbitMQ發送給消費者(consumers)以後,立刻就會在內存中移除。這種狀況,你只要把一個工做者(worker)中止,正在處理的消息就會丟失。同時,全部發送到這個工做者的尚未處理的消息都會丟失。
咱們不想丟失任何任務消息。若是一個工做者(worker)掛掉了,咱們但願任務會從新發送給其餘的工做者(worker)。
爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ就會釋放並刪除這條消息。
若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,及時工做者(workers)偶爾的掛掉,也不會丟失消息。
消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。
消息響應默認是開啓的。以前的例子中咱們可使用no_ack=True標識把它關閉。是時候移除這個標識了,當工做者(worker)完成了任務,就發送一個響應。
def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.decode('utf-8').count('.')) print(" [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(queue='hello', on_message_callback=callback)
運行上面的代碼,咱們發現即便使用CTRL+C殺掉了一個工做者(worker)進程,消息也不會丟失。當工做者(worker)掛掉這後,全部沒有響應的消息都會從新發送
忘記確認
一個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。消息在你的程序退出以後就會從新發送,若是它不可以釋放沒響應的消息,RabbitMQ就會佔用愈來愈多的內存。
爲了排除這種錯誤,你可使用rabbitmqctl命令,輸出messages_unacknowledged字段:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
若是你沒有特地告訴RabbitMQ,那麼在它退出或者崩潰的時候,將會丟失全部隊列和消息。爲了確保信息不會丟失,有兩個事情是須要注意的:咱們必須把「隊列」和「消息」設爲持久化。
首先,爲了避免讓隊列消失,須要把隊列聲明爲持久化(durable):
channel.queue_declare(queue='hello', durable=True)
儘管這行代碼自己是正確的,可是仍然不會正確運行。由於咱們已經定義過一個叫hello的非持久化隊列。RabbitMq不容許你使用不一樣的參數從新定義一個隊列,它會返回一個錯誤。但咱們如今使用一個快捷的解決方法——用不一樣的名字,例如task_queue。
channel.queue_declare(queue='task_queue', durable=True)
這個queue_declare必須在生產者(producer)和消費者(consumer)對應的代碼中修改。
這時候,咱們就能夠確保在RabbitMq重啓以後queue_declare隊列不會丟失。另外,咱們須要把咱們的消息也要設爲持久化——將delivery_mode的屬性設爲2。
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
將消息設爲持久化並不能徹底保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間仍是有一個很小的間隔時間。由於RabbitMq並非全部的消息都使用fsync(2)——它有可能只是保存到緩存中,並不必定會寫到硬盤中。並不能保證真正的持久化,但已經足夠應付咱們的簡單工做隊列。若是你必定要保證持久化,你須要改寫你的代碼來支持事務(transaction)。
你應該已經發現,它仍舊沒有按照咱們指望的那樣進行分發。好比有兩個工做者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕鬆。然而RabbitMQ並不知道這些,它仍然一如既往的派發消息。
這時由於RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有做出響應。它盲目的把第n-th條消息發給第n-th個消費者。
咱們可使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker)。
channel.basic_qos(prefetch_count=1)
new_task.py的完整代碼:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
咱們的worker:
import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.decode("utf-8").count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
在上篇教程中,咱們搭建了一個工做隊列,每一個任務只分發給一個工做者(worker)。在本篇教程中,咱們要作的跟以前徹底不同 —— 分發一個消息給多個消費者(consumers)。這種模式被稱爲「發佈/訂閱」。
爲了描述這種模式,咱們將會構建一個簡單的日誌系統。它包括兩個程序——第一個程序負責發送日誌消息,第二個程序負責獲取消息並輸出內容。
在咱們的這個日誌系統中,全部正在運行的接收方程序都會接受消息。咱們用其中一個接收者(receiver)把日誌寫入硬盤中,另一個接受者(receiver)把日誌輸出到屏幕上。
最終,日誌消息被廣播給全部的接受者(receivers)。
前面的教程中,咱們發送消息到隊列並從中取出消息。如今是時候介紹RabbitMQ中完整的消息模型了。
讓咱們簡單的歸納一下以前的教程:
RabbitMQ消息模型的核心理念是:發佈者不會直接發送任何消息給隊列。事實上,發佈者甚至不知道消息是否已經被投遞到隊列。
發佈者只須要把消息發送給一個交換機。交換機很是簡單,它一邊從發佈者接收消息,一邊把消息推送到隊列。交換機必須知道如何處理它接收到的消息,是應該推送到指定的隊列仍是多個丟列,或則是直接忽略信息。這些規則是經過交換機類型來定義的。
有幾個可供選擇的交換機類型:直連交換機(direct), 主題交換機(topic), (頭交換機)headers和 扇型交換機(fanout)。咱們在這裏主要說明最後一個 —— 扇型交換機(fanout)。先建立一個fanout類型的交換機,命名爲logs:
channel.exchange_declare(exchange='logs', type='fanout')
扇型交換機(fanout)很簡單,你可能從名字上就能猜想出來,它把消息發送給它所知道的全部隊列。這正是咱們的日誌系統所須要的。
交換器列表
rabbitmqctl可以列出服務器上全部的交換器:
$ sudo rabbitmqctl list_exchanges Listing exchanges ... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ...done.
這個列表中有一些叫作amq.*的交換器。這些都是默認建立的,不過這時候你還不須要使用他們。
匿名的交換器
前面的教程中咱們對交換機一無所知,但仍然可以發送消息到隊列中。由於咱們使用了命名爲空字符串("")默認的交換機。
回想咱們以前是如何發佈一則消息:
channel.basic_publish(exchange='', routing_key='hello', body=message)exchange參數就是交換機的名稱。空字符串表明默認或者匿名交換機:消息將會根據指定的routing_key分發到指定的隊列。
如今,咱們就能夠發送消息到一個具名交換機了:
channel.basic_publish(exchange='logs', routing_key='', body=message)
你還記得以前咱們使用的隊列名嗎( hello和task_queue)?給一個隊列命名是很重要的——咱們須要把工做者(workers)指向正確的隊列。若是你打算在發佈者(producers)和消費者(consumers)之間共享同隊列的話,給隊列命名是十分重要的。
可是這並不適用於咱們的日誌系統。咱們打算接收全部的日誌消息,而不只僅是一小部分。咱們關心的是最新的消息而不是舊的。爲了解決這個問題,咱們須要作兩件事情。
首先,當咱們鏈接上RabbitMQ的時候,咱們須要一個全新的、空的隊列。咱們能夠手動建立一個隨機的隊列名,或者讓服務器爲咱們選擇一個隨機的隊列名(推薦)。咱們只須要在調用queue_declare方法的時候,不提供queue參數就能夠了:
result = channel.queue_declare()
這時候咱們能夠經過result.method.queue得到已經生成的隨機隊列名。它多是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
第二步,當與消費者(consumer)斷開鏈接的時候,這個隊列應當被當即刪除。exclusive標識符便可達到此目的。
result = channel.queue_declare(exclusive=True)
咱們已經建立了一個扇型交換機(fanout)和一個隊列。如今咱們須要告訴交換機如何發送消息給咱們的隊列。交換器和隊列之間的聯繫咱們稱之爲綁定(binding)。
channel.queue_bind(exchange='logs', queue=result.method.queue)
如今,logs交換機將會把消息添加到咱們的隊列中。
綁定(binding)列表
你可使用
rabbitmqctl list_bindings
列出全部現存的綁定。
發佈日誌消息的程序看起來和以前的沒有太大區別。最重要的改變就是咱們把消息發送給logs交換機而不是匿名交換機。在發送的時候咱們須要提供routing_key參數,可是它的值會被扇型交換機(fanout exchange)忽略。如下是emit_log.py腳本:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.233")) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type="fanout") message = " ".join(sys.argv[1:]) or "info: hello world!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % (message,)) connection.close()
正如你看到的那樣,在鏈接成功以後,咱們聲明瞭一個交換器,這一個是很重要的,由於不容許發佈消息到不存在的交換器。
若是沒有綁定隊列到交換器,消息將會丟失。但這個沒有所謂,若是沒有消費者監聽,那麼消息就會被忽略。
receive_logs.py的代碼:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.233")) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type="fanout") message = " ".join(sys.argv[1:]) or "info: hello world!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % (message,)) connection.close()
正如你看到的那樣,在鏈接成功以後,咱們聲明瞭一個交換器,這一個是很重要的,由於不容許發佈消息到不存在的交換器。
若是沒有綁定隊列到交換器,消息將會丟失。但這個沒有所謂,若是沒有消費者監聽,那麼消息就會被忽略。
receive_logs.py的代碼:
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.1.233")) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type="fanout") result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print('[*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r" % (body,)) channel.basic_consume(auto_ack=True, queue=queue_name, on_message_callback=callback) channel.start_consuming()
這樣咱們就完成了。若是你想把日誌保存到文件中,只須要打開控制檯輸入:
$ python receive_logs.py > logs_from_rabbit.log
若是你想在屏幕中查看日誌,那麼打開一個新的終端而後運行:
$ python receive_logs.py
固然還要發送日誌:
$ python emit_log.py
使用rabbitmqctl list_bindings
你可確認已經建立的隊列綁定。你能夠看到運行中的兩個receive_logs.py程序:
$ sudo rabbitmqctl list_bindings Listing bindings ... ... logs amq.gen-TJWkez28YpImbWdRKMa8sg== [] logs amq.gen-x0kymA4yPzAT6BoC/YP+zw== [] ...done.
在前面的教程中,咱們實現了一個簡單的日誌系統。能夠把日誌消息廣播給多個接收者。
本篇教程中咱們打算新增一個功能 —— 使得它可以只訂閱消息的一個字集。例如,咱們只須要把嚴重的錯誤日誌信息寫入日誌文件(存儲到磁盤),但同時仍然把全部的日誌信息輸出到控制檯中
前面的例子,咱們已經建立過綁定(bindings),代碼以下:
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
綁定是指交換機和隊列的關係。能夠簡單理解爲:這個隊列對這個交換機的消息感興趣。
綁定的時候能夠帶上一個額外的routing_key參數。爲了不與basic_publish的參數混淆,咱們把它叫作綁定鍵(binding key)。如下是如何建立一個帶綁定鍵的綁定。
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
綁定鍵的意義取決於交換機(exchange)的類型。咱們以前使用過的扇型交換機(fanout exchanges)會忽略這個值。
咱們的日誌系統廣播全部的消息給全部的消費者(consumers)。咱們打算擴展它,使其基於日誌的嚴重程度進行消息過濾。例如咱們也許只是但願將比較嚴重的錯誤(error)日誌寫入磁盤,以避免在警告(warning)或者信息(info)日誌上浪費磁盤空間。
咱們使用的扇型交換機(fanout exchange)沒有足夠的靈活性 —— 它能作的僅僅是廣播。
咱們將會使用直連交換機(direct exchange)來代替。路由的算法很簡單 —— 交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而肯定消息該分發到哪一個隊列。
下圖可以很好的描述這個場景:
在這個場景中,咱們能夠看到直連交換機 X和兩個隊列進行了綁定。第一個隊列使用orange做爲綁定鍵,第二個隊列有兩個綁定,一個使用black做爲綁定鍵,另一個使用green。
這樣以來,當路由鍵爲orange的消息發佈到交換機,就會被路由到隊列Q1。路由鍵爲black或者green的消息就會路由到Q2。其餘的全部消息都將會被丟棄。
咱們將會發送消息到一個直連交換機,把日誌級別做爲路由鍵。這樣接收日誌的腳本就能夠根據嚴重級別來選擇它想要處理的日誌。咱們先看看發送日誌。
咱們須要建立一個交換機(exchange):
channel.exchange_declare(exchange='direct_logs', type='direct')
而後咱們發送一則消息:
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
咱們先假設「severity」的值是info、warning、error中的一個。
處理接收消息的方式和以前差很少,只有一個例外,咱們將會爲咱們感興趣的每一個嚴重級別分別建立一個新的綁定。
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
emit_log_direct.py的代碼:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print " [x] Sent %r:%r" % (severity, message) connection.close()
receive_logs_direct.py的代碼:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
若是你但願全部的日誌信息都輸出到屏幕中,打開一個新的終端,而後輸入:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = sys.argv[1:] if not severities: print >> sys.stderr, "Usage: %s [info] [warning] [error]" % \ (sys.argv[0],) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
若是你但願只是保存warning和error級別的日誌到磁盤,只須要打開控制檯並輸入
$ python receive_logs_direct.py info warning error [*] Waiting for logs. To exit press CTRL+C
若是要觸發一個error級別的日誌,只須要輸入:
$ python emit_log_direct.py error "Run. Run. Or it will explode." [x] Sent 'error':'Run. Run. Or it will explode.'
發送到主題交換機(topic exchange)的消息不能夠攜帶隨意什麼樣子的路由鍵(routing_key),它的路由鍵必須是一個由.
分隔開的詞語列表。這些單詞隨即是什麼均可以,可是最好是跟攜帶它們的消息有關係的詞彙。如下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。詞語的個數能夠隨意,可是不要超過255字節。
綁定鍵也必須擁有一樣的格式。主題交換機背後的邏輯跟直連交換機很類似 —— 一個攜帶着特定路由鍵的消息會被主題交換機投遞給綁定鍵與之想匹配的隊列。可是它的綁定鍵和路由鍵有兩個特殊應用方式:
*
(星號) 用來表示一個單詞.#
(井號) 用來表示任意數量(零個或多個)單詞。這個例子裏,咱們發送的全部消息都是用來描述小動物的。發送的消息所攜帶的路由鍵是由三個單詞所組成的,這三個單詞被兩個.
分割開。路由鍵裏的第一個單詞描述的是動物的手腳的利索程度,第二個單詞是動物的顏色,第三個是動物的種類。因此它看起來是這樣的: <celerity>.<colour>.<species>
。
咱們建立了三個綁定:Q1的綁定鍵爲 *.orange.*
,Q2的綁定鍵爲 *.*.rabbit
和 lazy.#
。
這三個綁定鍵被能夠總結爲:
一個攜帶有 quick.orange.rabbit
的消息將會被分別投遞給這兩個隊列。攜帶着 lazy.orange.elephant
的消息一樣也會給兩個隊列都投遞過去。另外一方面攜帶有 quick.orange.fox
的消息會投遞給第一個隊列,攜帶有 lazy.brown.fox
的消息會投遞給第二個隊列。攜帶有 lazy.pink.rabbit
的消息只會被投遞給第二個隊列一次,即便它同時匹配第二個隊列的兩個綁定。攜帶着 quick.brown.fox
的消息不會投遞給任何一個隊列。
若是咱們違反約定,發送了一個攜帶有一個單詞或者四個單詞("orange"
or "quick.orange.male.rabbit"
)的消息時,發送的消息不會投遞給任何一個隊列,並且會丟失掉。
可是另外一方面,即便 "lazy.orange.male.rabbit"
有四個單詞,他仍是會匹配最後一個綁定,而且被投遞到第二個隊列中。
主題交換機
主題交換機是很強大的,它能夠表現出跟其餘交換機相似的行爲
當一個隊列的綁定鍵爲 "#"(井號) 的時候,這個隊列將會無視消息的路由鍵,接收全部的消息。
當
*
(星號) 和#
(井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行爲。
接下來咱們會將主題交換機應用到咱們的日誌系統中。在開始工做前,咱們假設日誌的路由鍵由兩個單詞組成,路由鍵看起來是這樣的:<facility>.<severity>
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print " [x] Sent %r:%r" % (routing_key, message) connection.close()
receive_logs_topic.py的代碼:
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r:%r" % (method.routing_key, body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
執行下邊命令 接收全部日誌:
python receive_logs_topic.py "#"
執行下邊命令 接收來自」kern「設備的日誌:
python receive_logs_topic.py "kern.*"
執行下邊命令 只接收嚴重程度爲」critical「的日誌:
python receive_logs_topic.py "*.critical"
執行下邊命令 創建多個綁定:
python receive_logs_topic.py "kern.*" "*.critical"
執行下邊命令 發送路由鍵爲 "kern.critical" 的日誌:
python emit_log_topic.py "kern.critical" "A critical kernel error"