RabbitMQ知多少

1.引言

RabbitMQ——Rabbit Message Queue的簡寫,但不能僅僅理解其爲消息隊列,消息代理更合適。RabbitMQ 是一個由 Erlang 語言開發的AMQP(高級消息隊列協議)的開源實現,其內部結構以下:html

RabbitMQ 內部結構

RabbitMQ做爲一個消息代理,主要和消息打交道,負責接收並轉發消息。RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集羣和分佈式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。git

下面咱們就來學習下RabbitMQ。github

2. 環境搭建

本文主要基於Windows下使用Vs Code 基於.net core進行demo演示。開始以前咱們須要準備好如下環境。算法

  • 安裝Erlang運行環境
    下載安裝Erlang
  • 安裝RabbitMQ
    下載安裝Windows版本的RabbitMQ
  • 啓動RabbitMQ Server
    點擊Windows開始按鈕,輸入RabbitMQ找到RabbitMQ Comman Prompt,以管理員身份運行。
  • 依次執行如下命令啓動RabbitMQ服務
rabbitmq-service install
rabbitmq-service enable
rabbitmq-service start
  • 執行rabbitmqlctl status檢查RabbitMQ狀態
  • 安裝管理平臺插件
    執行rabbitmq-plugins enable rabbitmq_management便可成功安裝,使用默認帳號密碼(guest/guest)登陸http://localhost:15672/便可。

3. Hello RabbitMQ

在開始以前咱們先來了解下消息模型:
消息流
消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,隊列再將消息發送到監聽的消費者。windows

下面咱們咱們經過demo來了解RabbitMQ的基本用法。緩存

3.1.消息的發送和接收

建立RabbitMQ文件夾,打開命令提示符,分別建立兩個控制檯項目Send、Receive。服務器

dotnet new console --name Send //建立發送端控制檯應用
cd Send //進入Send目錄
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢復包
dotnet new console --name Receive //建立接收端控制檯應用
cd Receive //進入Receive目錄
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢復包

咱們先來添加消息發送端邏輯:網絡

