RabbitMQ+Python入門經典] 兔子和兔子窩

  

RabbitMQ做爲一個工業級的消息隊列服務器,在其客戶端手冊列表Python段當中推薦了一篇blog,做爲RabbitMQ+Python的入門手冊再合適不過了。不過,正如其標題Rabbit and Warrens(兔子和養兔場)同樣,這篇英文寫的至關俏皮,以致於對於我等非英文讀者來講不像通常的技術文檔那麼好懂,因此,翻譯一下吧。翻譯過了,但願其餘人能夠少用一些時間。翻譯水平有限,不可能像原文同樣俏皮,部分地方可能就意譯了,但願以容易懂爲準。想看看老外的幽默的,推薦去看原文,其實,也不是那麼難理解……html

原文:http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/java

兔子和兔子窩python

當時咱們的動機很簡單:從生產環境的電子郵件處理流程當中分支出一個特定的離線分析流程。咱們開始用的MySQL,將要處理的東西放在表裏面,另外一個程序從中取。不過很快,這種設計的醜陋之處就顯現出來了…… 你想要多個程序從一個隊列當中取數據來處理?沒問題,咱們硬編碼程序的個數好了……什麼?還要可以容許程序動態地增長和減小的時候動態進行壓力分配?mysql

是的,當年咱們想的簡單的東西(作一個分支處理)逐漸變成了一個棘手的問題。之前拿着錘子(MySQL)看全部東西都是釘子(表)的年代是多麼美好……git

在搜索了一下以後,咱們走進了消息隊列(message queue)的大門。不不,咱們固然知道消息隊列是什麼,咱們但是以作電子郵件程序謀生的。咱們實現過各類各樣的專業的,高速的內存隊列用來作電子郵件處理。咱們不知道的是那一大類現成的、通用的消息隊列(MQ)服務器——不管是用什麼語言寫出的,不須要複雜的裝配的,能夠天然的在網絡上的應用程序之間傳送數據的一類程序。不用咱們本身寫?看看再說。sql

讓你們看看大家的Queue吧……數據庫

過去的4年裏,人們寫了有好多好多的開源的MQ服務器啊。其中大多數都是某公司例如LiveJournal寫出來用來解決特定問題的。它們的確不關心上面跑的是什麼類型的消息,不過他們的設計思想一般是和建立者息息相關的(消息的持久化,崩潰恢復等一般不在他們考慮範圍內)。不過,有三個專門設計用來作及其靈活的消息隊列的程序值得關注:apache

·        Apache ActiveMQ編程

·        ZeroMQruby

·        RabbitMQ

Apache ActiveMQ 曝光率最高,不過看起來它有些問題,可能會形成丟消息。不可接受,下一個。

ZeroMQ 和 RabbitMQ 都支持一個開源的消息協議,成爲AMQP。AMQP的一個優勢是它是一個靈活和開放的協議,以便和另外兩個商業化的Message Queue (IBM和Tibco)競爭,很好。不過ZeroMQ不支持消息持久化和崩潰恢復,不太好。剩下的只有RabbitMQ了。若是你不在乎消息持久化和崩潰恢復,試試ZeroMQ吧,延遲很低,並且支持靈活的拓撲。

剩下的只有這個吃胡蘿蔔的傢伙了……

當我讀到它是用Erlang寫的時候,RabbitMQ震了我一下。Erlang 是愛立信開發的高度並行的語言,用來跑在電話交換機上。是的,那些要求6個9的在線時間的東西。在Erlang當中,充斥着大量輕量進程,它們之間用消息傳遞來通訊。聽起來思路和咱們用消息隊列的思路是同樣的,不是麼?

並且,RabbitMQ支持持久化。是的,若是RabbitMQ死掉了,消息並不會丟失,當隊列重啓,一切都會回來。並且,正如在DigiTar(注:原文做者的公司)作事情指望的那樣,它能夠和Python無縫結合。除此以外,RabbitMQ的文檔至關的……恐怖。若是你懂AMQP,這些文檔還好,可是有多少人懂AMQP?這些文檔就像MySQL的文檔假設你已經懂了SQL同樣……不過不要緊啦。

