在Centos上安裝RabbitMQ流程(轉)

在Centos上安裝RabbitMQ流程
------------------------

html

1. 需求

   因爲項目中要用到消息隊列,通過ActiveMQ與RabbitMQ的比較,最終選擇了RabbbitMQ作爲咱們的消息系統,可是ActiveMQ在效率和可擴展性上都不錯,只是網上不少人反應它會時常崩潰,並且隨着消息併發數的增長,時常會出現鏈接很慢的狀況。
   目前我測試的服務器系統信息以下:node

  1. LSB Version:    :core-3.1-amd64:core-3.1-ia32:core-3.1-noarch:graphics-3.1-amd64:graphics-3.1-ia32:graphics-3.1-noarch  
  2. Distributor ID: CentOS  
  3. Description:    CentOS release 5.4 (Final)  
  4. Release:    5.4  
  5. Codename:   Final  


   因爲ActiveMQ是用erlang寫的,這裏簡單介紹一下這個傢伙是什麼,Erlang 是由愛立信公司開發的一種平臺式語言,能夠說是一種自帶了操做系統平臺的編程語言,並且在這個平臺上實現了併發機制、 進程調度、內存管理、分佈式計算、網絡通信等功能,這些功能都是徹底獨立於用戶的操做系統的,它採用的是相似於Java同樣的虛擬機的方式來實現對操做系統的獨立性的。
  
  一面是它的特色:  python

  1. 併發性:Erlang的輕量級進程能夠支持極高的併發性,並且在高併發的狀況下內存使用至關的少。Erlang的併發性並不會受到宿主操做系統併發性的限制。
  2. 分佈式:最開始Erlang的設計目標就是實現分佈式環境,一個Erlang的虛擬機就是一個Erlang網絡上的節點。一個 Erlang節點能夠在另外一個Erlang節點上建立本身的併發進程,而子進程所在的節點多是運行其餘的操做系統的服務器。不一樣節點的之間的能夠進行極爲高效而又精確的通訊,就像它們運行在同一個節點同樣。
  3. 魯棒形:Erlang內部建設有多種錯誤檢測原語。咱們能夠經過這些原語來架設高容錯性的系統。例如,一個進程能夠監視其餘進程的狀態和活動,即便那些被監控的進程處於其餘節點。在分佈式狀態下,咱們能夠把系統配置成具備Fail-over功能的分佈式系統。當有其餘節點出錯的時候,系統會把他的運行場景自動快速的切換備份節點上。Erlang支持9個9的級別的故障率,一年只有幾分鐘的故障時間。
  4. 軟實時:Erlang是一個「軟」實時系統(Soft Real Time),它能夠提供毫秒級別的響應。

2. AMQP的一些概念 

    注: 下面內容是參考http://blog.ftofficer.com/tag/rabbitmq/這篇文章的git

   AMQP當中有四個概念很是重要:虛擬主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding)。一個虛擬主機持有一組交換機、隊列和綁定。爲何須要多個虛擬主機呢?很簡單,RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。所以,若是須要禁止A組訪問B組的交換機/隊列/綁定,必須爲A和B分別建立一個虛擬主機。每個RabbitMQ服務器都有一個默認的虛擬主機「/」。若是這就夠了,那如今就能夠開始了。

交換機,隊列,還有綁定……天哪!
剛開始我思惟的列車就是在這裏脫軌的…… 這些鬼東西怎麼結合起來的?

隊列(Queues)是你的消息(messages)的終點,能夠理解成裝消息的容器。消息就一直在裏面,直到有客戶端(也就是消費者,Consumer)鏈接到這個隊列而且將其取走爲止。不過。你能夠將一個隊列配置成這樣的:一旦消息進入這個隊列,biu~,它就煙消雲散了。這個有點跑題了……

須要記住的是,隊列是由消費者(Consumer)經過程序創建的,不是經過配置文件或者命令行工具。這沒什麼問題,若是一個消費者試圖建立一個已經存在的隊列,RabbitMQ就會起來拍拍他的腦殼,笑一笑,而後忽略這個請求。所以你能夠將消息隊列的配置寫在應用程序的代碼裏面。這個概念不錯。

