RabbitMQ學習之:(三)AMQP和RabbitMQ介紹

準備開始

高級消息隊列協議(AMQP1)是一個異步消息傳遞所使用的應用層協議規範。做爲線路層協議,而不是API(例如JMS2),AMQP客戶端可以無視消息的來源任意發送和接受信息。如今,已經有至關一部分不一樣平臺的服務器3和客戶端能夠投入使用4html

AMQP的原始用途只是爲金融界提供一個能夠彼此協做的消息協議,而如今的目標則是爲通用消息隊列架構提供通用構建工具。所以,面向消息的中間件(MOM)系統,例如發佈/訂閱隊列,沒有做爲基本元素實現。反而經過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規範的一部分,造成了在線路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如以前提到的發佈/訂閱,隊列,事務以及流數據,而且添加了額外的特性,例如更易於擴展,基於內容的路由。java

本文中區別發佈/訂閱是爲了將生產者和消費者拆分開來:生產者無需知道消費者按照什麼標準接受消息。隊列是一個先入先出的數據結構。路由封裝了消息隊列中的消息的相關信息,這些信息決定了消息在異步消息系統中的最終展示形式。node

在這裏,我嘗試解釋一下這個模型的一些概念,Aman Gupta使用Ruby5實現了AMQP模型6。它使用的是一種事件驅動架構(基於EventMachine7),在閱讀和使用的時候都會讓人以爲有些不太熟悉。可是API的設計代表了在AMQ模型實體之間通訊的方式是很是簡單的,所以,即使開發者對Ruby並不熟悉,他一樣也能夠獲得收穫。git

應該注意到,至少有三個或者更多的Ruby客戶端8910可供選擇。其中的一個客戶端Carrot很明顯使用了非事件驅動的同步Ruby架構,所以,這個客戶端在使用事件驅動的Ruby API的時候,風格很是簡潔。github

本文中的AMQP服務器是使用Erlang11編寫的RabbitMQ。它實現了AMQP規範0-8版的內容,而且將在近期實現0-9-1版的內容12web

在開始以前再交代一些東西:異步消息是一個很是普通而且普遍使用的技術,從例如Skype或者XMPP/Jabber這樣各類各樣的即時消息協議到古老的email。可是,這些服務都有以下特徵:apache

- 它們會在傳輸消息的時候或多或少加入一些隨意的內容(例如一封email可能會包含一個文本和關於辦公室笑話的PPT)和一些比較正式的路由信息(例如email地址)。安全

- 它們都是異步的,也就是說它們將生產者和消費者區分開來,所以可能將消息加入隊列(例如某人發給你一條消息,可是你不在線或者你的郵箱會收到一封email)。ruby

- 生產者和消費者是具備不一樣知識的不一樣角色。我不須要知道你的IMAP用戶名和密碼就可以給你發送email。事實上,我甚至不須要知道你的email地址是不是一個馬甲或者「真實」地址。這個特性意味着生產者不能控制什麼內容被閱讀或者訂閱了 - 就像個人email客戶端會捨棄掉大多數主動發送給個人醫藥廣告。服務器

AMQP是一個抽象的協議(也就是說它不負責處理具體的數據),這個事實並不會將事情變得更復雜。反而,Internet使得消息無處不在。人們一般使用它們和異步消息簡單靈活地解決不少問題。並且構建AMQ中的異步消息架構模型最困難的地方在於上手的時候,一旦這些困難被克服,那麼構建過程將變得簡單。

你可能須要安裝一些軟件來本身動手實現這些例子。若是你已經在系統上安裝了Ruby,那麼只須要不到十分鐘的設置時間。RabbitMQ網站也有許多信息13幫助你儘快開始。你只需作這些準備工做:

- Erlang/OTP包。下載地址是 http://erlang.org/download.html,安裝說明在http://www.erlang.org/doc/installation_guide/part_frame.html 

- RabbitMQ。下載地址是 http://www.rabbitmq.com/download.html,安裝說明在 http://www.rabbitmq.com/install.html

- 一個Ruby虛擬機。若是在你的系統平臺上沒有可供選擇的Ruby解釋器,你可能須要下載Ruby MRI VM。在 http://www.ruby-lang.org/en/downloads/能夠找到下載地址和安裝說明。