好了,廢話少說。這裏是花了一週時間閱讀關於AMQP和關於它如何在RabbitMQ上工做的文檔以後的一個總結,還有,怎麼在Python當中使用。

開始吧

AMQP當中有四個概念很是重要:虛擬主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding)。一個虛擬主機持有一組交換機、隊列和綁定。爲何須要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。所以,若是須要禁止A組訪問B組的交換機/隊列/綁定,必須爲A和B分別建立一個虛擬主機。每個RabbitMQ服務器都有一個默認的虛擬主機「/」。若是這就夠了,那如今就能夠開始了。

交換機,隊列,還有綁定……天哪!

剛開始我思惟的列車就是在這裏脫軌的…… 這些鬼東西怎麼結合起來的?

隊列(Queues)是你的消息(messages)的終點,能夠理解成裝消息的容器。消息就一直在裏面,直到有客戶端(也就是消費者,Consumer)鏈接到這個隊列而且將其取走爲止。不過。你能夠將一個隊列配置成這樣的:一旦消息進入這個隊列,biu~,它就煙消雲散了。這個有點跑題了……

須要記住的是,隊列是由消費者(Consumer)經過程序創建的,不是經過配置文件或者命令行工具。這沒什麼問題,若是一個消費者試圖建立一個已經存在的隊列,RabbitMQ就會起來拍拍他的腦殼,笑一笑,而後忽略這個請求。所以你能夠將消息隊列的配置寫在應用程序的代碼裏面。這個概念不錯。

OK,你已經建立而且鏈接到了你的隊列,你的消費者程序正在百無聊賴的敲着手指等待消息的到來,敲啊,敲啊…… 沒有消息。發生了什麼?你固然須要先把一個消息放進隊列才行。不過要作這個,你須要一個交換機(Exchange)……

交換機能夠理解成具備路由表的路由程序,僅此而已。每一個消息都有一個稱爲路由鍵(routing key)的屬性,就是一個簡單的字符串。交換機當中有一系列的綁定(binding),即路由規則(routes),例如,指明具備路由鍵 「X」 的消息要到名爲timbuku的隊列當中去。先不討論這個,咱們有點超前了。

你的消費者程序要負責建立你的交換機(複數)。啥?你是說你能夠有多個交換機?是的,這個能夠有,不過爲啥?很簡單,每一個交換機在本身獨立的進程當中執行,所以增長多個交換機就是增長多個進程,能夠充分利用服務器上的CPU核以便達到更高的效率。例如,在一個8核的服務器上,能夠建立5個交換機來用5個核,另外3個核留下來作消息處理。相似的,在RabbitMQ的集羣當中,你能夠用相似的思路來擴展交換機一邊獲取更高的吞吐量。

OK,你已經建立了一個交換機。可是他並不知道要把消息送到哪一個隊列。你須要路由規則,即綁定(binding)。一個綁定就是一個相似這樣的規則:將交換機「desert(沙漠)」當中具備路由鍵「阿里巴巴」的消息送到隊列「hideout(山洞)」裏面去。換句話說,一個綁定就是一個基於路由鍵將交換機和隊列鏈接起來的路由規則。例如,具備路由鍵「audit」的消息須要被送到兩個隊列,「log-forever」和「alert-the-big-dude」。要作到這個,就須要建立兩個綁定,每一個都鏈接一個交換機和一個隊列,二者都是由「audit」路由鍵觸發。在這種狀況下,交換機會複製一份消息而且把它們分別發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。

