A Simple and Reliable RabbitMQ Solution (.NET Core Implementation)

Preface

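  I recently needed message queuing again, so I came back to RabbitMQ. Along the way I ran into quite a few reliability problems, which roughly fall into these categories:

  1. Transient failures, such as a momentary database network drop or a temporarily failing HTTP request;

  2. Ordering failures: task A depends on task B, but because of scheduling or consumer assignment, A runs before B;

  3. Business failures: because of insufficient testing, after going live certain messages or kinds of messages turn out not to be processable;

  4. System failures: the middleware the business relies on stops working, for example the network is down or the database has crashed;

  5. Illegal messages: forged messages or messages sent as part of an attack.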

 

  To deal with these failures, I adopted a scheme based on message auditing, message retry, message search, and message resending.

 

The Scheme

 

 

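  1. All messages go through an exchange, of type direct or topic; fanout is not recommended.

  2. According to the business, one or more queues are bound to the exchange. In addition, an audit consumer (Audit) listens to all of them through its own queue bound to every routing key, recording each message to MongoDB without blocking normal processing.

  3. When publishing, the publisher (Publisher) uses the AMQP message properties to set a message identifier MessageId and a Timestamp, and adds business-specific Headers so that messages are easy to trace.

  

  4. When a consumer (Consumer) fails to process a message, it sends the message to a retry exchange (Retry Exchange), setting an expiration (the retry delay) and updating the retry count; once the maximum number of retries is exceeded, the message is discarded.

  5. The retry exchange routes to a retry queue that has a dead letter exchange (Dead Letter Exchange) configured; when a message expires there, it is automatically forwarded back to the business exchange (Exchange).

  6. A WebApi can search the messages stored in MongoDB by MessageId, Timestamp, and Headers, and resend them.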

   

  Note: MongoDB was chosen as the storage mainly because it supports dynamic queries on headers well; Elasticsearch would be a comparable alternative.
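  To make the message flow of the scheme concrete, here is a toy model in C#. It is not RabbitMQ code: the routing tables are plain dictionaries filled with the exchange, queue, and routing-key names used in the code below, and Route is a hypothetical stand-in for direct-exchange routing. It follows a message that ConsumerB failed to process and republished to the retry exchange.

```csharp
using System;
using System.Collections.Generic;

// Toy model, not RabbitMQ: (exchange, routing key) -> bound queues.
// QueueAudit is bound to both business routes, as in the audit consumer.
var bindings = new Dictionary<(string Exchange, string Key), string[]>
{
    [("Exchange", "RouteA")] = new[] { "QueueA", "QueueAudit" },
    [("Exchange", "RouteB")] = new[] { "QueueB", "QueueAudit" },
    [("Exchange_Retry", "RouteB_Retry")] = new[] { "QueueB_Retry" },
};

// Queue arguments x-dead-letter-exchange / x-dead-letter-routing-key,
// which only the retry queue has.
var deadLetter = new Dictionary<string, (string Exchange, string Key)>
{
    ["QueueB_Retry"] = ("Exchange", "RouteB"),
};

// Direct routing: exact match on the routing key, otherwise unroutable.
string[] Route(string exchange, string key) =>
    bindings.TryGetValue((exchange, key), out var queues) ? queues : Array.Empty<string>();

// A failed QueueB message is republished to the retry exchange; when its TTL
// expires, the retry queue dead-letters it back to the business exchange.
foreach (var retryQueue in Route("Exchange_Retry", "RouteB_Retry"))
{
    if (deadLetter.TryGetValue(retryQueue, out var dlx))
        Console.WriteLine($"{retryQueue} expires -> {string.Join(", ", Route(dlx.Exchange, dlx.Key))}");
}
// QueueB_Retry expires -> QueueB, QueueAudit
```

  The copy that reaches QueueAudit on the way back is why the audit consumer skips messages carrying an x-death header.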

 

Publisher

  1. Enable automatic recovery after disconnection

  var factory = new ConnectionFactory
  {
      Uri = new Uri("amqp://guest:guest@192.168.132.137:5672"),
      AutomaticRecoveryEnabled = true
  };

 

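  2. Declare the exchange, type direct

  channel.ExchangeDeclare("Exchange", "direct");

  Note: this overload leaves durable at its default of false, so the exchange itself does not survive a broker restart. To make it durable, pass durable: true in every declaration of the same exchange, because the broker rejects redeclaring an existing exchange with different settings.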

 

  3. Declare QueueA and QueueB according to business needs

  channel.QueueDeclare("QueueA", true, false, false);
  channel.QueueBind("QueueA", "Exchange", "RouteA");

  channel.QueueDeclare("QueueB", true, false, false);
  channel.QueueBind("QueueB", "Exchange", "RouteB");

 

  4. Enable publisher confirms, so that every publish must be acknowledged by the RabbitMQ server

  channel.ConfirmSelect();

 

  5. Make messages persistent

  var properties = channel.CreateBasicProperties();
  properties.Persistent = true;

 

  6. Generate the MessageId, the Timestamp, and the Headers

  properties.MessageId = Guid.NewGuid().ToString("N");
  properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
  properties.Headers = new Dictionary<string, object>
  {
      { "key", "value" + i}
  };

 

  7. Publish: even sequence numbers go to QueueA (RouteA), odd ones to QueueB (RouteB)

  channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);

 

  8. Make sure the server's confirmation has arrived

  var isOk = channel.WaitForConfirms();
  if (!isOk)
  {
      throw new Exception("The message did not reach the server!");
  }
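  Steps 6 and 7 can be tried out without a broker. The sketch below uses two hypothetical helpers, NewMetadata and RouteFor, which produce the same values the publisher puts into IBasicProperties and the routing key it picks; it does not call RabbitMQ.Client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for step 6: the values that go into IBasicProperties.
(string MessageId, long TimestampMs, Dictionary<string, object> Headers) NewMetadata(int seq) =>
(
    Guid.NewGuid().ToString("N"),                                // 32 lowercase hex digits, no hyphens
    DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),              // milliseconds since the Unix epoch
    new Dictionary<string, object> { { "key", "value" + seq } }  // business header used for searching later
);

// Hypothetical helper for step 7: even sequence numbers use RouteA, odd ones RouteB.
string RouteFor(int seq) => seq % 2 == 0 ? "RouteA" : "RouteB";

for (var n = 0; n < 2; n++)
{
    var meta = NewMetadata(n);
    Console.WriteLine($"{RouteFor(n)} {meta.MessageId} {meta.TimestampMs} key={meta.Headers["key"]}");
}
```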

 

  Full code

using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory
{
    Uri = new Uri("amqp://guest:guest@localhost:5672"),
    AutomaticRecoveryEnabled = true
};

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        // Business exchange and the two business queues.
        channel.ExchangeDeclare("Exchange", "direct");

        channel.QueueDeclare("QueueA", true, false, false);
        channel.QueueBind("QueueA", "Exchange", "RouteA");

        channel.QueueDeclare("QueueB", true, false, false);
        channel.QueueBind("QueueB", "Exchange", "RouteB");

        // Publisher confirms: the broker acknowledges every published message.
        channel.ConfirmSelect();

        for (var i = 0; i < 2; i++)
        {
            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;
            properties.MessageId = Guid.NewGuid().ToString("N");
            properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());

            properties.Headers = new Dictionary<string, object>
            {
                { "key", "value" + i}
            };

            var message = "Hello " + i;
            var body = Encoding.UTF8.GetBytes(message);

            channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);

            // Blocks until the broker confirms (true) or nacks (false) the publish.
            var isOk = channel.WaitForConfirms();
            if (!isOk)
            {
                throw new Exception("The message did not reach the server!");
            }
        }
    }
}

 

  Result: QueueA and QueueB each hold one message, and QueueAudit holds two.

 

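  Note: a queue only receives messages published after it has been declared and bound to the exchange. The code above does not declare QueueAudit, so declare it manually first, or run the consumer programs below once so that they declare it.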

 

Normal Consumer (ConsumerA)

  1. Set a prefetch count to avoid the pitfalls of plain round-robin dispatch; the count can be tuned as needed, here it is 1

  _channel.BasicQos(0, 1, false);

  

 

  2. Declare the exchange and the queue

  _channel.ExchangeDeclare("Exchange", "direct");
  _channel.QueueDeclare("QueueA", true, false, false);
  _channel.QueueBind("QueueA", "Exchange", "RouteA");

 

  3. Write the callback

  var consumer = new EventingBasicConsumer(_channel);
  consumer.Received += (model, ea) =>
  {
      //The QueueA is always successful.
      try
      {
          _channel.BasicAck(ea.DeliveryTag, false);
      }
      catch (AlreadyClosedException ex)
      {
          _logger.LogCritical(ex, "RabbitMQ is closed!");
      }
  };

  _channel.BasicConsume("QueueA", false, consumer);

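  Note: with automatic recovery enabled, any operation that talks to RabbitMQ while the connection is unavailable throws AlreadyClosedException. Left unhandled, it makes the main thread exit, and the program cannot resume even after the connection has recovered, so the exception has to be caught.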
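  Why step 1 matters can be shown with a simplified model (an assumption, not RabbitMQ's actual scheduler): without a prefetch limit, messages are handed out round-robin no matter how busy each consumer is; with prefetch 1, a consumer only gets its next message after acknowledging the previous one, so work flows to whichever consumer is free. The hypothetical helpers RoundRobin and PrefetchOne compute when the last of 10 messages finishes for one fast and one slow consumer.

```csharp
using System;
using System.Linq;

// Simplified model: all messages are queued at time 0 and each consumer
// handles its messages one after another. costPerMessage[c] is how long
// consumer c needs per message. Both functions return the finish time of
// the last message.

// No prefetch limit: message k is pushed to consumer k % n, whatever its load.
int RoundRobin(int[] costPerMessage, int messages)
{
    var busyUntil = new int[costPerMessage.Length];
    for (var k = 0; k < messages; k++)
    {
        var c = k % costPerMessage.Length;
        busyUntil[c] += costPerMessage[c];
    }
    return busyUntil.Max();
}

// Prefetch 1: the next message goes to the consumer that becomes free first
// (lowest index on ties).
int PrefetchOne(int[] costPerMessage, int messages)
{
    var busyUntil = new int[costPerMessage.Length];
    for (var k = 0; k < messages; k++)
    {
        var c = Array.IndexOf(busyUntil, busyUntil.Min());
        busyUntil[c] += costPerMessage[c];
    }
    return busyUntil.Max();
}

// Consumer 0 needs 1 time unit per message, consumer 1 needs 5.
var costs = new[] { 1, 5 };
Console.WriteLine($"round-robin: {RoundRobin(costs, 10)}, prefetch=1: {PrefetchOne(costs, 10)}");
// round-robin: 25, prefetch=1: 10
```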

 

Failing Consumer (ConsumerB)

  1. Set the prefetch count

  _channel.BasicQos(0, 1, false);

 

  2. Declare the exchange and the queue

  _channel.ExchangeDeclare("Exchange", "direct");
  _channel.QueueDeclare("QueueB", true, false, false);
  _channel.QueueBind("QueueB", "Exchange", "RouteB");

 

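  3. Set up the dead letter exchange (Dead Letter Exchange): the retry queue QueueB_Retry dead-letters expired messages back to Exchange with routing key RouteB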

  var retryDic = new Dictionary<string, object>
  {
      {"x-dead-letter-exchange", "Exchange"},
      {"x-dead-letter-routing-key", "RouteB"}
  };

  _channel.ExchangeDeclare("Exchange_Retry", "direct");
  _channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);
  _channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");

 

  4. Retry settings: 3 retries, waiting 1 second before the first, 10 seconds before the second, and 30 seconds before the third

  _retryTime = new List<int>
  {
      1 * 1000,
      10 * 1000,
      30 * 1000
  };

 

  5. Get the current retry count

  var retryCount = 0;
  if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))
  {
      // The header holds the previous attempt's count; this delivery is the next retry.
      retryCount = (int)ea.BasicProperties.Headers["retryCount"] + 1;
      _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {retryCount} retry started...");
  }

 

  6. When an exception occurs, check whether the message may still be retried

  private bool CanRetry(int retryCount)
  {
      return retryCount <= _retryTime.Count - 1;
  }

 

  7. If it may, start the retry

  private void SetupRetry(int retryCount, string retryExchange, string retryRoute, BasicDeliverEventArgs ea)
  {
      var body = ea.Body;
      var properties = ea.BasicProperties;
      properties.Headers = properties.Headers ?? new Dictionary<string, object>();
      properties.Headers["retryCount"] = retryCount;
      properties.Expiration = _retryTime[retryCount].ToString();

      try
      {
          _channel.BasicPublish(retryExchange, retryRoute, properties, body);
      }
      catch (AlreadyClosedException ex)
      {
          _logger.LogCritical(ex, "RabbitMQ is closed!");
      }
  }
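  Steps 4 to 7 together decide how often a message is tried. The sketch below replays ConsumerB's logic for a message whose Handle() always throws, using the same delays; Trace and ExpirationFor are hypothetical helpers and no broker is involved. It shows that the first delivery plus three retries gives four attempts before the final Nack.

```csharp
using System;
using System.Collections.Generic;

// Same schedule as _retryTime: 1 s, 10 s, 30 s, in milliseconds.
var retryTime = new List<int> { 1 * 1000, 10 * 1000, 30 * 1000 };

// Same condition as CanRetry above (retryCount <= Count - 1).
bool CanRetry(int retryCount) => retryCount < retryTime.Count;

// The expiration property is a string holding milliseconds.
string ExpirationFor(int retryCount) => retryTime[retryCount].ToString();

// Replays ConsumerB for a message whose Handle() always throws.
// 'header' is the retryCount header carried by the delivery (null = absent).
List<string> Trace()
{
    var steps = new List<string>();
    int? header = null;
    while (true)
    {
        var attempt = steps.Count + 1;
        // Step 5: the first delivery counts as 0, each redelivery as header + 1.
        var retryCount = header.HasValue ? header.Value + 1 : 0;
        if (!CanRetry(retryCount))
        {
            // Retries exhausted: BasicNack with requeue false; QueueB has no DLX, so it is dropped.
            steps.Add($"attempt {attempt}: nack");
            return steps;
        }
        // Step 7: republish to Exchange_Retry with the new header and TTL, then Ack the original.
        steps.Add($"attempt {attempt}: retry in {ExpirationFor(retryCount)} ms");
        header = retryCount;
    }
}

foreach (var step in Trace())
    Console.WriteLine(step);
// attempt 1: retry in 1000 ms
// attempt 2: retry in 10000 ms
// attempt 3: retry in 30000 ms
// attempt 4: nack
```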

 

  Full code

    _channel.BasicQos(0, 1, false);
    
    _channel.ExchangeDeclare("Exchange", "direct");
    _channel.QueueDeclare("QueueB", true, false, false);
    _channel.QueueBind("QueueB", "Exchange", "RouteB");
    
    var retryDic = new Dictionary<string, object>
    {
        {"x-dead-letter-exchange", "Exchange"},
        {"x-dead-letter-routing-key", "RouteB"}
    };
    
    _channel.ExchangeDeclare("Exchange_Retry", "direct");
    _channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);
    _channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");
    
    var consumer = new EventingBasicConsumer(_channel);
    consumer.Received += (model, ea) =>
    {
        //The QueueB is always failed.
        bool canAck;
        var retryCount = 0;
        if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))
        {
            // The header holds the previous attempt's count; this delivery is the next retry.
            retryCount = (int)ea.BasicProperties.Headers["retryCount"] + 1;
            _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {retryCount} retry started...");
        }
    
        try
        {
            Handle();
            canAck = true;
        }
        catch (Exception ex)
        {
            _logger.LogCritical(ex, "Error!");
            if (CanRetry(retryCount))
            {
                SetupRetry(retryCount, "Exchange_Retry", "RouteB_Retry", ea);
                canAck = true;
            }
            else
            {
                canAck = false;
            }
        }
    
        try
        {
            if (canAck)
            {
                _channel.BasicAck(ea.DeliveryTag, false);
            }
            else
            {
                _channel.BasicNack(ea.DeliveryTag, false, false);
            }
        }
        catch (AlreadyClosedException ex)
        {
            _logger.LogCritical(ex, "RabbitMQ is closed!");
        }
    };
    
    _channel.BasicConsume("QueueB", false, consumer);
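  Both step 5 and the full code read the header with a plain (int) cast. That works for messages this code republished itself, because the .NET client decodes a 32-bit integer header back to int. A message produced by another client might carry the count as a 64-bit integer or as a string (which the .NET client hands back as byte[]), and unboxing those with (int) throws InvalidCastException. ReadRetryHeader below is a hypothetical, more defensive variant; it returns null when the header is absent so the caller keeps the "first delivery is 0, redelivery is header + 1" rule.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;

// Hypothetical helper: null when the header is absent (first delivery),
// otherwise the stored count, accepting int, long, or a string sent as byte[].
int? ReadRetryHeader(IDictionary<string, object> headers)
{
    if (headers == null || !headers.TryGetValue("retryCount", out var value) || value == null)
        return null;

    return value switch
    {
        int i => i,
        long l => checked((int)l),  // OverflowException if it does not fit in an int
        byte[] bytes => int.Parse(Encoding.UTF8.GetString(bytes), CultureInfo.InvariantCulture),
        _ => Convert.ToInt32(value, CultureInfo.InvariantCulture),
    };
}

// In the Received handler this would replace the cast:
//   var previous = ReadRetryHeader(ea.BasicProperties.Headers);
//   var retryCount = previous.HasValue ? previous.Value + 1 : 0;
var previous = ReadRetryHeader(new Dictionary<string, object> { ["retryCount"] = 2L });
var retryCount = previous.HasValue ? previous.Value + 1 : 0;
Console.WriteLine(retryCount); // 3
```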

 

Audit Consumer

  1. Declare the exchange and the queue

  _channel.ExchangeDeclare("Exchange", "direct");

  _channel.QueueDeclare("QueueAudit", true, false, false);
  _channel.QueueBind("QueueAudit", "Exchange", "RouteA");
  _channel.QueueBind("QueueAudit", "Exchange", "RouteB");

 

  2. Skip the duplicates forwarded by the dead letter exchange (dead-lettered messages carry an x-death header)

  if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death"))
  {
      ...
  }

 

  3. Build the message entity

  var message = new Message
  {
      MessageId = ea.BasicProperties.MessageId,
      Body = ea.Body,
      Exchange = ea.Exchange,
      Route = ea.RoutingKey
  };

 

  4. AMQP carries string header values as raw bytes and the .NET client returns them as byte[], so convert those values back to strings

  if (ea.BasicProperties.Headers != null)
  {
      var headers = new Dictionary<string, object>();

      foreach (var header in ea.BasicProperties.Headers)
      {
          if (header.Value is byte[] bytes)
          {
              headers[header.Key] = Encoding.UTF8.GetString(bytes);
          }
          else
          {
              headers[header.Key] = header.Value;
          }
      }

      message.Headers = headers;
  }

 

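  5. Convert the Unix timestamp to UTC time. The publisher above writes milliseconds, so the value is read back with FromUnixTimeMilliseconds; note that the AMQP 0-9-1 specification defines this property in seconds, so other clients and tools may interpret it differently.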

  if (ea.BasicProperties.Timestamp.UnixTime > 0)
  {
      message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;
      var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);
      message.Timestamp = offset.UtcDateTime;
  }

 

  6. Save the message to MongoDB

  _mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);
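  Steps 4 and 5 can be exercised without RabbitMQ or MongoDB. Normalize is a hypothetical extension of step 4 that also converts byte[] values nested inside tables and lists; it assumes the client returns AMQP tables as IDictionary<string, object> and arrays as IList<object>, and that every byte[] in the headers is UTF-8 text. ToUtc repeats the millisecond conversion of step 5.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

// Recursively turn byte[] into strings, descending into tables and lists
// (the x-death header, for instance, is a list of tables).
object Normalize(object value) => value switch
{
    byte[] bytes => Encoding.UTF8.GetString(bytes),
    IDictionary<string, object> table => table.ToDictionary(kv => kv.Key, kv => Normalize(kv.Value)),
    IList<object> list => list.Select(item => Normalize(item)).ToList(),
    _ => value,  // numbers, null and other values pass through unchanged
};

// Step 5: the publisher wrote milliseconds, so read them back as milliseconds.
DateTime ToUtc(long unixTimeMs) => DateTimeOffset.FromUnixTimeMilliseconds(unixTimeMs).UtcDateTime;

var headers = new Dictionary<string, object>
{
    ["key"] = Encoding.UTF8.GetBytes("value1"),
    ["retryCount"] = 1,
    ["x-death"] = new List<object>
    {
        new Dictionary<string, object> { ["queue"] = Encoding.UTF8.GetBytes("QueueB_Retry") },
    },
};

var normalized = (Dictionary<string, object>)Normalize(headers);
Console.WriteLine(normalized["key"]);                         // value1
Console.WriteLine(ToUtc(1_500_000_000_000).ToString("o"));   // 2017-07-14T02:40:00.0000000Z
```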

 

  MongoDB records:

  

 

  Retry records:

  

 

Message Search and Resending (WebApi)

  1. Search for a message by its MessageId

  

 

  2. Search for messages by header

  

  

 

  3. Resend a message; a new MessageId is generated
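  The WebApi code itself is not shown here. As a rough, hypothetical sketch of the three operations, the audit collection is replaced below by an in-memory list of tuples with fields of the audit entity; against MongoDB the header search would instead be a filter on the dotted field path, something like Builders<Message>.Filter.Eq("Headers.key", "value1") with the official C# driver.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for the audit collection in MongoDB.
var audit = new List<(string MessageId, string Exchange, string Route, Dictionary<string, object> Headers)>
{
    ("a1", "Exchange", "RouteA", new Dictionary<string, object> { ["key"] = "value0" }),
    ("b2", "Exchange", "RouteB", new Dictionary<string, object> { ["key"] = "value1" }),
};

// 1. Search by MessageId.
var byId = audit.Where(m => m.MessageId == "b2").ToList();

// 2. Search by header value.
var byHeader = audit
    .Where(m => m.Headers.TryGetValue("key", out var v) && object.Equals(v, "value1"))
    .ToList();

// 3. Resend: same exchange, route and headers, but a fresh MessageId,
//    so the audit consumer records the resent copy as a new message.
var original = byHeader.Single();
var resend = (
    MessageId: Guid.NewGuid().ToString("N"),
    original.Exchange,
    original.Route,
    Headers: new Dictionary<string, object>(original.Headers));

Console.WriteLine($"{original.MessageId} resent as {resend.MessageId} to {resend.Exchange}/{resend.Route}");
```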

  

  

 

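How Ack, Nack, and Reject Relate

  1. When processing succeeds, call Ack; RabbitMQ removes the message from the queue.

  2. When processing fails, call Nack or Reject:

  a) With requeue=true, the message goes back to the queue and is redelivered right away, possibly to the same consumer again;

  b) With requeue=false, if the queue has a dead letter exchange configured (x-dead-letter-exchange), the message is forwarded to that dead letter exchange;

  c) With requeue=false and no dead letter exchange on the queue, the message is deleted from the queue, the same end result as Ack.

 

  3. The difference between Nack and Reject: Nack can act on several messages at once (multiple=true), while Reject only handles a single message.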
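  The rules above condense into a small decision function. Outcome is a hypothetical helper, not part of RabbitMQ.Client; queueHasDlx stands for the queue the message was consumed from having x-dead-letter-exchange set, either as a queue argument or through a policy.

```csharp
using System;

// What happens to a delivered message after the consumer responds.
string Outcome(string action, bool requeue, bool queueHasDlx) => action switch
{
    "ack" => "removed",
    "nack" or "reject" when requeue => "requeued",
    "nack" or "reject" when queueHasDlx => "dead-lettered",
    "nack" or "reject" => "discarded",
    _ => throw new ArgumentException($"unknown action: {action}"),
};

// ConsumerB's final BasicNack with requeue false on QueueB, which has no DLX:
Console.WriteLine(Outcome("nack", requeue: false, queueHasDlx: false)); // discarded
```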

  

RabbitMQ Automatic Recovery

Connection Recovery

  1. Reconnect

  2. Restore connection listeners

  3. Reopen channels

  4. Restore channel listeners

  5. Restore basic.qos, publisher confirms, and transaction settings

 

Topology Recovery

  1. Redeclare exchanges

  2. Redeclare queues

  3. Recover all bindings

  4. Recover all consumers

 

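Exception Handling

  1. Transient failures, such as a momentary database network drop or a temporarily failing HTTP request

  Handle them with a short-delay retry (for example after 1 second). Nack/Reject with requeue=true can also be used to retry with less latency, although a failure that persists then turns into a tight redelivery loop.

 

  2. Ordering failures: task A depends on task B, but because of scheduling or consumer assignment, A runs before B

  Handle them with long-delay retries (for example 1 minute, 30 minutes, 1 hour, 1 day), giving task B time to finish first.

  

  3. Business failures: because of insufficient testing, after going live certain messages or kinds of messages turn out not to be processable

  Once the system has been fixed, resend the messages.

 

  4. System failures: the middleware the business relies on stops working, for example the network is down or the database has crashed

  Once the system has recovered, resend the messages.

 

  5. Illegal messages: forged messages or messages sent as part of an attack

  After repeated retries fail, the message is removed from the queue; this kind of message can also be given further business-specific handling.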
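  For item 2, the same retry mechanism only needs a longer schedule. One caveat applies when delays of different lengths share one retry queue such as QueueB_Retry: RabbitMQ only expires or dead-letters a message with a per-message TTL once it reaches the head of the queue, so a message waiting one day at the head would hold back a one-minute message behind it. A common way around this is one retry queue per delay. The schedule and the queue names below are illustrative assumptions.

```csharp
using System;
using System.Linq;

// Hypothetical long-delay schedule for ordering failures: 1 min, 30 min, 1 h, 1 day.
var delays = new[]
{
    TimeSpan.FromMinutes(1),
    TimeSpan.FromMinutes(30),
    TimeSpan.FromHours(1),
    TimeSpan.FromDays(1),
};

// Expiration values (milliseconds as strings), as SetupRetry would write them.
var expirations = delays.Select(d => ((long)d.TotalMilliseconds).ToString()).ToArray();

// One retry queue per delay, so a short delay never waits behind a long one.
// Each would be declared with the same x-dead-letter-exchange and
// x-dead-letter-routing-key arguments as QueueB_Retry.
var retryQueues = delays.Select(d => $"QueueB_Retry_{(long)d.TotalSeconds}s").ToArray();

for (var n = 0; n < delays.Length; n++)
    Console.WriteLine($"{retryQueues[n]}: expiration {expirations[n]}");
// QueueB_Retry_60s: expiration 60000
// QueueB_Retry_1800s: expiration 1800000
// QueueB_Retry_3600s: expiration 3600000
// QueueB_Retry_86400s: expiration 86400000
```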

 

Source Code

https://github.com/ErikXu/RabbitMesage