- 兩個Ruby 「gem」(已打包的庫)。gem工具應該會隨着你的Ruby安裝包一塊兒分發。

     - 若是你須要全新安裝或者不肯定它是否是當前版本,那麼你能夠選擇升級gem工具。輸入gem update --system。在BSD/UNIX系統中,你可能須要有超級用戶的權限才能運行此命令(以及後續指令)。
     - 告訴gem在GitHub搜索包:gem sources -a http://gems.github.com。
     - 安裝AMQPgem:gem install tmm1-amqp。這也會安裝event-machine gem。

如今你須要作的就是啓動RabbitMQ服務器14

AMQ模型

在AMQ規範中描述了一些實體。一個用來分辨這些實體的方法是檢查它們是否由服務器管理員配置或者由客戶端在運行的時候聲明。

可配置的實體有:

- 消息協商器(Message Broker),它在TCP/IP等端口監聽AMQ消息。

- 將消息協商數據劃分到多個不一樣集合的虛擬主機,它很像webserver中的虛擬主機,例如Apache的http守護進程。

- 使用安全憑據鏈接到虛擬主機的用戶。

1 require 'rubygems'
2 require 'mq'
3
4 event_loop = Thread.new do
5   EM.run do
6     EM.add_timer(1) do
7       EM.stop
8     end
9   end
10 end
11
12 # connect to the rabbitmq demonstration broker server (http://www.rabbitmq.com/examples.html#demoserver)
13
14 AMQP.start :host => 'dev.rabbitmq.com', :port => 5672, :user => 'guest', :password => 'guest', :vhost
=> 'localhost'
15
16 event_loop.join

值得注意的是,規範中僅僅授予用戶訪問虛擬主機的權限,並無採納其餘比這高級的訪問控制措施,所以RabbitMQ並不支持這些高級訪問控制措施。一個由廠商開發的解決方法15指望會加入到下個主要版本中。可是,這個功能16能夠經過使用Mercurial代碼庫的默認branch17來實現,並且已經有一些RabbitMQ用戶在使用了。

爲了和協商器交流,一個客戶端須要創建一個或者多個鏈接。這些鏈接只是限於鏈接用戶和虛擬主機。客戶端默認使用guest/guest訪問權限和訪問虛擬主機的根目錄,這些默認實現也是RabbitMQ的默認安裝選項。

在一個鏈接中,客戶端聲明瞭一個通道。這個通道是消息協商器的網絡鏈接中的一個邏輯鏈接。這種多工機制是必要的,由於協議中的某些操做是須要這樣的通道。所以,經過單一鏈接到協商器的併發控制須要創建一個可靠的模型,這裏可使用通道池和串行訪問或者例如線程本地通道這樣的線程併發模型。在這個例子中,Ruby API對用戶隱藏了通道管理這樣的細節。

若是須要在一個通道上進行操做,那麼客戶端須要聲明AMQ組件。聲明組件是斷言特定的組件存在於協商器中──若是不存在的話,那麼在運行時建立。

這些組件包括:

- 交換器(Exchange),它是發送消息的實體。

- 隊列(Queue),這是接收消息的實體。

- 綁定器(Bind),將交換器和隊列鏈接起來,而且封裝消息的路由信息。

全部這些組件的屬性各不相同,可是隻有交換器和隊列一樣被命名。客戶端能夠經過交換器的名字來發送消息,也能夠經過隊列的名字收取信息。由於AMQ協議沒有一個通用的標準方法來得到全部組件的名稱,因此客戶端對隊列和交換器的訪問被限制在僅能使用熟知的或者只有本身知道的名字(參見18瞭解這種訪問控制的信息)。

綁定器沒有名字,它們的生命期依賴於所緊密鏈接的交換器和隊列。若是這二者任意一個被刪除掉,那麼綁定器便失效了。這就說明,若要知道交換器和隊列的名字,還須要設置消息路由。

消息是一個不透明的數據包,這些包有以下性質:

- 元數據,例如內容的編碼或者代表來源的字段。

- 標誌位,標記消息投遞時候的一些保障機制。

- 一個特殊的字段叫作routing key。

2.1 接受和發送消息:交換器類型

發送消息是一個很是簡單的過程。客戶端聲明一個它想要發送消息的目的交換器,而後將消息傳遞給交換器。