如今複雜的東西來了:交換機有多種類型。他們都是作路由的,不過接受不一樣類型的綁定。爲何不建立一種交換機來處理全部類型的路由規則呢?由於每種規則用來作匹配分子的CPU開銷是不一樣的。例如,一個「topic」類型的交換機試圖將消息的路由鍵與相似「dogs.*」的模式進行匹配。匹配這種末端的通配符比直接將路由鍵與「dogs」比較(「direct」類型的交換機)要消耗更多的CPU。若是你不須要「topic」類型的交換機帶來的靈活性,你能夠經過使用「direct」類型的交換機獲取更高的處理效率。那麼有哪些類型,他們又是怎麼處理的呢?

Fanout Exchange – 不處理路由鍵。你只須要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。Fanout交換機轉發消息是最快的。

Direct Exchange – 處理路由鍵。須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。這是一個完整的匹配。若是一個隊列綁定到該交換機上要求路由鍵 「dog」,則只有被標記爲「dog」的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog

Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.*」 只會匹配到「audit.irs」。我在RedHat的朋友作了一張不錯的圖,來代表topic交換機是如何工做的:

Source: Red Hat Messaging Tutorial: 1.3Topic Exchange

持久化這些小東西們

你花了大量的時間來建立隊列、交換機和綁定,而後,砰~服務器程序掛了。你的隊列、交換機和綁定怎麼樣了?還有,放在隊列裏面可是還沒有處理的消息們呢?

放鬆~若是你是用默認參數構造的這一切的話,那麼,他們,都,biu~,灰飛煙滅了。是的,RabbitMQ重啓以後會乾淨的像個新生兒。你必須重作全部的一切,亡羊補牢,如何避免未來再度發生此類杯具?

隊列和交換機有一個建立時候指定的標誌durable,直譯叫作堅固的。durable的惟一含義就是具備這個標誌的隊列和交換機會在重啓以後從新創建,它不表示說在隊列當中的消息會在重啓後恢復。那麼如何才能作到不僅是隊列和交換機,還有消息都是持久的呢?

可是首先一個問題是,你真的須要消息是持久的嗎?對於一個須要在重啓以後回覆的消息來講,它須要被寫入到磁盤上,而即便是最簡單的磁盤操做也是要消耗時間的。若是和消息的內容相比,你更看重的是消息處理的速度,那麼不要使用持久化的消息。不過對於咱們@DigiTar來講,持久化很重要。

當你將消息發佈到交換機的時候,能夠指定一個標誌「Delivery Mode」(投遞模式)。根據你使用的AMQP的庫不一樣,指定這個標誌的方法可能不太同樣(咱們後面會討論如何用Python搞定)。簡單的說,就是將Delivery Mode設置成2,也就是持久的(persistent)便可。通常的AMQP庫都是將Delivery Mode設置成1,也就是非持久的。因此要持久化消息的步驟以下:

1.   將交換機設成 durable。

2.   將隊列設成 durable。

3.   將消息的 Delivery Mode 設置成2 。

就這樣,不是很複雜,起碼沒有造火箭複雜,不過也有可能犯點小錯誤。

下面還要羅嗦一個東西……綁定(Bindings)怎麼辦?咱們沒法在建立綁定的時候設置成durable。沒問題,若是你綁定了一個durable的隊列和一個durable的交換機,RabbitMQ會自動保留這個綁定。相似的,若是刪除了某個隊列或交換機(不管是否是durable),依賴它的綁定都會自動刪除。

注意兩點:

·        RabbitMQ 不容許你綁定一個非堅固(non-durable)的交換機和一個durable的隊列。反之亦然。要想成功必須隊列和交換機都是durable的。

·        一旦建立了隊列和交換機,就不能修改其標誌了。例如,若是建立了一個non-durable的隊列,而後想把它改變成durable的,惟一的辦法就是刪除這個隊列而後重現建立。所以,最好仔細檢查建立的標誌。

開始喂蛇了~

【譯註】說喂蛇是由於Python的圖標是條蛇。

AMQP的一個空白地帶是如何在Python當中使用。對於其餘語言有一大坨材料。

·        Java – http://www.rabbitmq.com/java-client.html

·        .NET – http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.5.0/rabbitmq-dotnet-client-1.5.0-user-guide.pdf