OK,你已經建立而且鏈接到了你的隊列,你的消費者程序正在百無聊賴的敲着手指等待消息的到來,敲啊,敲啊…… 沒有消息。發生了什麼?你固然須要先把一個消息放進隊列才行。不過要作這個,你須要一個交換機(Exchange)……

交換機能夠理解成具備路由表的路由程序,僅此而已。每一個消息都有一個稱爲路由鍵(routing key)的屬性,就是一個簡單的字符串。交換機當中有一系列的綁定(binding),即路由規則(routes),例如,指明具備路由鍵 「X」 的消息要到名爲timbuku的隊列當中去。先不討論這個,咱們有點超前了。

你的消費者程序要負責建立你的交換機們(複數)。啥?你是說你能夠有多個交換機?是的,這個能夠有,不過爲啥?很簡單,每一個交換機在本身獨立的進程當中執行,所以增長多個交換機就是增長多個進程,能夠充分利用服務器上的CPU核以便達到更高的效率。例如,在一個8核的服務器上,能夠建立5個交換機來用5個核,另外3個核留下來作消息處理。相似的,在RabbitMQ的集羣當中,你能夠用相似的思路來擴展交換機一邊獲取更高的吞吐量。

OK,你已經建立了一個交換機。可是他並不知道要把消息送到哪一個隊列。你須要路由規則,即綁定(binding)。一個綁定就是一個相似這樣的規則:將交換機「desert(沙漠)」當中具備路由鍵「阿里巴巴」的消息送到隊列「hideout(山洞)」裏面去。換句話說,一個綁定就是一個基於路由鍵將交換機和隊列鏈接起來的路由規則。例如,具備路由鍵「audit」的消息須要被送到兩個隊列,「log-forever」和「alert-the-big-dude」。要作到這個,就須要建立兩個綁定,每一個都鏈接一個交換機和一個隊列,二者都是由「audit」路由鍵觸發。在這種狀況下,交換機會複製一份消息而且把它們分別發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。

如今複雜的東西來了:交換機有多種類型。他們都是作路由的,不過接受不一樣類型的綁定。爲何不建立一種交換機來處理全部類型的路由規則呢?由於每種規則用來作匹配分子的CPU開銷是不一樣的。例如,一個「topic」類型的交換機試圖將消息的路由鍵與相似「dogs.*」的模式進行匹配。匹配這種末端的通配符比直接將路由鍵與「dogs」比較(「direct」類型的交換機)要消耗更多的CPU。若是你不須要「topic」類型的交換機帶來的靈活性,你能夠經過使用「direct」類型的交換機獲取更高的處理效率。那麼有哪些類型,他們又是怎麼處理的呢?

Fanout Exchange – 不處理路由鍵。你只須要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。Fanout交換機轉發消息是最快的。

Direct Exchange – 處理路由鍵。須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。這是一個完整的匹配。若是一個隊列綁定到該交換機上要求路由鍵 「dog」,則只有被標記爲「dog」的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。

Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.*」 只會匹配到「audit.irs」。我在RedHat的朋友作了一張不錯的圖,來代表topic交換機是如何工做的:


github

3. 安裝流程

   好了,回到咱們的安裝依賴部分,因爲咱們測試時裝的是rabbitMQ 2.7.0版本,它依賴於erlang R12B-5以上的版本,你能夠在http://www.erlang.org/download.html下載到相應的erlang的源代碼,下載解析之後直接./configure,它會列出你機器上沒有安裝的依賴包,如我這邊有crypto,ssl,ssh,wxWidget都沒有安裝,因爲wxWidget是可選安裝的,因此我這邊沒有安裝,你能夠經過yum install來安裝它們,安裝完之後再./configure一下,成功之後make, sudo make install就安裝好了。安裝好之後就能夠經過在shell中輸入erl命令來驗證安裝的成功。
   
   用wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.7.0/rabbitmq-server-generic-unix-2.7.0.tar.gz下載最新的源代碼,解壓,在編譯這前要先設置一些環境變量,通常配置方法以下
   export TARGET_DIR=/opt/rabbitmq
   export SBIN_DIR=/opt/rabbitmq/sbin
   export MAN_DIR=/opt/rabbitmq/man

   若是rabbitMQ服務只是當前用戶來用的話,能夠用 chown -R myuser /opt/rabbitmq 命令來改變其目錄權限

   下來是運行make與make install命令來安裝。

   最後還要設置一下日誌與消息持久化目錄,命令以下,其中的myuser是你當前的用戶名