接受消息的最簡單辦法是設置一個訂閱。客戶端須要聲明一個隊列,而且使用一個綁定器將以前的交換器和隊列綁定起來,這樣的話,訂閱就設置完畢。

1 require 'rubygems'
2 require 'mq'
3
4 event_loop = Thread.new do
5   EM.run do
6     EM.add_timer(1) do
7       EM.stop
8     end
9   end
10 end
11
12 def subscribe_to_queue
13
14   exchange = MQ.fanout('my-fanout-exchange')
15   queue = MQ.queue('my-fanout-queue')
16
17   queue.bind(exchange).subscribe do |header, body|
18     yield header, body
19   end
20
21 end
22
23 def send_to_exchange(message)
24
25   exchange = MQ.fanout('my-fanout-exchange')
26   exchange.publish message
27
28 end
29
30 subscribe_to_queue do |header, body|
31   p "I received a message: #{body}"
32 end
33
34 send_to_exchange 'Hello'
35 send_to_exchange 'World'
36
37 event_loop.join

三個標準決定了一條消息是否真的被投遞到了隊列中:

  1. 交換器的類型。在這個例子中類型是fanout。

  2. 消息的屬性。在這個例子中,消息沒有任何屬性,只是有內容(首先是Hello,而後World)。

  3. 給定的綁定器的惟一可選屬性:鍵值。在這個例子中綁定器沒有任何鍵值。

交換器的類型決定了它如何解釋這個鏈接。咱們的例子中,fanout交換器不會解釋任何東西:它只是將消息投遞到全部綁定到它的隊列中。

沒有綁定器,哪怕是最簡單的消息,交換器也不能將其投遞到隊列中,只能拋棄它。經過訂閱一個隊列,消費者可以從隊列中獲取消息,而後在使用事後將其從隊列中刪除。

下列交換器類型都在規範中被說起。隨後我會由淺入深地介紹它們。

- direct交換器將消息根據其routing-key屬性投遞到包含對應key屬性的綁定器上。

1 require 'rubygems'
2 require 'mq'
3
4 event_loop = Thread.new do
5   EM.run do
6     EM.add_timer(1) do
7       EM.stop
8     end
9   end
10 end
11
12 def subscribe_to_queue(key)
13
14   exchange = MQ.direct('my-direct-exchange')
15   queue = MQ.queue('my-direct-queue')
16
17   queue.bind(exchange, :key => key).subscribe do |header, body|
18     yield header, body
19   end
20
21 end
22
23 def send_to_exchange(message, key)
24
25   exchange = MQ.direct('my-direct-exchange')
26   exchange.publish message, :routing_key => key
27
28 end
29
30 subscribe_to_queue('hello_world') do |header, body|
31   p "I received a message: #{body}"
32 end
33
34 send_to_exchange 'Hello', 'hello_world'
35 send_to_exchange 'Cruel', 'ignored'
36 send_to_exchange 'World', 'hello_world'
37
38 event_loop.join

- topic交換器用過模式匹配分析消息的routing-key屬性。它將routing-key和binding-key的字符串切分紅單詞。這些單詞之間用點隔開。它一樣也會識別兩個通配符:#匹配0個或者多個單詞,*匹配一個單詞。例如,binding key *.stock.#匹配routing key usd.stcok和eur.stock.db,可是不匹配stock.nasdaq。

1 require 'rubygems'
2 require 'mq'
3
4 event_loop = Thread.new do
5   EM.run do
6     EM.add_timer(1) do
7       EM.stop
8     end
9   end
10 end
11
12 def subscribe_to_queue(key)
13
14   exchange = MQ.topic('my-topic-exchange')
15   queue = MQ.queue('my-topic-queue')
16
17   queue.bind(exchange, :key => key).subscribe do |header, body|
18     yield header, body
19   end
20
21 end
22
23 def send_to_exchange(message, key)
24
25   exchange = MQ.topic('my-topic-exchange')
26   exchange.publish message, :routing_key => key
27
28 end
29
30 subscribe_to_queue('hello.*.message.#') do |header, body|
31   p 」I received a message: #{body}」
32 end
33
34 send_to_exchange 'Hello', 'hello.world.message.example.in.ruby'
35 send_to_exchange 'Cruel', 'cruel.world.message'
36 send_to_exchange 'World', 'hello.world.message'
37
38 event_loop.join