·        Ruby – http://somic.org/2008/06/24/ruby-amqp-rabbitmq-example/

可是對Python老兄來講,你須要花點時間來挖掘一下。因此我寫了這個,這樣別的傢伙們就不須要經歷我這種抓狂的過程了。

首先,咱們須要一個Python的AMQP庫。有兩個可選:

·        py-amqplib – 通用的AMQP

·        txAMQP – 使用 Twisted 框架的AMQP庫,所以容許異步I/O。

根據你的需求,py-amqplib或者txAMQP都是能夠的。由於是基於Twisted的,txAMQP能夠保證用異步IO構建超高性能的AMQP程序。可是Twisted編程自己就是一個很大的主題……所以清晰起見,咱們打算用 py-amqplib。更新:請參見Esteve Fernandez關於txAMQP的使用和代碼樣例的回覆

AMQP支持在一個TCP鏈接上啓用多個MQ通訊channel,每一個channel均可以被應用做爲通訊流。每一個AMQP程序至少要有一個鏈接和一個channel。

  1. from amqplib import client_0_8 as amqp  

  2. conn = amqp.Connection(host="localhost:5672 ", userid="guest",  

  3. password="guest", virtual_host="/", insist=False)  

  4. chan = conn.channel()  

 

每一個channel都被分配了一個整數標識,自動由Connection()類的.channel()方法維護。或者,你可使用.channel(x)來指定channel標識,其中x是你想要使用的channel標識。一般狀況下,推薦使用.channel()方法來自動分配channel標識,以便防止衝突。

如今咱們已經有了一個能夠用的鏈接和channel。如今,咱們的代碼將分紅兩個應用,生產者(producer)和消費者(consumer)。咱們先建立一個消費者程序,他會建立一個叫作「po_box」的隊列和一個叫「sorting_room」的交換機:

  1. chan.queue_declare(queue="po_box", durable=True,  

  2. exclusive=False, auto_delete=False)  

  3. chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,  

  4. auto_delete=False,)  

 

這段代碼幹了啥?首先,它建立了一個名叫「po_box」的隊列,它是durable的(重啓以後會從新創建),而且最後一個消費者斷開的時候不會自動刪除(auto_delete=False)。在建立durable的隊列(或者交換機)的時候,將auto_delete設置成false是很重要的,不然隊列將會在最後一個消費者斷開的時候消失,與durable與否無關。若是將durable和auto_delete都設置成True,只有尚有消費者活動的隊列能夠在RabbitMQ意外崩潰的時候自動恢復。

(你能夠注意到了另外一個標誌,稱爲「exclusive」。若是設置成True,只有建立這個隊列的消費者程序才容許鏈接到該隊列。這種隊列對於這個消費者程序是私有的)。

還有另外一個交換機聲明,建立了一個名字叫「sorting_room」的交換機。auto_delete和durable的含義和隊列是同樣的。可是,.excange_declare() 還有另一個參數叫作type,用來指定要建立的交換機的類型(如前面列出的): fanoutdirect 和 topic.

到此爲止,你已經有了一個能夠接收消息的隊列和一個能夠發送消息的交換機。不過咱們須要建立一個綁定,把它們鏈接起來。

chan.queue_bind(queue=」po_box」,exchange=」sorting_room」,
routing_key=」jason」)

這個綁定的過程很是直接。任何送到交換機「sorting_room」的具備路由鍵「jason」 的消息都被路由到名爲「po_box」 的隊列。

如今,你有兩種方法從隊列當中取出消息。第一個是調用chan.basic_get(),主動從隊列當中拉出下一個消息(若是隊列當中沒有消息,chan.basic_get()會返回None, 所以下面代碼當中print msg.body 會在沒有消息的時候崩掉):

  1. msg = chan.basic_get("po_box")  

  2. print msg.body  

  3. chan.basic_ack(msg.delivery_tag)  

 