web

  1. mkdir /var/log/rabbitmq  
  2. chown myuser /var/log/rabbitmq  
  3. mkdir /var/lib/rabbitmq  
  4. chown myuser /var/lib/rabbitmq  


  如今RabbitMQ已經安裝好了,如今要啓動它了,是否是好馬,遛不才知道啊,它的啓動也很簡單,運行命令sql

  1. cd /opt/rabbitmq/sbin  
  2. ./rabbitmq-server  


   固然你會爲啓動方便,能夠把/opt/rabbitmq/sbin下的命令在/usr/local/bin下作一個連接,命令以下
   sudo ln -s /opt/rabbitmq/sbin/rabbitmq-server /usr/local/bin/rabbitmq-server
   其它命令也同樣,這樣你就能夠在任何地方使用rabbitmq-server命令了。

   這樣就啓動了,是否是很簡單,默認是監聽當前的5672端口的,並且默認也是不須要配置的,固然你也能夠進行相應的配置,具體能夠參考以下:     http://www.rabbitmq.com/configure.html
   
   啓動之後會打印出以下信息shell

  1.   Activating RabbitMQ plugins ...  
  2. 0 plugins activated:  
  3. +---+   +---+  
  4. |   |   |   |  
  5. |   |   |   |  
  6. |   |   |   |  
  7. |   +---+   +-------+  
  8. |                   |  
  9. | RabbitMQ  +---+   |  
  10. |           |   |   |  
  11. |   v2.7.0  +---+   |  
  12. |                   |  
  13. +-------------------+  
  14. AMQP 0-9-1 / 0-9 / 0-8  
  15. Copyright (C) 2007-2011 VMware, Inc.  
  16. Licensed under the MPL.  See http://www.rabbitmq.com/  
  17.   
  18.   
  19. node           : rabbit@xunuu62  
  20. app descriptor : /opt/rabbitmq/sbin/../ebin/rabbit.app  
  21. home dir       : /home/xunuu  
  22. config file(s) : (none)  
  23. cookie hash    : 4OkU6/5RZ9ck4GPo4zyaKw==  
  24. log            : /var/log/rabbitmq/rabbit@xunuu62.log  
  25. sasl log       : /var/log/rabbitmq/rabbit@xunuu62-sasl.log  
  26. database dir   : /var/lib/rabbitmq/mnesia/rabbit@xunuu62  
  27. erlang version : 5.7.4  
  28.   
  29.   
  30. -- rabbit boot start  
  31. starting file handle cache server                                     ...done  
  32. starting worker pool                                                  ...done  
  33. starting database                                                     ...done  
  34. starting codec correctness check                                      ...done  



    RabbitMQ的管理插件的安裝:
    你能夠用以下命令安裝RabbitMQ的管理插件
   數據庫

  1. rabbitmq-plugins enable rabbitmq_management  


    我這邊輸出以下:

編程

 
  1. [xunuu@xunuu62 rabbitmq-server-2.7.0]$ rabbitmq-plugins enable rabbitmq_management  
  2. The following plugins have been enabled:  
  3.   mochiweb  
  4.   webmachine  
  5.   rabbitmq_mochiweb  
  6.   amqp_client  
  7.   rabbitmq_management_agent  
  8.   rabbitmq_management  
  9.   
  10. Plugin configuration has changed. Restart RabbitMQ for changes to take effect.  


    裝好之後要重啓rabbitMQ服務,關閉服務命令以下


  1. rabbitmqctl stop  


    再運行啓動命令:


  1. rabbit-server &  


    這樣你就能夠用瀏覽器訪問 http://server-name:55672/mgmt/ 來進行相應的管理 ,這裏的的默認用戶名密碼都是guest
    一些更加具體的配置能夠看這裏 http://www.rabbitmq.com/management.html