- 在規範中還有其餘的交換器被說起,例如header交換器(它根據應用程序消息的特定屬性進行匹配,這些消息可能在binding key中標記爲可選或者必選),failover和system交換器。可是這些交換器如今在當前RabbitMQ版本中均未實現。

不一樣於隊列的是,交換器有相應的類型,代表它們的投遞方式(一般是在和綁定器協做的時候)。由於交換器是命名實體,因此聲明一個已經存在的交換器,可是試圖賦予不一樣類型是會致使錯誤。客戶端須要刪除這個已經存在的交換器,而後從新聲明而且賦予新的類型

交換器也有一些性質:

- 持久性:若是啓用,交換器將會在協商器重啓前都有效。

- 自動刪除:若是啓用,那麼交換器將會在其綁定的隊列都被刪除掉以後自動刪除掉自身。

- 惰性:若是沒有聲明交換器,那麼在執行到使用的時候會致使異常,並不會主動聲明。

2.2 默認交換器和綁定器

AMQP協商器都會對其支持的每種交換器類型(爲每個虛擬主機)聲明一個實例。這些交換器的命名規則是amq.前綴加上類型名。例如 amq.fanout。空的交換器名稱等於amq.direct。對這個默認的direct交換器(也僅僅是對這個交換器),協商器將會聲明一個綁定了系統中全部隊列的綁定器。

這個特色告訴咱們,在系統中,任意隊列均可以和默認的direct交換器綁定在一塊兒,只要其routing-key等於隊列名字。

2.3 隊列屬性和多綁定器

默認綁定器的行爲揭示了多綁定器的存在 - 將一個或者多個隊列和一個或者多個交換器綁定起來。這使得能夠將發送到不一樣交換器的具備不一樣routing key(或者其餘屬性)的消息發送到同一個隊列中。

1 require 'rubygems'
2 require 'mq'
3
4 event_loop = Thread.new do
5   EM.run do
6     EM.add_timer(1) do
7       EM.stop
8     end
9   end
10 end
11
12 def subscribe_to_queue(*keys)
13
14   exchange = MQ.direct('my-direct-exchange')
15   queue = MQ.queue('my-direct-queue-with-multiple-bindings')
16
17   bindings = keys.map do |key|
18     queue.bind(exchange, :key => key)
19   end
20
21   bindings.last.subscribe do |header, body|
22     yield header, body
23   end
24
25 end
26
27 def send_to_exchange(message, key)
28
29   exchange = MQ.direct('my-direct-exchange')
30   exchange.publish message, :routing_key => key
31
32 end
33
34 subscribe_to_queue('foo', 'bar', 'wee') do |header, body|
35   p "I received a message: #{body}"
36 end

37
38 send_to_exchange 'Hello', 'foo'
39 send_to_exchange 'You', 'gee'
40 send_to_exchange 'Cruel', 'bar'
41 send_to_exchange 'World', 'wee'
42
43 event_loop.join

雖然不能被命名,可是隊列也有如下屬性,這些屬性和交換器所具備的屬性相似。

- 持久性:若是啓用,隊列將會在協商器重啓前都有效。

自動刪除:若是啓用,那麼隊列將會在全部的消費者中止使用以後自動刪除掉自身。

惰性:若是沒有聲明隊列,那麼在執行到使用的時候會致使異常,並不會主動聲明。

排他性:若是啓用,隊列只能被聲明它的消費者使用。

這些性質能夠用來建立例如排他和自刪除的transient或者私有隊列。這種隊列將會在全部連接到它的客戶端斷開鏈接以後被自動刪除掉 - 它們只是短暫地鏈接到協商器,可是能夠用於實現例如RPC或者在AMQ上的對等通訊。

AMQP上的RPC是這樣的:RPC客戶端聲明一個回覆隊列,惟一命名(例如用UUID19),而且是自刪除和排他的。而後它發送請求給一些交換器,在消息的reply-to字段中包含了以前聲明的回覆隊列的名字。RPC服務器將會回答這些請求,使用消息的reply-to做爲routing key(以前提到過默認綁定器會綁定全部的隊列到默認交換器)發送到默認交換器。注意僅僅是慣例而已。根據和RPC服務器的約定,它能夠解釋消息的任何屬性(甚至數據體)來決定回覆給誰。