//Send.cs 
public static void Main(string[] args)
{
    //1.1.實例化鏈接工廠
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 創建鏈接
    using (var connection = factory.CreateConnection())
    {
        //3. 建立信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明隊列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 構建byte消息數據包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 發送數據包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

再來完善消息接收端邏輯:異步

//Receive.cs  省略部分代碼
public static void Main()
{
    //1.實例化鏈接工廠
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 創建鏈接
    using (var connection = factory.CreateConnection())
    {
        //3. 建立信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明隊列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 構造消費者實例
            var consumer = new EventingBasicConsumer(channel);
            //6. 綁定消息接收後的事件委託
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模擬耗時
                Console.WriteLine (" [x] Done");
            };
            //7. 啓動消費者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先運行消息接收端,再運行消息發送端,結果以下圖。分佈式

運行結果

從上面的代碼中能夠看出,發送端和消費端的代碼前4步都是同樣的。主要的區別在於發送端調用channel.BasicPublish方法發送消息;而接收端須要實例化一個EventingBasicConsumer實例來進行消息處理邏輯。另一點須要注意的是:消息接收端和發送端的隊列名稱(queue)必須保持一致,這裏指定的隊列名稱爲hello。

3.2. 循環調度

使用工做隊列的好處就是它可以並行的處理隊列。若是堆積了不少任務,咱們只須要添加更多的工做者(workers)就能夠了。咱們先啓動兩個接收端,等待消息接收,再啓動一個發送端進行消息發送。

消息分發

咱們增長運行一個消費端後的運行結果:

循環調度

從圖中可知,咱們循環發送4條信息,兩個消息接收端按順序被循環分配。
默認狀況下,RabbitMQ將按順序將每條消息發送給下一個消費者。平均每一個消費者將得到相同數量的消息。這種分發消息的方式叫作循環(round-robin)。

3.3. 消息確認

按照咱們上面的demo,一旦RabbitMQ將消息發送到消費端,消息就會當即從內存中移出,不管消費端是否處理完成。在這種狀況下,消息就會丟失。

爲了確保一個消息永遠不會丟失,RabbitMQ支持消息確認(message acknowledgments)。當消費端接收消息而且處理完成後,會發送一個ack(消息確認)信號到RabbitMQ,RabbitMQ接收到這個信號後,就能夠刪除掉這條已經處理的消息任務。但若是消費端掛掉了(好比,通道關閉、鏈接丟失等)沒有發送ack信號。RabbitMQ就會明白某個消息沒有正常處理,RabbitMQ將會從新將消息入隊,若是有另一個消費端在線,就會快速的從新發送到另一個消費端。

RabbitMQ中沒有消息超時的概念,只有當消費端關閉或奔潰時,RabbitMQ纔會從新分發消息。

微調下Receive中的代碼邏輯:

//5. 構造消費者實例
 var consumer = new EventingBasicConsumer(channel);
 //6. 綁定消息接收後的事件委託
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模擬耗時
     Console.WriteLine(" [x] Done");
     // 7. 發送消息確認信號(手動消息確認)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 啓動消費者
 //autoAck:true;自動進行消息確認,當消費端接收到消息後,就自動發送ack信號,無論消息是否正確處理完畢
 //autoAck:false;關閉自動消息確認,經過調用BasicAck方法手動進行消息確認
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

主要改動的是將 autoAck:true修改成autoAck:fasle,以及在消息處理完畢後手動調用BasicAck方法進行手動消息確認。

從圖中可知,消息發送端連續發送4條消息,其中消費端1先被分配處理第一條消息,消費端2被循環分配第二條消息,第三條消息因爲沒有空閒消費者仍然在隊列中。
在消費端2未處理完第一條消息以前,手動中斷(ctrl+c)。咱們能夠發現RabbitMQ在下一次分發時,會優先將被中斷的消息分發給消費端1處理。

3.4. 消息持久化

消息確認確保了即便消費端異常,消息也不會丟失可以被從新分發處理。可是若是RabbitMQ服務端異常,消息依然會丟失。除非咱們指定durable:true,不然當RabbitMQ退出或奔潰時,消息將依然會丟失。經過指定durable:true,並指定Persistent=true,來告知RabbitMQ將消息持久化。

//send.cs
//4. 申明隊列(指定durable:true,告知rabbitmq對消息進行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//將消息標記爲持久性 - 將IBasicProperties.SetPersistent設置爲true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 構建byte消息數據包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 發送數據包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

將消息標記爲持久性不能徹底保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,可是當RabbitMQ接受消息而且尚未保存時​​,仍然有一個很短的時間窗口。RabbitMQ 可能只是將消息保存到了緩存中,並無將其寫入到磁盤上。持久化是不可以必定保證的,可是對於一個簡單任務隊列來講已經足夠。若是須要確保消息隊列的持久化,可使用publisher confirms.

3.5. 公平分發

RabbitMQ的消息分發默認按照消費端的數量,按順序循環分發。這樣僅是確保了消費端被平均分發消息的數量,但卻忽略了消費端的閒忙狀況。這就可能出現某個消費端一直處理耗時任務處於阻塞狀態,某個消費端一直處理通常任務處於空置狀態,而只是它們分配的任務數量同樣。

但咱們能夠經過channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息,也就確保了當消費端處於忙碌狀態時,再也不分配任務。

//Receive.cs
//4. 申明隊列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,再也不分發消息,也就確保了當消費端處於忙碌狀態時
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

這時你須要注意的是若是全部的消費端都處於忙碌狀態,你的隊列可能會被塞滿。你須要注意這一點,要麼添加更多的消費端,要麼採起其餘策略。

4. Exchange

細心的你也許發現上面的demo,生產者和消費者直接是經過相同隊列名稱進行匹配銜接的。消費者訂閱某個隊列,生產者建立消息發佈到隊列中,隊列再將消息轉發到訂閱的消費者。這樣就會有一個侷限性,即消費者一次只能發送消息到某一個隊列。

那消費者如何才能發送消息到多個消息隊列呢?
RabbitMQ提供了Exchange,它相似於路由器的功能,它用於對消息進行路由,將消息發送到多個隊列上。Exchange一方面從生產者接收消息,另外一方面將消息推送到隊列。但exchange必須知道如何處理接收到的消息,是將其附加到特定隊列仍是附加到多個隊列,仍是直接忽略。而這些規則由exchange type定義,exchange的原理以下圖所示。
Exchange

常見的exchange type 有如下幾種:

  • direct(明確的路由規則:消費端綁定的隊列名稱必須和消息發佈時指定的路由名稱一致)
  • topic (模式匹配的路由規則:支持通配符)
  • fanout (消息廣播,將消息分發到exchange上綁定的全部隊列上)

下面咱們就來一一這介紹它們的用法。

4.1 fanout

本着先易後難的思想,咱們先來了解下fanout的廣播路由機制。fanout的路由機制以下圖,即發送到 fanout 類型exchange的消息都會分發到全部綁定該exchange的隊列上去。

fanout 路由機制

生產者示例代碼:

// 生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名稱
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//發佈到指定exchange,fanout類型無需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);

消費者示例代碼:

//申明fanout類型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明隨機隊列名稱
var queuename = channel.QueueDeclare ().QueueName;
//綁定隊列到指定fanout類型exchange,無需指定路由鍵
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");

4.2. direct

direct相對於fanout就屬於徹底匹配、單播的模式,路由機制以下圖,即隊列名稱和消息發送時指定的路由徹底匹配時,消息纔會發送到指定隊列上。
direct路由機制

生產者示例代碼:

// 生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名稱
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//發佈到direct類型exchange,必須指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);

消費者示例代碼:

//申明direct類型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//綁定隊列到direct類型exchange,需指定路由鍵routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");

4.3. topic

topic是direct的升級版,是一種模式匹配的路由機制。它支持使用兩種通配符來進行模式匹配:符號#和符號*。其中*匹配一個單詞, #則表示匹配0個或多個單詞,單詞之間用.分割。以下圖所示。
topic路由機制

生產者示例代碼:

// 生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名稱
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//發佈到topic類型exchange,必須指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);

消費者示例代碼:

//申明topic類型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明隨機隊列名稱
var queuename = channel.QueueDeclare ().QueueName;
//綁定隊列到topic類型exchange,需指定路由鍵routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");

5. RPC

RPC——Remote Procedure Call,遠程過程調用。
那RabbitMQ如何進行遠程調用呢?示意圖以下:
RPC機制
第一步,主要是進行遠程調用的客戶端須要指定接收遠程回調的隊列,並申明消費者監聽此隊列。
第二步,遠程調用的服務端除了要申明消費端接收遠程調用請求外,還要將結果發送到客戶端用來監聽的結果的隊列中去。

遠程調用客戶端:

//申明惟一guid用來標識這次發送的遠程調用請求
 var correlationId = Guid.NewGuid().ToString();
 //申明須要監聽的回調隊列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回調隊列
 properties.CorrelationId = correlationId;//指定消息惟一標識
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //發佈消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //建立消費者用於處理消息回調(遠程調用返回結果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //僅當消息回調的ID與發送的ID一致時,說明遠程調用結果正確返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

遠程調用服務端:

//申明隊列接收遠程調用請求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//請求處理邏輯
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //從請求的參數中獲取請求的惟一標識,在消息回傳時一樣綁定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //將遠程調用結果發送到客戶端監聽的隊列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手動發回消息確認
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

6. 總結

基於上面的demo和對幾種不一樣exchange路由機制的學習,咱們發現RabbitMQ主要是涉及到如下幾個核心概念:

  1. Publisher:生產者,消息的發送方。
  2. Connection:網絡鏈接。
  3. Channel:信道,多路複用鏈接中的一條獨立的雙向數據流通道。
  4. Exchange:交換器(路由器),負責消息的路由到相應隊列。
  5. Binding:隊列與交換器間的關聯綁定。消費者將關注的隊列綁定到指定交換器上,以便Exchange能準確分發消息到指定隊列。
  6. Queue:隊列,消息的緩衝存儲區。
  7. Virtual Host:虛擬主機,虛擬主機提供資源的邏輯分組和分離。包含鏈接,交換,隊列,綁定,用戶權限,策略等。
  8. Broker:消息隊列的服務器實體。
  9. Consumer:消費者,消息的接收方。

此次做爲入門就講到這裏,下次咱們來說解下EventBus + RabbitMQ如何實現事件的分發。

參考資料:
RabbitMQ Tutorials
Demo路徑——RabbitMQ

相關文章
相關標籤/搜索