可是若是你想要應用程序在消息到達的時候當即獲得通知怎麼辦?這種狀況下不能使用chan.basic_get(),你須要用chan.basic_consume()註冊一個新消息到達的回調。

  1. def recv_callback(msg):  

  2.     print 'Received: ' + msg.body  

  3. chan.basic_consume(queue='po_box', no_ack=True,  

  4. callback=recv_callback, consumer_tag="testtag")  

  5. while True:  

  6.     chan.wait()  

  7. chan.basic_cancel("testtag")  

 

chan.wait() 放在一個無限循環裏面,這個函數會等待在隊列上,直到下一個消息到達隊列。chan.basic_cancel() 用來註銷該回調函數。參數consumer_tag 當中指定的字符串和chan.basic_consume() 註冊的一直。在這個例子當中chan.basic_cancel() 不會被調用到,由於上面是個無限循環…… 不過你須要知道這個調用,因此我把它放在了代碼裏。

須要注意的另外一個東西是no_ack參數。這個參數能夠傳給chan.basic_get()chan.basic_consume(),默認是false。當從隊列當中取出一個消息的時候,RabbitMQ須要應用顯式地回饋說已經獲取到了該消息。若是一段時間內不回饋,RabbitMQ會將該消息從新分配給另一個綁定在該隊列上的消費者。另外一種狀況是消費者斷開鏈接,可是獲取到的消息沒有回饋,則RabbitMQ一樣從新分配。若是將no_ack 參數設置爲true,則py-amqplib會爲下一個AMQP請求添加一個no_ack屬性,告訴AMQP服務器不須要等待回饋。可是,大多數時候,你也許想要本身手工發送回饋,例如,須要在回饋以前將消息存入數據庫。回饋一般是經過調用chan.basic_ack()方法,使用消息的delivery_tag屬性做爲參數。參見chan.basic_get() 的實例代碼。

好了,這就是消費者的所有代碼。(下載:amqp_consumer.py

不過沒有人發送消息的話,要消費者何用?因此須要一個生產者。下面的代碼示例代表如何將一個簡單消息發送到交換區「sorting_room」,而且標記爲路由鍵「jason」 :

  1. msg = amqp.Message("Test message!")  

  2. msg.properties["delivery_mode"] = 2  

  3. chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")  

 

你也許注意到咱們設置消息的delivery_mode屬性爲2,由於隊列和交換機都設置爲durable的,這個設置將保證消息可以持久化,也就是說,當它尚未送達消費者以前若是RabbitMQ重啓則它可以被恢復。

剩下的最後一件事情(生產者和消費者都須要調用的)是關閉channel和鏈接:

chan.close()

conn.close()

很簡單吧。(下載:amqp_publisher.py

來真實地跑一下吧……

如今咱們已經寫好了生產者和消費者,讓他們跑起來吧。假設你的RabbitMQ在localhost上安裝而且運行。

打開一個終端,執行python ./amqp_consumer.py讓消費者運行,而且建立隊列、交換機和綁定。

而後在另外一個終端運行python ./amqp_publisher.py 「AMQP rocks.」 。若是一切良好,你應該可以在第一個終端看到輸出的消息。

付諸使用吧

我知道這個教程是很是粗淺的關於AMQP/RabbitMQ和如何使用Python訪問的教程。但願這個能夠說明全部的概念如何在Python當中被組合起來。若是你發現任何錯誤,請聯繫原做者(williamsjj@digitar.com) 【譯註:若是是翻譯問題請聯繫譯者】。同時,我很高興回答我知道的問題。【譯註:譯者也是同樣的】。接下來是,集羣化(clustering)!不過我須要先把它弄懂再說。

注:關於RabbitMQ的知識我主要來自這些來源,推薦閱讀:

·        zeromq:消息中間件分析

·        RabbitMQ .NET客戶端庫用戶手冊

·        高級消息隊列協議(Advanced Message QueuingProtocol):協議規約0.8 版本

–完–

相關文章
相關標籤/搜索