隊列也能夠是持久的,可共享,非自動刪除以及非排他的。使用同一個隊列的多個用戶接收到的並非發送到這個隊列的消息的一份拷貝,而是這些用戶共享這隊列中的一份數據,而後在使用完以後刪除掉。

2.4 消息投遞的保障機制

消費者會顯式或者隱式地通知消息的使用完畢。當隱式地通知的時候,消息被認爲在投遞以後便被消耗掉。不然客戶端須要顯式地發送一個驗證信息。只有這個驗證信息收到以後,消息纔會被認爲已經收到而且從隊列中刪除。若是沒有收到,那麼協商器會在通道20關閉以前嘗試着從新投遞消息。

1 require 'rubygems'
2 require 'mq'
3
4 event_loop = Thread.new do
5   EM.run do
6     EM.add_timer(1) do
7       EM.stop
8     end
9   end
10 end
11
12 def subscribe_to_queue
13
14   exchange = MQ.fanout('my-fanout-exchange-with-acks')
15   queue = MQ.queue('my-fanout-queue-with-acks')
16
17   queue.bind(exchange).subscribe(:ack => true) do |header, body|
18     yield header, body
19     header.ack unless body == 'Cruel'
20   end
21
22 end
23
24 def send_to_exchange(message)
25
26   exchange = MQ.fanout('my-fanout-exchange-with-acks')
27   exchange.publish message
28
29 end
30
31 subscribe_to_queue do |header, body|
32   p "I received a message: #{body}"
33 end
34
35 send_to_exchange 'Hello'
36 send_to_exchange 'Cruel'
37 send_to_exchange 'World'
38
39 event_loop.join
40
41 __END__
42
43 First run:
44
45 "I received a message: Hello"
46 "I received a message: Cruel"
47 "I received a message: World"
48
49 Second run:
50
51 "I received a message: Cruel"
52 "I received a message: Hello"
53 "I received a message: Cruel"
54 "I received a message: World"
55
56 ... and so forth

消息生產者能夠選擇是否在消息被髮送到交換器而且還未投遞到隊列(沒有綁定器存在)和/或沒有消費者可以當即處理的時候獲得通知。經過設置消息的mandatory和/或immediate屬性爲真,這些投遞保障機制的能力獲得了強化。

如今在本文例子中使用的Ruby AMQP API還不徹底支持這些標誌位。可是,在GitHub上已經有兩個patch2122展現了徹底支持以後的狀況。

此外,一個生產者能夠設置消息的persistent屬性爲真。這樣一來,協商器將會嘗試將這些消息存儲在一個穩定的位置,直到協商器崩潰。固然,這些消息確定不會被投遞到非持久的隊列中。

2.5 擁塞控制

在給出的例子中,對消息的使用永遠看作是一個訂閱。那麼考慮到了擁塞控制嗎?規範制定了QoS23特性,限制了經過一個通道發送到一個消費者的消息總量。很不幸的是,這個特性在當前RabbitMQ的版本中還不支持(計劃在1.6),可是在原則上是應該被AMQP API支持的。

做爲一個替代方案,客戶端能夠選擇從隊列中取出消息而不是經過訂閱。當使用這種方法的時候,擁塞控制能夠手動地實現。

1 require 'rubygems'
2 require 'mq'
3
4 event_loop = Thread.new do
5   EM.run do
6     EM.add_timer(5) do
7       EM.stop
8     end
9   end
10 end
11
12 def subscribe_to_queue
13
14   exchange = MQ.fanout('my-fanout-exchange')
15   queue = MQ.queue('my-fanout-queue')
16
17   queue.bind(exchange).pop do |header, body|
18     yield header, body
19   end
20
21   EM.add_periodic_timer(0.25) do
22     queue.pop
23   end
24
25 end
26
27 def send_to_exchange(message)
28
29   exchange = MQ.fanout('my-fanout-exchange')
30   exchange.publish message
31
32 end
33
34 received = 0
35
36 subscribe_to_queue do |header, body|
37   p "I received a message: #{body}"
38 end
39
40 send_to_exchange 'Hello'
41 send_to_exchange 'World'
42
43 event_loop.join