4. 測試 

    這裏有兩種測試方法,能夠在剛纔安裝的管理裏面進行測試,也能夠經過客戶端來進行測試,RabbitMQ支持豐富的客戶端語言

    這裏有相應的工具下載
    http://www.rabbitmq.com/devtools.html

    咱們這裏使用python來作一下測試,由於其代碼比較簡潔。下面內容是參考http://blog.ftofficer.com/tag/rabbitmq/這篇文章的

    首先,咱們須要一個Python的AMQP庫。有兩個可選:

py-amqplib – 通用的AMQP
txAMQP – 使用 Twisted 框架的AMQP庫,所以容許異步I/O。
根據你的需求,py-amqplib或者txAMQP都是能夠的。由於是基於Twisted的,txAMQP能夠保證用異步IO構建超高性能的AMQP程序。可是Twisted編程自己就是一個很大的主題……所以清晰起見,咱們打算用 py-amqplib。更新:請參見Esteve Fernandez關於txAMQP的使用和代碼樣例的回覆。

AMQP支持在一個TCP鏈接上啓用多個MQ通訊channel,每一個channel均可以被應用做爲通訊流。每一個AMQP程序至少要有一個鏈接和一個channel。

  1. from amqplib import client_0_8 as amqp  
  2. conn = amqp.Connection(host="localhost:5672 ", userid="guest",  
  3. password="guest", virtual_host="/", insist=False)  
  4. chan = conn.channel()  


每一個channel都被分配了一個整數標識,自動由Connection()類的.channel()方法維護。或者,你可使用.channel(x)來指定channel標識,其中x是你想要使用的channel標識。一般狀況下,推薦使用.channel()方法來自動分配channel標識,以便防止衝突。

如今咱們已經有了一個能夠用的鏈接和channel。如今,咱們的代碼將分紅兩個應用,生產者(producer)和消費者(consumer)。咱們先建立一個消費者程序,他會建立一個叫作「po_box」的隊列和一個叫「sorting_room」的交換機:


  1. chan.queue_declare(queue="po_box", durable=True,  
  2. exclusive=False, auto_delete=False)  
  3. chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,  
  4. auto_delete=False,)  


這段代碼幹了啥?首先,它建立了一個名叫「po_box」的隊列,它是durable的(重啓以後會從新創建),而且最後一個消費者斷開的時候不會自動刪除(auto_delete=False)。在建立durable的隊列(或者交換機)的時候,將auto_delete設置成false是很重要的,不然隊列將會在最後一個消費者斷開的時候消失,與durable與否無關。若是將durable和auto_delete都設置成True,只有尚有消費者活動的隊列能夠在RabbitMQ意外崩潰的時候自動恢復。

(你能夠注意到了另外一個標誌,稱爲「exclusive」。若是設置成True,只有建立這個隊列的消費者程序才容許鏈接到該隊列。這種隊列對於這個消費者程序是私有的)。

還有另外一個交換機聲明,建立了一個名字叫「sorting_room」的交換機。auto_delete和durable的含義和隊列是同樣的。可是,.excange_declare() 還有另一個參數叫作type,用來指定要建立的交換機的類型(如前面列出的): fanout, direct 和 topic.

到此爲止,你已經有了一個能夠接收消息的隊列和一個能夠發送消息的交換機。不過咱們須要建立一個綁定,把它們鏈接起來。


  1. chan.queue_bind(queue=」po_box」, exchange=」sorting_room」,  
  2. routing_key=」jason」)  


這個綁定的過程很是直接。任何送到交換機「sorting_room」的具備路由鍵「jason」 的消息都被路由到名爲「po_box」 的隊列。

