RabbitMQ能作些什麼?
消息系統容許軟件、應用相互鏈接和擴展.這些應用能夠相互連接起來組成一個更大的應用,或者將用戶設備和數據進行鏈接.消息系統經過將消息的發送和接收分離來實現應用程序的異步和解偶.html
或許你正在考慮進行數據投遞,非阻塞操做或推送通知。或許你想要實現發佈/訂閱,異步處理,或者工做隊列。全部這些均可以經過消息系統實現。python
RabbitMQ是一個消息代理 - 一個消息系統的媒介。它能夠爲你的應用提供一個通用的消息發送和接收平臺,而且保證消息在傳輸過程當中的安全。git
技術亮點
可靠性
RabbitMQ提供了多種技術可讓你在性能和可靠性之間進行權衡。這些技術包括持久性機制、投遞確認、發佈者證明和高可用性機制。github
靈活的路由
消息在到達隊列前是經過交換機進行路由的。RabbitMQ爲典型的路由邏輯提供了多種內置交換機類型。若是你有更復雜的路由需求,能夠將這些交換機組合起來使用,你甚至能夠實現本身的交換機類型,而且當作RabbitMQ的插件來使用。shell
集羣
在相同局域網中的多個RabbitMQ服務器能夠聚合在一塊兒,做爲一個獨立的邏輯代理來使用。編程
聯合
對於服務器來講,它比集羣須要更多的鬆散和非可靠連接。爲此RabbitMQ提供了聯合模型。小程序
高可用的隊列
在同一個集羣裏,隊列能夠被鏡像到多個機器中,以確保當其中某些硬件出現故障後,你的消息仍然安全。緩存
多協議
RabbitMQ 支持多種消息協議的消息傳遞。安全
普遍的客戶端
只要是你能想到的編程語言幾乎都有與其相適配的RabbitMQ客戶端。bash
可視化管理工具
RabbitMQ附帶了一個易於使用的可視化管理工具,它能夠幫助你監控消息代理的每個環節。
追蹤
若是你的消息系統有異常行爲,RabbitMQ還提供了追蹤的支持,讓你可以發現問題所在。
插件系統
RabbitMQ附帶了各類各樣的插件來對本身進行擴展。你甚至也能夠寫本身的插件來使用。
AMQP協議
AMQP是什麼
AMQP(高級消息隊列協議)是一個網絡協議。它支持符合要求的客戶端應用(application)和消息中間件代理(messaging middleware broker)之間進行通訊。
消息代理和他們所扮演的角色
消息代理(message brokers)從發佈者(publishers)亦稱生產者(producers)那兒接收消息,並根據既定的路由規則把接收到的消息發送給處理消息的消費者(consumers)。
因爲AMQP是一個網絡協議,因此這個過程當中的發佈者,消費者,消息代理 能夠存在於不一樣的設備上。
介紹
RabbitMQ是一個消息代理。它的工做就是接收和轉發消息。你能夠把它想像成一個郵局:你把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處。在這個比喻中,RabbitMQ就扮演着郵箱、郵局以及郵遞員的角色。
RabbitMQ和郵局的主要區別在於,它處理紙張,而是接收、存儲和發送消息(message)這種二進制數據。
下面是RabbitMQ和消息所涉及到的一些術語。
-
生產(Producing)的意思就是發送。發送消息的程序就是一個生產者(producer)。咱們通常用"P"來表示:
-
隊列(queue)就是存在於RabbitMQ中郵箱的名稱。雖然消息的傳輸通過了RabbitMQ和你的應用程序,可是它只能被存儲於隊列當中。實質上隊列就是個巨大的消息緩衝區,它的大小隻受主機內存和硬盤限制。多個生產者(producers)能夠把消息發送給同一個隊列,一樣,多個消費者(consumers)也可以從同一個隊列(queue)中獲取數據。隊列能夠繪製成這樣(圖上是隊列的名稱):
-
在這裏,消費(Consuming)和接收(receiving)是同一個意思。一個消費者(consumer)就是一個等待獲取消息的程序。咱們把它繪製爲"C":
須要指出的是生產者、消費者、代理需不要待在同一個設備上;事實上大多數應用也確實不在會將他們放在一臺機器上。
Hello World!
(使用pika 0.10.0 Python客戶端)
接下來咱們用Python寫兩個小程序。一個發送單條消息的生產者(producer)和一個接收消息並將其輸出的消費者(consumer)。傳遞的消息是"Hello World"。
下圖中,「P」表明生產者,「C」表明消費者,中間的盒子表明爲消費者保留的消息緩衝區,也就是咱們的隊列。
生產者(producer)把消息發送到一個名爲「hello」的隊列中。消費者(consumer)從這個隊列中獲取消息。
RabbitMQ庫
RabbitMQ使用的是AMQP 0.9.1協議。這是一個用於消息傳遞的開放、通用的協議。針對不一樣編程語言有大量的RabbitMQ客戶端可用。在這個系列教程中,RabbitMQ團隊推薦使用Pika這個Python客戶端。你們能夠經過pip這個包管理工具進行安裝:
發送
咱們第一個程序send.py
會發送一個消息到隊列中。首先要作的事情就是創建一個到RabbitMQ服務器的鏈接。
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
如今咱們已經跟本地機器的代理創建了鏈接。若是你想鏈接到其餘機器的代理上,須要把表明本地的localhost
改成指定的名字或IP地址。
接下來,在發送消息以前,咱們須要確認服務於消費者的隊列已經存在。若是將消息發送給一個不存在的隊列,RabbitMQ會將消息丟棄掉。下面咱們建立一個名爲"hello"的隊列用來將消息投遞進去。
channel.queue_declare(queue='hello')
這時候咱們就能夠發送消息了,咱們第一條消息只包含了Hello World!字符串,咱們打算把它發送到hello隊列。
在RabbitMQ中,消息是不能直接發送到隊列中的,這個過程須要經過交換機(exchange)來進行。可是爲了避免讓細節拖累咱們的進度,這裏咱們只須要知道如何使用由空字符串表示的默認交換機便可。若是你想要詳細瞭解交換機,能夠查看咱們教程的第三部分來獲取更多細節。默認交換機比較特別,它容許咱們指定消息究竟須要投遞到哪一個具體的隊列中,隊列名字須要在routing_key
參數中指定。
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'")
在退出程序以前,咱們須要確認網絡緩衝已經被刷寫、消息已經投遞到RabbitMQ。經過安全關閉鏈接能夠作到這一點。
connection.close()
發送不成功!
若是這是你第一次使用RabbitMQ,而且沒有看到「Sent」消息出如今屏幕上,你可能會抓耳撓腮不知因此。這也許是由於沒有足夠的磁盤空間給代理使用所形成的(代理默認須要200MB的空閒空間),因此它纔會拒絕接收消息。查看一下代理的日誌文件進行確認,若是須要的話也能夠減小限制。配置文件文檔會告訴你如何更改磁盤空間限制(disk_free_limit)。
接收
咱們的第二個程序receive.py
,將會從隊列中獲取消息並將其打印到屏幕上。
此次咱們仍是須要要先鏈接到RabbitMQ服務器。鏈接服務器的代碼和以前是同樣的。
下一步也和以前同樣,咱們須要確認隊列是存在的。咱們能夠屢次使用queue_declare
命令來建立同一個隊列,可是隻有一個隊列會被真正的建立。
channel.queue_declare(queue='hello')
你也許要問: 爲何要重複聲明隊列呢 —— 咱們已經在前面的代碼中聲明過它了。若是咱們肯定了隊列是已經存在的,那麼咱們能夠不這麼作,好比此前預先運行了send.py程序。但是咱們並不肯定哪一個程序會首先運行。這種狀況下,在程序中重複將隊列重複聲明一下是種值得推薦的作法。
列出全部隊列
你也許但願查看RabbitMQ中有哪些隊列、有多少消息在隊列中。此時你可使用rabbitmqctl工具(使用有權限的用戶):
sudo rabbitmqctl list_queues(在Windows中不須要sudo命令)
rabbitmqctl list_queues
從隊列中獲取消息相對來講稍顯複雜。須要爲隊列定義一個回調(callback)函數。當咱們獲取到消息的時候,Pika庫就會調用此回調函數。這個回調函數會將接收到的消息內容輸出到屏幕上。
def callback(ch, method, properties, body): print(" [x] Received %r" % body)
下一步,咱們須要告訴RabbitMQ這個回調函數將會從名爲"hello"的隊列中接收消息:
channel.basic_consume(callback,
queue='hello', no_ack=True)
要成功運行這些命令,咱們必須保證隊列是存在的,咱們的確能夠確保它的存在——由於咱們以前已經使用queue_declare
將其聲明過了。
no_ack
參數稍後會進行介紹。
最後,咱們運行一個用來等待消息數據而且在須要的時候運行回調函數的無限循環。
print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
將代碼整合到一塊兒
send.py的完整代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") connection.close()
receive.py的完整代碼:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
如今咱們能夠在終端中嘗試一下咱們的程序了。
首先咱們啓動一個消費者,它會持續的運行來等待投遞到達。
python receive.py
# => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Hello World!'
而後啓動生產者,生產者程序每次執行後都會中止運行。
python send.py
# => [x] Sent 'Hello World!'
成功了!咱們已經經過RabbitMQ發送第一條消息。你也許已經注意到了,receive.py程序並無退出。它一直在準備獲取消息,你能夠經過Ctrl-C來停止它。
試下在新的終端中再次運行send.py
。
咱們已經學會如何發送消息到一個已知隊列中並接收消息。是時候移步到第二部分了,咱們將會創建一個簡單的工做隊列(work queue)。
工做隊列
(使用pika 0.9.5 Python客戶端)
在第一篇教程中,咱們已經寫了一個從已知隊列中發送和獲取消息的程序。在這篇教程中,咱們將建立一個工做隊列(Work Queue),它會發送一些耗時的任務給多個工做者(Worker)。
工做隊列(又稱:任務隊列——Task Queues)是爲了不等待一些佔用大量資源、時間的操做。當咱們把任務(Task)看成消息發送到隊列中,一個運行在後臺的工做者(worker)進程就會取出任務而後處理。當你運行多個工做者(workers),任務就會在它們之間共享。
這個概念在網絡應用中是很是有用的,它能夠在短暫的HTTP請求中處理一些複雜的任務。
準備
以前的教程中,咱們發送了一個包含「Hello World!」的字符串消息。如今,咱們將發送一些字符串,把這些字符串看成複雜的任務。咱們沒有真實的例子,例如圖片縮放、pdf文件轉換。因此使用time.sleep()函數來模擬這種狀況。咱們在字符串中加上點號(.)來表示任務的複雜程度,一個點(.)將會耗時1秒鐘。好比"Hello..."就會耗時3秒鐘。
咱們對以前教程的send.py作些簡單的調整,以即可以發送隨意的消息。這個程序會按照計劃發送任務到咱們的工做隊列中。咱們把它命名爲new_task.py:
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
咱們的舊腳本(receive.py)一樣須要作一些改動:它須要爲消息體中每個點號(.)模擬1秒鐘的操做。它會從隊列中獲取消息並執行,咱們把它命名爲worker.py:
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
循環調度:
使用工做隊列的一個好處就是它可以並行的處理隊列。若是堆積了不少任務,咱們只須要添加更多的工做者(workers)就能夠了,擴展很簡單。
首先,咱們先同時運行兩個worker.py腳本,它們都會從隊列中獲取消息,究竟是不是這樣呢?咱們看看。
你須要打開三個終端,兩個用來運行worker.py腳本,這兩個終端就是咱們的兩個消費者(consumers)—— C1 和 C2。
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C
第三個終端,咱們用來發布新任務。你能夠發送一些消息給消費者(consumers):
shell3$ python new_task.py First message. shell3$ python new_task.py Second message.. shell3$ python new_task.py Third message... shell3$ python new_task.py Fourth message.... shell3$ python new_task.py Fifth message.....
看看到底發送了什麼給咱們的工做者(workers):
shell1$ python worker.py
[*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
shell2$ python worker.py
[*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默認來講,RabbitMQ會按順序得把消息發送給每一個消費者(consumer)。平均每一個消費者都會收到同等數量得消息。這種發送消息得方式叫作——輪詢(round-robin)。試着添加三個或更多得工做者(workers)。
消息確認
當處理一個比較耗時得任務的時候,你也許想知道消費者(consumers)是否運行到一半就掛掉。當前的代碼中,當消息被RabbitMQ發送給消費者(consumers)以後,立刻就會在內存中移除。這種狀況,你只要把一個工做者(worker)中止,正在處理的消息就會丟失。同時,全部發送到這個工做者的尚未處理的消息都會丟失。
咱們不想丟失任何任務消息。若是一個工做者(worker)掛掉了,咱們但願任務會從新發送給其餘的工做者(worker)。
爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ就會釋放並刪除這條消息。
若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,及時工做者(workers)偶爾的掛掉,也不會丟失消息。
消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。
消息響應默認是開啓的。以前的例子中咱們可使用no_ack=True標識把它關閉。是時候移除這個標識了,當工做者(worker)完成了任務,就發送一個響應。
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello')
運行上面的代碼,咱們發現即便使用CTRL+C殺掉了一個工做者(worker)進程,消息也不會丟失。當工做者(worker)掛掉這後,全部沒有響應的消息都會從新發送。
忘記確認
一個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。消息在你的程序退出以後就會從新發送,若是它不可以釋放沒響應的消息,RabbitMQ就會佔用愈來愈多的內存。
爲了排除這種錯誤,你可使用rabbitmqctl命令,輸出messages_unacknowledged字段:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
消息持久化
若是你沒有特地告訴RabbitMQ,那麼在它退出或者崩潰的時候,將會丟失全部隊列和消息。爲了確保信息不會丟失,有兩個事情是須要注意的:咱們必須把「隊列」和「消息」設爲持久化。
首先,爲了避免讓隊列消失,須要把隊列聲明爲持久化(durable):
channel.queue_declare(queue='hello', durable=True)
儘管這行代碼自己是正確的,可是仍然不會正確運行。由於咱們已經定義過一個叫hello的非持久化隊列。RabbitMq不容許你使用不一樣的參數從新定義一個隊列,它會返回一個錯誤。但咱們如今使用一個快捷的解決方法——用不一樣的名字,例如task_queue。
channel.queue_declare(queue='task_queue', durable=True)
這個queue_declare必須在生產者(producer)和消費者(consumer)對應的代碼中修改。
這時候,咱們就能夠確保在RabbitMq重啓以後queue_declare隊列不會丟失。另外,咱們須要把咱們的消息也要設爲持久化——將delivery_mode的屬性設爲2。
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
注意:消息持久化
將消息設爲持久化並不能徹底保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間仍是有一個很小的間隔時間。由於RabbitMq並非全部的消息都使用fsync(2)——它有可能只是保存到緩存中,並不必定會寫到硬盤中。並不能保證真正的持久化,但已經足夠應付咱們的簡單工做隊列。若是你必定要保證持久化,你須要改寫你的代碼來支持事務(transaction)。
公平調度
你應該已經發現,它仍舊沒有按照咱們指望的那樣進行分發。好比有兩個工做者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕鬆。然而RabbitMQ並不知道這些,它仍然一如既往的派發消息。
這時由於RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有做出響應。它盲目的把第n-th條消息發給第n-th個消費者。
咱們可使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker)。
channel.basic_qos(prefetch_count=1)
關於隊列大小
若是全部的工做者都處理繁忙狀態,你的隊列就會被填滿。你須要留意這個問題,要麼添加更多的工做者(workers),要麼使用其餘策略。
整合代碼
new_task.py的完整代碼:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close()
(new_task.py源碼)
咱們的worker:
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
(worker.py source)
使用消息響應和prefetch_count你就能夠搭建起一個工做隊列了。這些持久化的選項使得在RabbitMQ重啓以後仍然可以恢復。