一個模型樣例

想像一下你想建立一個普通的聊天應用,那麼應該有如下幾個基本特性:

- 聊天 - 兩個用戶應該能夠相互發送消息。

- 一個好友系統 - 用戶可以控制誰給他發送消息。

咱們假設在協商器上有兩種消費者:好友服務器和聊天客戶端。

3.1 成爲好友

爲了成爲Bob的好友,Alice首先得發送一個消息給fanout交換器iends,咱們假設這個交換器是訪問受限24的:普通用戶不可以將隊列綁定到它。在這個消息中,Alice表示想和Bob成爲朋友。

在協商器上有大量的聊天服務器,從綁定到friends交換器的一個單一持久隊列中持續地取出消息。這個隊列的名字是例如friends.298F2DBC6865-4225-8A73-8FF6175D396D這樣的,這難以猜想的名字可以阻止聊天客戶端直接取出信息 - 記住:不知道隊列的名字,就不能設置訂閱。

當一個聊天服務器收到Alice的消息(只有一個會獲得這個消息,由於它們都是從同一個隊列中獲取),決定這個請求是否有效,而後將其(也許是作過一些調整或者參數化)發送到默認交換器(能夠是直接的或者持久的)。它使用另一個只有Bob知道的routing key來投遞。當Bob上線的時候(或者一個服務器作了這件事),他會聲明一個隊列,這個隊列的名字就是以前的routing key(記住在虛擬主機上的默認綁定器是將全部的隊列和默認交換器綁定在一塊兒)。

Bob的聊天客戶端如今詢問Bob是否想和Alice成爲朋友。在她的請求消息中,有一個特殊的屬性叫作reply-to - 這個屬性包括了一個持久和排他的好友隊列的名字,這個隊列是Alice聲明將用於和Bob的將來聊天用。若是Bob想和Alice成爲朋友,他會使用這個隊列的名字做爲routing key,發送一個消息到默認交換器。他也會須要聲明一個持久和排他的好友隊列,將其名字設爲reply-to的值。

例如:Alice和Bob的好友隊列的名字是B5725C4A-6621463E-AAF1-8222AA3AD601。Bob發送給Alice的消息的routing-key的值即是這個名字,也是Alice發送給Bob的消息中reply-to的值。

由於好友隊列是持久的,所以發送到消息在用戶離線的時候也不會丟失。當用戶上線以後,全部的在好友隊列的消息將會發送到用戶,而後纔去獲取新的消息。

當Bob再也不想和Alice成爲好友,他能夠簡單地刪除掉爲Alice聲明的好友隊列。在她使用mandatory標誌位發送消息的時候,Alice也會注意到Bob已經再也不想是她的好友。由於交換器會將她的消息認爲不可投遞而返回。[PunCha: 這個例子仍是蠻有趣的。]

仍未說起的事情

仍然有不少本文沒有介紹的東西,例如事務語義,關於信息的重路由,header交換器的規範以及不一樣AMQP規範之間的差別 - 尤爲是在1.0版本以前的模型改變。爲了簡介起見,一個聊天的模型一樣也被略過了。

這裏也沒有介紹了整個系統的管理,由於還不清楚AMQP和RabbitMQ將會走向何方。如今有一個課題,關於在保留的amq命名空間中可用的交換器,它能獲取協商器全部的日誌信息。可是,可以列出如今已經聲明的組件和已鏈接的用戶的工具是用rabbitmqctl命令行接口而不是AMQ實體來實現的。

1 require 'rubygems'
2 require 'mq'
3
4 PATH_TO_RABBITMQCTL = '/usr/local/sbin/rabbitmqctl'
5
6 event_loop = Thread.new { EM.run }
7
8 def subscribe_to_logger
9
10   random_name = (0...50).map{ ('a'..'z').to_a[rand(26)] }.join
11
12   exchange = MQ.topic('amq.rabbitmq.log')
13   queue = MQ.queue(random_name, :autodelete => true, :exclusive => true)
14   binding = queue.bind(exchange, :key => '#')
15
16   binding.subscribe do |header, body|
17     body.split("\n").each do |message|
18       yield header, message
19     end
20   end
21
22 end
23
24 def exchange_info(vhost = '/')
25   info :exchange, vhost, %w(name type durable auto_delete arguments)
26 end
27
28 def queue_info(vhost = '/')
29   info :queue, vhost, %w(name durable auto_delete arguments node messages_ready messages_unacknowledged
                            messages_uncommitted messages acks_uncommitted consumers transactions memory)
