rabbitMQ是一個在AMQP協議標準基礎上完整的,可服用的企業消息系統。他遵循Mozilla Public License開源協議。採用 Erlang 實現的工業級的消息隊列(MQ)服務器。 html
RabbitMQ的官方站:http://www.rabbitmq.com/
AMQP(高級消息隊列協議) 是一個異步消息傳遞所使用的應用層協議規範,做爲線路層協議,而不是API(例如JMS),AMQP 客戶端可以無視消息的來源任意發送和接受信息。AMQP的原始用途只是爲金融界提供一個能夠彼此協做的消息協議,而如今的目標則是爲通用消息隊列架構提供通用構建工具。所以,面向消息的中間件 (MOM)系統,例如發佈/訂閱隊列,沒有做爲基本元素實現。反而經過發送簡化的AMQ實體,用戶被賦予了構建例如這些實體的能力。這些實體也是規範的一 部分,造成了在線路層協議頂端的一個層級:AMQP模型。這個模型統一了消息模式,諸如以前提到的發佈/訂閱,隊列,事務以及流數據,而且添加了額外的特性,例如更易於擴展,基於內容的路由。 git
AMQP當中有四個概念很是重要 github
一個虛擬主機持有一組交換機、隊列和綁定。 數據庫
爲何須要多個虛擬主機呢?由於RabbitMQ當中,用戶只能在虛擬主機的粒度進行權限控制。所以,若是須要禁止A組訪問B組的交換機/隊列/綁定,必須爲A和B分別建立一個虛擬主機。每個RabbitMQ服務器都有一個默認的虛擬主機/。 編程
何謂虛擬主機(virtual host),交換機(exchange),隊列(queue)和綁定(binding) windows
隊列(Queues)是你的消息(messages)的終點,能夠理解成裝消息的容器。消息就一直在裏面,直到有客戶端(也就是消費者,Consumer)鏈接到這個隊列而且將其取走爲止。不過,也能夠將一個隊列配置成這樣的:一旦消息進入這個隊列,此消息就被刪除。 數組
隊列是由消費者(Consumer)經過程序創建的,不是經過配置文件或者命令行工具。這沒什麼問題,若是一個消費者試圖建立一個已經存在的隊列,RabbitMQ會直接忽略這個請求。所以咱們能夠將消息隊列的配置寫在應用程序的代碼裏面。 瀏覽器
而要把一個消息放進隊列前,須要有一個交換機(Exchange)。 服務器
交換機(Exchange)能夠理解成具備路由表的路由程序。每一個消息都有一個稱爲路由鍵(routing key)的屬性,就是一個簡單的字符串。交換機當中有一系列的綁定(binding),即路由規則(routes)。(例如,指明具備路由鍵 「X」 的消息要到名爲timbuku的隊列當中去。) 微信
消費者程序(Consumer)要負責建立你的交換機。交換機能夠存在多個,每一個交換機在本身獨立的進程當中執行,所以增長多個交換機就是增長多個進程,能夠充分利用服務器上的CPU核以便達到更高的效率。例如,在一個8核的服務器上,能夠建立5個交換機來用5個核,另外3個核留下來作消息處理。相似的,在RabbitMQ的集羣當中,你能夠用相似的思路來擴展交換機一邊獲取更高的吞吐量。
交換機如何判斷要把消息送到哪一個隊列?你須要路由規則,即綁定(binding)。一個綁定就是一個相似這樣的規則:將交換機「desert(沙漠)」當中具備路由鍵「阿里巴巴」的消息送到隊列「hideout(山洞)」裏面去。換句話說,一個綁定就是一個基於路由鍵將交換機和隊列鏈接起來的路由規則。例如,具備路由鍵「audit」的消息須要被送到兩個隊列,「log-forever」和「alert-the-big-dude」。要作到這個,就須要建立兩個綁定,每一個都鏈接一個交換機和一個隊列,二者都是由「audit」路由鍵觸發。在這種狀況下,交換機會複製一份消息而且把它們分別發送到兩個隊列當中。交換機不過就是一個由綁定構成的路由表。
交換機有多種類型。他們都是作路由的,可是它們接受不一樣類型的綁定。爲何不建立一種交換機來處理全部類型的路由規則呢?由於每種規則用來作匹配分子的CPU開銷是不一樣的。例如,一個「topic」類型的交換機試圖將消息的路由鍵與相似「dogs.*」的模式進行匹配。匹配這種末端的通配符比直接將路由鍵與「dogs」比較(「direct」類型的交換機)要消耗更多的CPU。若是你不須要「topic」類型的交換機帶來的靈活性,你能夠經過使用「direct」類型的交換機獲取更高的處理效率。那麼有哪些類型,他們又是怎麼處理的呢?
Exchange
你花了大量的時間來建立隊列、交換機和綁定,而後,服務器程序掛了。你的隊列、交換機和綁定怎麼樣了?還有,放在隊列裏面可是還沒有處理的消息們呢?
若是你是用默認參數構造的這一切的話,那麼,他們都灰飛煙滅了。RabbitMQ重啓以後會乾淨的像個新生兒。你必須重作全部的一切,亡羊補牢,如何避免未來再度發生此類杯具?
隊列和交換機有一個建立時候指定的標誌durable。durable的惟一含義就是具備這個標誌的隊列和交換機會在重啓以後從新創建,它不表示說在隊列當中的消息會在重啓後恢復。那麼如何才能作到不僅是隊列和交換機,還有消息都是持久的呢?
可是首先須要考慮的問題是:是否真的須要消息的持久化?若是須要重啓後消息能夠回覆,那麼它須要被寫入磁盤。但即便是最簡單的磁盤操做也是要消耗時間的。因此須要衡量判斷。
當你將消息發佈到交換機的時候,能夠指定一個標誌「Delivery Mode」(投遞模式)。根據你使用的AMQP的庫不一樣,指定這個標誌的方法可能不太同樣。簡單的說,就是將Delivery Mode設置成2,也就是持久的(persistent)便可。通常的AMQP庫都是將Delivery Mode設置成1,也就是非持久的。因此要持久化消息的步驟以下:
綁定(Bindings)怎麼辦?綁定沒法在建立的時候設置成durable。沒問題,若是你綁定了一個durable的隊列和一個durable的交換機,RabbitMQ會自動保留這個綁定。相似的,若是刪除了某個隊列或交換機(不管是否是durable),依賴它的綁定都會自動刪除。
注意:
在Windows上安裝Rabbit MQ 指南,最好的是這篇《Rabbit MQ Windows Installation guide》,其中還包括了使用.NET RabbitMQ.Client Nuget 包訪問Rabbit MQ的示例代碼。
安裝Rabbit MQ
Rabbit MQ 是創建在強大的Erlang OTP平臺上,所以安裝Rabbit MQ的前提是安裝Erlang。經過下面兩個鏈接下載安裝3.2.3 版本:
默認安裝的Rabbit MQ 監聽端口是5672
激活Rabbit MQ's Management Plugin
使用Rabbit MQ 管理插件,能夠更好的可視化方式查看Rabbit MQ 服務器實例的狀態,你能夠在命令行中使用下面的命令激活:
"C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
要重啓服務才能生效,能夠執行
net stop RabbitMQ && net start RabbitMQ
下面咱們使用rabbitmqctl控制檯命令(位於C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>)來建立用戶,密碼,綁定權限等。
Microsoft Windows [版本 6.3.9600]
(c) 2013 Microsoft Corporation。保留全部權利。
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin 的目錄
2014/11/01 15:04 <DIR> .
2014/11/01 15:04 <DIR> ..
2014/01/23 22:57 817 rabbitmq-echopid.bat
2014/01/23 22:57 1,900 rabbitmq-plugins.bat
2014/01/23 22:57 4,356 rabbitmq-server.bat
2014/01/23 22:57 7,123 rabbitmq-service.bat
2014/01/23 22:57 1,621 rabbitmqctl.bat
5 個文件 15,817 字節
2 個目錄 96,078,618,624 可用字節
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t list_users
Listing users ...
guest [administrator]
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t list_vhosts
Listing vhosts ...
/
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t add_user geffzhang zsy@2014
Creating user "geffzhang" ...
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t list_users
Listing users ...
geffzhang []
guest [administrator]
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t set_user_tags geffzhang administrator
Setting tags for user "geffzhang" to [administrator] ...
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t set_permissions -p / geffzhang ".*" ".*" ".*"
Setting permissions for user "geffzhang" in vhost "/" ...
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t list_users
Listing users ...
geffzhang [administrator]
guest [administrator]
...done.
使用瀏覽器打開http://localhost:15672 訪問Rabbit Mq的管理控制檯,使用剛纔建立的帳號登錄系統:
在.NET上使用Rabbit MQ
經過Nuget 獲取Rabbit MQ NET client bindings from NuGet:
PM> Install-Package RabbitMQ.Client
咱們最多見的一個場景是發送和接收Rabbit MQ 持久化消息:
第一步是聲明durable Exchange 和 Queue
private readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory { HostName = "Geffzhang-NB", UserName="geffzhang", Password ="zsy@2014 ", VirtualHost ="/" };
const string ExchangeName = "test.exchange";
const string QueueName = "test.queue";
using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct", durable:true, autoDelete:false, arguments:null);
channel.QueueDeclare(QueueName, durable:true, exclusive:false, autoDelete:false,arguments:null);
channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
}
下面對上面代碼進行說明:
1. 使用ConnectionFactory建立鏈接,雖然建立時指定了多個server address,但每一個connection只與一個物理的server進行鏈接。
2. 定義交換方式 ,建立了Direct Exchange和Durable Queue,並使用QueueName做爲routing key ,能夠把消息直接投遞到某個隊列。rabbitmq交換方式分爲三種,分別是:
Direct Exchange – 處理路由鍵。須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。這是一個完整的匹配。若是一個隊列綁定到該交換機上要求路由鍵 「dog」,則只有被標記爲「dog」的消息才被轉發,不會轉發dog.puppy,也不會轉發dog.guard,只會轉發dog。
Fanout Exchange – 不處理路由鍵。你只須要簡單的將隊列綁定到交換機上。一個發送到交換機的消息都會被轉發到與該交換機綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。Fanout交換機轉發消息是最快的。
Topic Exchange – 將路由鍵和某模式進行匹配。此時隊列須要綁定要一個模式上。符號「#」匹配一個或多個詞,符號「*」匹配很少很多一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.*」 只會匹配到「audit.irs」。
運行上述代碼,能夠在Rabbit MQ的管理控制檯上看到test.exchange Exchange 綁定到 建立的隊列 test.queue
第二步就是發佈持久化消息到隊列
Exchange和Queue創建好之後,就能夠發送消息到隊列了。RabbitMq 能夠接受byte[]的數據,字符串採用utf-8編碼的字節數組。確保消息可持久化的,須要設置PersistMode爲true,參看下面的代碼:
var props = channel.CreateBasicProperties();
props.SetPersistent(true);
var msgBody = Encoding.UTF8.GetBytes("Hello, World!");
channel.BasicPublish(ExchangeName, routingKey:QueueName, basicProperties:props, body:msgBody);
第三步就是消費消息了,有幾種不一樣的方法從隊列中消費消息,最多見的是使用BasicGet:
BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
NoAck:true 告訴RabbitMQ當即從隊列中刪除消息,另外一個很是受歡迎的方式是從隊列中刪除已經確認接收的消息,能夠經過單獨調用BasicAck 進行確認:
BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false);
//process message ...
channel.BasicAck(msgResponse.DeliveryTag, multiple:false);
使用BasicAck方式來告之是否從隊列中移除該條消息,這一點很重要,由於在某些應用場景下,好比從隊列中獲取消息並用它來操做數據庫或日誌文件時,若是出現操做失敗時,則該條消息應該保留在隊列中,只到操做成功時才從隊列中移除。
另一種方法是經過基於推送的事件訂閱。您能夠使用內置的 QueueingBasicConsumer 提供簡化的編程模型,經過容許您在共享隊列上阻塞,直到收到一條消息,例如
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
var msgResponse = consumer.Queue.Dequeue(); //blocking
var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
歡迎你們關注微信號opendotnet,微信公衆號名稱:dotNET跨平臺。掃下面的二維碼或者收藏下面的二維碼關注吧(長按下面的二維碼圖片、並選擇識別圖中的二維碼)