如今,你有兩種方法從隊列當中取出消息。第一個是調用chan.basic_get(),主動從隊列當中拉出下一個消息(若是隊列當中沒有消息,chan.basic_get()會返回None, 所以下面代碼當中print msg.body 會在沒有消息的時候崩掉):


  1. msg = chan.basic_get("po_box")  
  2. print msg.body  
  3. chan.basic_ack(msg.delivery_tag)  


可是若是你想要應用程序在消息到達的時候當即獲得通知怎麼辦?這種狀況下不能使用chan.basic_get(),你須要用chan.basic_consume()註冊一個新消息到達的回調。

  1. def recv_callback(msg):  
  2.     print 'Received: ' + msg.body  
  3. chan.basic_consume(queue='po_box', no_ack=True,  
  4. callback=recv_callback, consumer_tag="testtag")  
  5. while True:  
  6.     chan.wait()  
  7. chan.basic_cancel("testtag")  


chan.wait() 放在一個無限循環裏面,這個函數會等待在隊列上,直到下一個消息到達隊列。chan.basic_cancel() 用來註銷該回調函數。參數consumer_tag 當中指定的字符串和chan.basic_consume() 註冊的一直。在這個例子當中chan.basic_cancel() 不會被調用到,由於上面是個無限循環…… 不過你須要知道這個調用,因此我把它放在了代碼裏。

須要注意的另外一個東西是no_ack參數。這個參數能夠傳給chan.basic_get()和chan.basic_consume(),默認是false。當從隊列當中取出一個消息的時候,RabbitMQ須要應用顯式地回饋說已經獲取到了該消息。若是一段時間內不回饋,RabbitMQ會將該消息從新分配給另一個綁定在該隊列上的消費者。另外一種狀況是消費者斷開鏈接,可是獲取到的消息沒有回饋,則RabbitMQ一樣從新分配。若是將no_ack 參數設置爲true,則py-amqplib會爲下一個AMQP請求添加一個no_ack屬性,告訴AMQP服務器不須要等待回饋。可是,大多數時候,你也許想要本身手工發送回饋,例如,須要在回饋以前將消息存入數據庫。回饋一般是經過調用chan.basic_ack()方法,使用消息的delivery_tag屬性做爲參數。參見chan.basic_get() 的實例代碼。

好了,這就是消費者的所有代碼。(下載:amqp_consumer.py http://blogs.digitar.com/jjww/code-samples/amqp_consumer.py)

不過沒有人發送消息的話,要消費者何用?因此須要一個生產者。下面的代碼示例代表如何將一個簡單消息發送到交換區「sorting_room」,而且標記爲路由鍵「jason」 :

  1. msg = amqp.Message("Test message!")  
  2. msg.properties["delivery_mode"] = 2  
  3. chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")  


你也許注意到咱們設置消息的delivery_mode屬性爲2,由於隊列和交換機都設置爲durable的,這個設置將保證消息可以持久化,也就是說,當它尚未送達消費者以前若是RabbitMQ重啓則它可以被恢復。

剩下的最後一件事情(生產者和消費者都須要調用的)是關閉channel和鏈接:

[html]  view plain  copy
 
  1. chan.close()  
  2. conn.close()  

很簡單吧。(下載:amqp_publisher.py http://blogs.digitar.com/jjww/code-samples/amqp_publisher.py)

來真實地跑一下吧……
如今咱們已經寫好了生產者和消費者,讓他們跑起來吧。假設你的RabbitMQ在localhost上安裝而且運行。

打開一個終端,執行python ./amqp_consumer.py讓消費者運行,而且建立隊列、交換機和綁定。

而後在另外一個終端運行python ./amqp_publisher.py 「AMQP rocks.」 。若是一切良好,你應該可以在第一個終端看到輸出的消息。

5. 參考

    • http://gavinroy.com/the-attention-deficit-disorder-guide-to-rabbi
    • http://blog.nosqlfan.com/html/1575.html
    • http://www.rabbitmq.com/configure.html
    • https://github.com/rabbitmq/rabbitmq-tutorials
    • http://www.hrbstation.com/blog/144.html
相關文章
相關標籤/搜索