RabbitMQ——Rabbit Message Queue的簡寫,但不能僅僅理解其爲消息隊列,消息代理更合適。RabbitMQ 是一個由 Erlang 語言開發的AMQP(高級消息隊列協議)的開源實現,其內部結構以下:html
RabbitMQ做爲一個消息代理,主要和消息打交道,負責接收並轉發消息。RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集羣和分佈式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。git
下面咱們就來學習下RabbitMQ。github
本文主要基於Windows下使用Vs Code 基於.net core進行demo演示。開始以前咱們須要準備好如下環境。算法
RabbitMQ Comman Prompt
,以管理員身份運行。rabbitmq-service install rabbitmq-service enable rabbitmq-service start
rabbitmqlctl status
檢查RabbitMQ狀態rabbitmq-plugins enable rabbitmq_management
便可成功安裝,使用默認帳號密碼(guest/guest)登陸http://localhost:15672/便可。在開始以前咱們先來了解下消息模型:
消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,隊列再將消息發送到監聽的消費者。windows
下面咱們咱們經過demo來了解RabbitMQ的基本用法。緩存
建立RabbitMQ文件夾,打開命令提示符,分別建立兩個控制檯項目Send、Receive。服務器
dotnet new console --name Send //建立發送端控制檯應用 cd Send //進入Send目錄 dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包 dotnet restore //恢復包
dotnet new console --name Receive //建立接收端控制檯應用 cd Receive //進入Receive目錄 dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包 dotnet restore //恢復包
咱們先來添加消息發送端邏輯:網絡
//Send.cs public static void Main(string[] args) { //1.1.實例化鏈接工廠 var factory = new ConnectionFactory() { HostName = "localhost" }; //2. 創建鏈接 using (var connection = factory.CreateConnection()) { //3. 建立信道 using (var channel = connection.CreateModel()) { //4. 申明隊列 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5. 構建byte消息數據包 string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!"; var body = Encoding.UTF8.GetBytes(message); //6. 發送數據包 channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } } }
再來完善消息接收端邏輯:異步
//Receive.cs 省略部分代碼 public static void Main() { //1.實例化鏈接工廠 var factory = new ConnectionFactory() { HostName = "localhost" }; //2. 創建鏈接 using (var connection = factory.CreateConnection()) { //3. 建立信道 using (var channel = connection.CreateModel()) { //4. 申明隊列 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //5. 構造消費者實例 var consumer = new EventingBasicConsumer(channel); //6. 綁定消息接收後的事件委託 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(6000);//模擬耗時 Console.WriteLine (" [x] Done"); }; //7. 啓動消費者 channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
先運行消息接收端,再運行消息發送端,結果以下圖。分佈式
從上面的代碼中能夠看出,發送端和消費端的代碼前4步都是同樣的。主要的區別在於發送端調用channel.BasicPublish
方法發送消息;而接收端須要實例化一個EventingBasicConsumer
實例來進行消息處理邏輯。另一點須要注意的是:消息接收端和發送端的隊列名稱(queue)必須保持一致,這裏指定的隊列名稱爲hello。
使用工做隊列的好處就是它可以並行的處理隊列。若是堆積了不少任務,咱們只須要添加更多的工做者(workers)就能夠了。咱們先啓動兩個接收端,等待消息接收,再啓動一個發送端進行消息發送。
咱們增長運行一個消費端後的運行結果:
從圖中可知,咱們循環發送4條信息,兩個消息接收端按順序被循環分配。
默認狀況下,RabbitMQ將按順序將每條消息發送給下一個消費者。平均每一個消費者將得到相同數量的消息。這種分發消息的方式叫作循環(round-robin)。
按照咱們上面的demo,一旦RabbitMQ將消息發送到消費端,消息就會當即從內存中移出,不管消費端是否處理完成。在這種狀況下,消息就會丟失。
爲了確保一個消息永遠不會丟失,RabbitMQ支持消息確認(message acknowledgments)。當消費端接收消息而且處理完成後,會發送一個ack(消息確認)信號到RabbitMQ,RabbitMQ接收到這個信號後,就能夠刪除掉這條已經處理的消息任務。但若是消費端掛掉了(好比,通道關閉、鏈接丟失等)沒有發送ack信號。RabbitMQ就會明白某個消息沒有正常處理,RabbitMQ將會從新將消息入隊,若是有另一個消費端在線,就會快速的從新發送到另一個消費端。
RabbitMQ中沒有消息超時的概念,只有當消費端關閉或奔潰時,RabbitMQ纔會從新分發消息。
微調下Receive中的代碼邏輯:
//5. 構造消費者實例 var consumer = new EventingBasicConsumer(channel); //6. 綁定消息接收後的事件委託 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(6000);//模擬耗時 Console.WriteLine(" [x] Done"); // 7. 發送消息確認信號(手動消息確認) channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; //8. 啓動消費者 //autoAck:true;自動進行消息確認,當消費端接收到消息後,就自動發送ack信號,無論消息是否正確處理完畢 //autoAck:false;關閉自動消息確認,經過調用BasicAck方法手動進行消息確認 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);
主要改動的是將 autoAck:true
修改成autoAck:fasle
,以及在消息處理完畢後手動調用BasicAck
方法進行手動消息確認。
從圖中可知,消息發送端連續發送4條消息,其中消費端1先被分配處理第一條消息,消費端2被循環分配第二條消息,第三條消息因爲沒有空閒消費者仍然在隊列中。
在消費端2未處理完第一條消息以前,手動中斷(ctrl+c)。咱們能夠發現RabbitMQ在下一次分發時,會優先將被中斷的消息分發給消費端1處理。
消息確認確保了即便消費端異常,消息也不會丟失可以被從新分發處理。可是若是RabbitMQ服務端異常,消息依然會丟失。除非咱們指定durable:true
,不然當RabbitMQ退出或奔潰時,消息將依然會丟失。經過指定durable:true
,並指定Persistent=true
,來告知RabbitMQ將消息持久化。
//send.cs //4. 申明隊列(指定durable:true,告知rabbitmq對消息進行持久化) channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments //將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true var properties = channel.CreateBasicProperties(); properties.Persistent = true; //5. 構建byte消息數據包 string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!"; var body = Encoding.UTF8.GetBytes(message); //6. 發送數據包(指定basicProperties) channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
將消息標記爲持久性不能徹底保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,可是當RabbitMQ接受消息而且尚未保存時,仍然有一個很短的時間窗口。RabbitMQ 可能只是將消息保存到了緩存中,並無將其寫入到磁盤上。持久化是不可以必定保證的,可是對於一個簡單任務隊列來講已經足夠。若是須要確保消息隊列的持久化,可使用publisher confirms.
RabbitMQ的消息分發默認按照消費端的數量,按順序循環分發。這樣僅是確保了消費端被平均分發消息的數量,但卻忽略了消費端的閒忙狀況。這就可能出現某個消費端一直處理耗時任務處於阻塞狀態,某個消費端一直處理通常任務處於空置狀態,而只是它們分配的任務數量同樣。
但咱們能夠經過channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
設置prefetchCount : 1
來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息,也就確保了當消費端處於忙碌狀態時,再也不分配任務。
//Receive.cs //4. 申明隊列 channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); //設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息,也就確保了當消費端處於忙碌狀態時 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
這時你須要注意的是若是全部的消費端都處於忙碌狀態,你的隊列可能會被塞滿。你須要注意這一點,要麼添加更多的消費端,要麼採起其餘策略。
細心的你也許發現上面的demo,生產者和消費者直接是經過相同隊列名稱進行匹配銜接的。消費者訂閱某個隊列,生產者建立消息發佈到隊列中,隊列再將消息轉發到訂閱的消費者。這樣就會有一個侷限性,即消費者一次只能發送消息到某一個隊列。
那消費者如何才能發送消息到多個消息隊列呢?
RabbitMQ提供了Exchange,它相似於路由器的功能,它用於對消息進行路由,將消息發送到多個隊列上。Exchange一方面從生產者接收消息,另外一方面將消息推送到隊列。但exchange必須知道如何處理接收到的消息,是將其附加到特定隊列仍是附加到多個隊列,仍是直接忽略。而這些規則由exchange type定義,exchange的原理以下圖所示。
常見的exchange type 有如下幾種:
下面咱們就來一一這介紹它們的用法。
本着先易後難的思想,咱們先來了解下fanout的廣播路由機制。fanout的路由機制以下圖,即發送到 fanout 類型exchange的消息都會分發到全部綁定該exchange的隊列上去。
生產者示例代碼:
// 生成隨機隊列名稱 var queueName = channel.QueueDeclare().QueueName; //使用fanout exchange type,指定exchange名稱 channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message); //發佈到指定exchange,fanout類型無需指定routingKey channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);
消費者示例代碼:
//申明fanout類型exchange channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout"); //申明隨機隊列名稱 var queuename = channel.QueueDeclare ().QueueName; //綁定隊列到指定fanout類型exchange,無需指定路由鍵 channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");
direct相對於fanout就屬於徹底匹配、單播的模式,路由機制以下圖,即隊列名稱和消息發送時指定的路由徹底匹配時,消息纔會發送到指定隊列上。
生產者示例代碼:
// 生成隨機隊列名稱 var queueName = channel.QueueDeclare().QueueName; //使用direct exchange type,指定exchange名稱 channel.ExchangeDeclare(exchange: "directEC", type: "direct"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message); //發佈到direct類型exchange,必須指定routingKey channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);
消費者示例代碼:
//申明direct類型exchange channel.ExchangeDeclare (exchange: "directEC", type: "direct"); //綁定隊列到direct類型exchange,需指定路由鍵routingKey channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");
topic是direct的升級版,是一種模式匹配的路由機制。它支持使用兩種通配符來進行模式匹配:符號#
和符號*
。其中*
匹配一個單詞, #
則表示匹配0個或多個單詞,單詞之間用.
分割。以下圖所示。
生產者示例代碼:
// 生成隨機隊列名稱 var queueName = channel.QueueDeclare().QueueName; //使用topic exchange type,指定exchange名稱 channel.ExchangeDeclare(exchange: "topicEC", type: "topic"); var message = "Hello Rabbit!"; var body = Encoding.UTF8.GetBytes(message); //發佈到topic類型exchange,必須指定routingKey channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);
消費者示例代碼:
//申明topic類型exchange channel.ExchangeDeclare (exchange: "topicEC", type: "topic"); //申明隨機隊列名稱 var queuename = channel.QueueDeclare ().QueueName; //綁定隊列到topic類型exchange,需指定路由鍵routingKey channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");
RPC——Remote Procedure Call,遠程過程調用。
那RabbitMQ如何進行遠程調用呢?示意圖以下:
第一步,主要是進行遠程調用的客戶端須要指定接收遠程回調的隊列,並申明消費者監聽此隊列。
第二步,遠程調用的服務端除了要申明消費端接收遠程調用請求外,還要將結果發送到客戶端用來監聽的結果的隊列中去。
遠程調用客戶端:
//申明惟一guid用來標識這次發送的遠程調用請求 var correlationId = Guid.NewGuid().ToString(); //申明須要監聽的回調隊列 var replyQueue = channel.QueueDeclare().QueueName; var properties = channel.CreateBasicProperties(); properties.ReplyTo = replyQueue;//指定回調隊列 properties.CorrelationId = correlationId;//指定消息惟一標識 string number = args.Length > 0 ? args[0] : "30"; var body = Encoding.UTF8.GetBytes(number); //發佈消息 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body); Console.WriteLine($"[*] Request fib({number})"); // //建立消費者用於處理消息回調(遠程調用返回結果) var callbackConsumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer); callbackConsumer.Received += (model, ea) => { //僅當消息回調的ID與發送的ID一致時,說明遠程調用結果正確返回。 if (ea.BasicProperties.CorrelationId == correlationId) { var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}"; Console.WriteLine($"[x]: {responseMsg}"); } };
遠程調用服務端:
//申明隊列接收遠程調用請求 channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); Console.WriteLine("[*] Waiting for message."); //請求處理邏輯 consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body); int n = int.Parse(message); Console.WriteLine($"Receive request of Fib({n})"); int result = Fib(n); //從請求的參數中獲取請求的惟一標識,在消息回傳時一樣綁定 var properties = ea.BasicProperties; var replyProerties = channel.CreateBasicProperties(); replyProerties.CorrelationId = properties.CorrelationId; //將遠程調用結果發送到客戶端監聽的隊列上 channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo, basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString())); //手動發回消息確認 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"Return result: Fib({n})= {result}"); }; channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
基於上面的demo和對幾種不一樣exchange路由機制的學習,咱們發現RabbitMQ主要是涉及到如下幾個核心概念:
此次做爲入門就講到這裏,下次咱們來說解下EventBus + RabbitMQ如何實現事件的分發。