30 end
31
32 def binding_info(vhost = '/')
33   info :binding, vhost
34 end
35
36 def connection_info
37   info :exchange, nil, %w(node address port peer_address peer_port state channels user vhost timeout
                             frame_max recv_oct recv_cnt send_oct send_cnt send_pend)
38 end
39
40 def info(about, vhost = nil, items = [])
41
42   column_length = 20
43
44   puts "#{about} info\n"
45
46   cmd = "#{PATH_TO_RABBITMQCTL} list_#{about}s"
47   cmd << " -p #{vhost}" if vhost
48   cmd << " #{items.join(' ')} 2>&1"
49
50   pipe = IO.popen(cmd)
51
52   pipe.readlines.map { |line| line.chomp.split("\t").map { |item| item.ljust(column_length)[0,
                             column_length] } }.slice(1..-2).each do |exchange|
53     print exchange.join(' ') + "\n"
54 end
55
56 end
57
58 subscribe_to_logger do |message|
59   p "RabbitMQ logger: #{message}"
60 end
61
62 %w(connection exchange queue binding).each do |method|
63   self.send "#{method}_info".to_sym
64 end
65
66 event_loop.join

必須說起的是,已經有一些使用AMQP(或者RabbitMQ)的分佈式架構。這些架構(例如Nanite25或者Lizzy26)在AMQP的頂部引入了一些抽象層,這樣簡化了一些操做,例如cluster中在Ruby客戶端之間工做的分配。

4.1 下一步該作什麼?

要想使用本地的協商器來玩玩好友和郵件列表,第一步應該是學習有關AMQP和RabbitMQ的知識。不只僅應該在RabbitMQ網站上閱讀幻燈片和文章,還應該經過在IRC的#rabbitmq通道上和社區成員交流或者閱讀關於RabbitMQ和/或AMQP的網誌2728,例如LShift的博客。在Twitter上也能夠經過#rabbitmq或#amqp這兩個標籤找到不少關於AMQP或者RabbitsMQ的內容29303132333435

歡迎進入異步信息的世界,祝您玩得愉快!

1 http://www.infoq.com/amqp
2 ttp://java.sun.com/products/jms/
3 Advanced Message Queuing Protocol/Implementations
4 http://www.rabbitmq.com/how.html#clients
5 http://github.com/tmm1/amqp/tree/master
6 http://www.infoq.com/ruby/
7 http://rubyeventmachine.com/
8 http://github.com/famoseagle/carrot/tree/master
9 http://github.com/celldee/bunny/tree/master
10 http://qpid.apache.org/download.html
11 Erlang
12 http://www.rabbitmq.com/specification.html
13 http://www.rabbitmq.com/how.html
14 http://www.rabbitmq.com/how.html
15 AccessControlDesign
16 ACLs
17 http://www.rabbitmq.com/mercurial.html#defaultbranch
18 Minimum Air Induction
19 Universally Unique Identifier
20 或者關聯到這個通道的鏈接。
21 somic/amqp
22 yawn/amqp
23 BasicQosDesign
24 做爲替代方案,Alice能夠能夠收到另外一個特殊的routing-key來請求加爲好友。全部的聊天服務器將會使用聊天系統中全部用戶的 binding-key綁定到friends.298F2DBC-6865-4225-8A73-8FF6175D396D。也有其餘的可行方案 - 事實上本例子中全部的行爲都有多種實現方法。
25 ezmobius / nanite
26 bmizerany / lizzy
27 lists.rabbitmq.com Mailing Lists
28 RabbitMQ
29 RabbitM - Highlights: presentations, blogs and code
30 freenode
31 Minimum Air Induction
32 Kirk's Rants blogspot
33 http://somic.org/category/rabbitmq/ 34 http://www.lshift.net/blog/category/lshift-sw/rabbitmq
35 Twitter

查看英文原文:Getting started with AMQP and RabbitMQ

相關文章
相關標籤/搜索