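I recently needed message queues again, so I came back to RabbitMQ. Along the way I ran into quite a few reliability problems, which roughly fall into these categories:
1. Transient failures, such as a momentary database network glitch or a temporarily failing HTTP request;
2. Ordering failures: task A depends on task B, but because of scheduling or the way messages are distributed among consumers, A runs before B;
3. Business failures: the system was not tested thoroughly, and after going live certain messages, or certain kinds of messages, cannot be processed correctly;
4. System failures: business middleware cannot operate normally, e.g. the network is down or the database has crashed;
5. Illegal messages: forged messages or messages sent as part of an attack.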
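To deal with these failures I use a scheme built on message auditing, message retry, message search and message resending:
1. All messages go through an Exchange of type direct or topic; fanout is not recommended.
2. Depending on the business, bind one or more Queues to the Exchange, and run an audit consumer (Audit) that listens to all of them and records every message in MongoDB without blocking normal processing.
3. When the producer (Publisher) publishes a message, it sets the standard AMQP properties MessageId and Timestamp and adds business-specific Headers so the message can be traced.
4. When a consumer (Consumer) fails to process a message, it publishes the message to a retry exchange (Retry Exchange), setting its expiration (the retry delay) and updating its retry count; once the retry limit is exceeded, the message is discarded.
5. The queue behind the retry exchange is declared with a dead letter exchange (Dead Letter Exchange), so when a message expires it is automatically forwarded back to the business Exchange.
6. A WebApi can search the messages stored in MongoDB by MessageId, Timestamp and Headers, and resend them.
Note: MongoDB was chosen as the store mainly because it handles dynamic queries on headers well; Elasticsearch would be a comparable alternative.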
Publisher:

1. Enable automatic connection recovery

```csharp
var factory = new ConnectionFactory
{
    Uri = new Uri("amqp://guest:guest@192.168.132.137:5672"),
    AutomaticRecoveryEnabled = true
};
```
2. Declare the Exchange as a direct exchange

```csharp
channel.ExchangeDeclare("Exchange", "direct");
```
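3. Declare QueueA and QueueB for the business and bind them to the Exchange

```csharp
// QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false)
channel.QueueDeclare("QueueA", true, false, false);
channel.QueueBind("QueueA", "Exchange", "RouteA");
channel.QueueDeclare("QueueB", true, false, false);
channel.QueueBind("QueueB", "Exchange", "RouteB");
```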
4. Enable publisher confirms, so that every publish must be acknowledged by the RabbitMQ server

```csharp
channel.ConfirmSelect();
```
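5. Mark messages as persistent

```csharp
var properties = channel.CreateBasicProperties();
properties.Persistent = true; // only survives a broker restart if the queue is durable as well
```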
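6. Generate the MessageId, the Timestamp and the Headers

```csharp
properties.MessageId = Guid.NewGuid().ToString("N");
// Unix time in milliseconds; the audit consumer reads it back as milliseconds.
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
// i is the loop index in the complete code.
properties.Headers = new Dictionary<string, object> { { "key", "value" + i } };
```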
7. Publish: even iterations go to QueueA (RouteA), odd ones to QueueB (RouteB)

```csharp
channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);
```
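8. Wait for the confirmation from the RabbitMQ server

```csharp
// Blocks until the broker confirms the publish; returns false if the broker nacked it.
var isOk = channel.WaitForConfirms();
if (!isOk)
{
    throw new Exception("The message was not confirmed by the server!");
}
```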
Complete code:

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;

var factory = new ConnectionFactory
{
    Uri = new Uri("amqp://guest:guest@localhost:5672"),
    AutomaticRecoveryEnabled = true
};

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("Exchange", "direct");

    channel.QueueDeclare("QueueA", true, false, false);
    channel.QueueBind("QueueA", "Exchange", "RouteA");
    channel.QueueDeclare("QueueB", true, false, false);
    channel.QueueBind("QueueB", "Exchange", "RouteB");

    // Ask the broker to confirm every publish on this channel.
    channel.ConfirmSelect();

    for (var i = 0; i < 2; i++)
    {
        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;
        properties.MessageId = Guid.NewGuid().ToString("N");
        properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());
        properties.Headers = new Dictionary<string, object> { { "key", "value" + i } };

        var message = "Hello " + i;
        var body = Encoding.UTF8.GetBytes(message);

        // Even i -> RouteA (QueueA), odd i -> RouteB (QueueB).
        channel.BasicPublish("Exchange", i % 2 == 0 ? "RouteA" : "RouteB", properties, body);

        if (!channel.WaitForConfirms())
        {
            throw new Exception("The message was not confirmed by the server!");
        }
    }
}
```
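Result: QueueA and QueueB each hold one message, and QueueAudit holds two.
Note: an Exchange only delivers messages to Queues that are already bound to it, and the code above does not declare QueueAudit. Declare it manually, or run the consumer programs below first so that they declare it.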
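To see why QueueAudit ends up with two messages, and why it has to be bound before anything is published, here is a tiny in-memory model of direct-exchange routing. DirectExchangeModel and Route are hypothetical names used only for illustration; the model ignores everything else the broker does (persistence, confirms, unroutable-message handling).

```csharp
using System.Collections.Generic;
using System.Linq;

// Simplified model of a direct exchange: a message is copied to every queue whose
// binding key equals the message's routing key. If no binding matches, no queue
// receives the message and it is effectively dropped.
public static class DirectExchangeModel
{
    public static List<string> Route(
        IEnumerable<(string Queue, string BindingKey)> bindings, string routingKey)
    {
        return bindings
            .Where(b => b.BindingKey == routingKey)
            .Select(b => b.Queue)
            .Distinct()
            .ToList();
    }
}
```

With the bindings used in this article (QueueA/RouteA, QueueB/RouteB, and QueueAudit bound to both keys), RouteA reaches QueueA and QueueAudit, and RouteB reaches QueueB and QueueAudit. A message published before QueueAudit is bound is simply never copied to it; binding the queue later does not deliver earlier messages.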
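QueueA consumer (in this demo, processing always succeeds):

1. Set the prefetch count so that RabbitMQ dispatches messages fairly instead of by blind round-robin; adjust the value as needed, here it is 1

```csharp
// prefetchSize: 0 (no limit), prefetchCount: 1, global: false
_channel.BasicQos(0, 1, false);
```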
2. Declare the Exchange and the Queue

```csharp
_channel.ExchangeDeclare("Exchange", "direct");
_channel.QueueDeclare("QueueA", true, false, false);
_channel.QueueBind("QueueA", "Exchange", "RouteA");
```
3. Write the callback

```csharp
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
    // The QueueA consumer always succeeds.
    try
    {
        _channel.BasicAck(ea.DeliveryTag, false);
    }
    catch (AlreadyClosedException ex)
    {
        _logger.LogCritical(ex, "RabbitMQ is closed!");
    }
};
// autoAck: false, so each message is acknowledged explicitly in the callback.
_channel.BasicConsume("QueueA", false, consumer);
```
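Note: even with RabbitMQ's automatic recovery enabled, any operation on the broker while the connection is unavailable throws AlreadyClosedException. If that exception is not handled, it makes the main thread exit, and the program does not resume even after the connection has been restored. The exception therefore has to be caught.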
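QueueB consumer (in this demo, processing always fails, to exercise the retry path):

1. Set the prefetch count

```csharp
_channel.BasicQos(0, 1, false);
```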
2. Declare the Exchange and the Queue

```csharp
_channel.ExchangeDeclare("Exchange", "direct");
_channel.QueueDeclare("QueueB", true, false, false);
_channel.QueueBind("QueueB", "Exchange", "RouteB");
```
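3. Set up the retry exchange and a retry queue whose dead letter exchange (Dead Letter Exchange) points back to the business Exchange

```csharp
// Messages that expire in QueueB_Retry are dead-lettered to "Exchange" with routing key "RouteB",
// i.e. back to QueueB (and also to QueueAudit, which is bound to RouteB).
var retryDic = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "Exchange" },
    { "x-dead-letter-routing-key", "RouteB" }
};
_channel.ExchangeDeclare("Exchange_Retry", "direct");
_channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);
_channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");
```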
4. Retry settings: up to 3 retries, after 1 second, 10 seconds and 30 seconds respectively

```csharp
_retryTime = new List<int> { 1 * 1000, 10 * 1000, 30 * 1000 }; // delays in milliseconds
```
5. Read the current retry count

```csharp
var retryCount = 0;
if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))
{
    // The header holds the index of the previous retry; +1 gives the number of the current retry.
    retryCount = (int)ea.BasicProperties.Headers["retryCount"] + 1;
    _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {retryCount} retry started...");
}
```
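6. When processing fails, decide whether another retry is allowed

```csharp
private bool CanRetry(int retryCount)
{
    // retryCount doubles as the index into _retryTime, so it must be a valid index.
    return retryCount < _retryTime.Count;
}
```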
7. If a retry is allowed, schedule it

```csharp
private void SetupRetry(int retryCount, string retryExchange, string retryRoute, BasicDeliverEventArgs ea)
{
    var body = ea.Body;
    var properties = ea.BasicProperties;
    // Record the retry index and let the copy wait in the retry queue for the matching delay.
    properties.Headers = properties.Headers ?? new Dictionary<string, object>();
    properties.Headers["retryCount"] = retryCount;
    properties.Expiration = _retryTime[retryCount].ToString(); // per-message TTL in milliseconds
    try
    {
        _channel.BasicPublish(retryExchange, retryRoute, properties, body);
    }
    catch (AlreadyClosedException ex)
    {
        _logger.LogCritical(ex, "RabbitMQ is closed!");
    }
}
```
Complete code:

```csharp
_channel.BasicQos(0, 1, false);

_channel.ExchangeDeclare("Exchange", "direct");
_channel.QueueDeclare("QueueB", true, false, false);
_channel.QueueBind("QueueB", "Exchange", "RouteB");

var retryDic = new Dictionary<string, object>
{
    { "x-dead-letter-exchange", "Exchange" },
    { "x-dead-letter-routing-key", "RouteB" }
};
_channel.ExchangeDeclare("Exchange_Retry", "direct");
_channel.QueueDeclare("QueueB_Retry", true, false, false, retryDic);
_channel.QueueBind("QueueB_Retry", "Exchange_Retry", "RouteB_Retry");

var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
    // The QueueB consumer always fails.
    bool canAck;
    var retryCount = 0;
    if (ea.BasicProperties.Headers != null && ea.BasicProperties.Headers.ContainsKey("retryCount"))
    {
        retryCount = (int)ea.BasicProperties.Headers["retryCount"] + 1;
        _logger.LogWarning($"[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {retryCount} retry started...");
    }

    try
    {
        Handle();
        canAck = true;
    }
    catch (Exception ex)
    {
        _logger.LogCritical(ex, "Error!");
        if (CanRetry(retryCount))
        {
            // A copy now waits in the retry queue, so the original can be acked.
            SetupRetry(retryCount, "Exchange_Retry", "RouteB_Retry", ea);
            canAck = true;
        }
        else
        {
            // Out of retries: reject without requeueing.
            canAck = false;
        }
    }

    try
    {
        if (canAck)
        {
            _channel.BasicAck(ea.DeliveryTag, false);
        }
        else
        {
            _channel.BasicNack(ea.DeliveryTag, false, false);
        }
    }
    catch (AlreadyClosedException ex)
    {
        _logger.LogCritical(ex, "RabbitMQ is closed!");
    }
};
_channel.BasicConsume("QueueB", false, consumer);
```
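The retry bookkeeping is easy to get off by one, so here is a broker-free replay of the QueueB consumer's logic for a message whose Handle() always throws. RetryFlowSimulation is a hypothetical name; each loop iteration stands for one delivery, and the sketch assumes the retryCount header written by SetupRetry comes back unchanged after the copy expires in QueueB_Retry and is dead-lettered (dead-lettering keeps the message headers and adds x-death).

```csharp
using System.Collections.Generic;

public static class RetryFlowSimulation
{
    // Same schedule as step 4, in milliseconds.
    static readonly List<int> RetryTime = new List<int> { 1 * 1000, 10 * 1000, 30 * 1000 };

    static bool CanRetry(int retryCount) => retryCount < RetryTime.Count;

    // Returns the Expiration values used for each republish, and whether the
    // last delivery ended in a Nack.
    public static (List<int> Delays, bool Nacked) Run()
    {
        var delays = new List<int>();
        int? headerRetryCount = null; // the first delivery has no "retryCount" header

        while (true)
        {
            // Step 5: header value + 1, or 0 on the first delivery.
            var retryCount = headerRetryCount.HasValue ? headerRetryCount.Value + 1 : 0;

            // Handle() failed; step 6 decides what happens next.
            if (CanRetry(retryCount))
            {
                delays.Add(RetryTime[retryCount]); // properties.Expiration
                headerRetryCount = retryCount;     // properties.Headers["retryCount"]
                continue;                          // original acked, the copy comes back later
            }
            return (delays, true);                 // BasicNack with requeue: false
        }
    }
}
```

Running it yields delays of 1000, 10000 and 30000 ms followed by a Nack, i.e. exactly the three retries configured in step 4. Because retryCount is also the index into the delay list, CanRetry must reject precisely the first out-of-range index.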
Audit consumer:

1. Declare the Exchange and the Queue, binding QueueAudit to every business routing key

```csharp
_channel.ExchangeDeclare("Exchange", "direct");
_channel.QueueDeclare("QueueAudit", true, false, false);
_channel.QueueBind("QueueAudit", "Exchange", "RouteA");
_channel.QueueBind("QueueAudit", "Exchange", "RouteB");
```
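2. Skip the duplicates forwarded by the dead letter exchange: copies coming back from the retry queue carry an x-death header and were already audited on their first delivery

```csharp
if (ea.BasicProperties.Headers == null || !ea.BasicProperties.Headers.ContainsKey("x-death"))
{
    // ...
}
```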
3. Build the message entity

```csharp
var message = new Message
{
    MessageId = ea.BasicProperties.MessageId,
    Body = ea.Body,
    Exchange = ea.Exchange,
    Route = ea.RoutingKey
};
```
4. String header values arrive as byte arrays, so convert them back to strings

```csharp
if (ea.BasicProperties.Headers != null)
{
    var headers = new Dictionary<string, object>();
    foreach (var header in ea.BasicProperties.Headers)
    {
        // Strings arrive as byte[]; other values (numbers, booleans, ...) are kept as-is.
        if (header.Value is byte[] bytes)
        {
            headers[header.Key] = Encoding.UTF8.GetString(bytes);
        }
        else
        {
            headers[header.Key] = header.Value;
        }
    }
    message.Headers = headers;
}
```
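The loop above only converts top-level values, so a header whose value is itself a table or an array would still contain byte[] entries. A recursive variant, as a sketch: HeaderDecoder is a hypothetical name, and it assumes the client exposes nested AMQP tables as IDictionary&lt;string, object&gt; and arrays as IList&lt;object&gt;.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class HeaderDecoder
{
    // Turns byte[] into UTF-8 strings, recursing into nested tables and arrays;
    // any other value is returned unchanged.
    public static object DecodeValue(object value)
    {
        switch (value)
        {
            case byte[] bytes:
                return Encoding.UTF8.GetString(bytes);
            case IDictionary<string, object> table:
                return Decode(table);
            case IList<object> list:
                return list.Select(DecodeValue).ToList();
            default:
                return value;
        }
    }

    public static Dictionary<string, object> Decode(IDictionary<string, object> headers)
    {
        var result = new Dictionary<string, object>();
        if (headers == null) return result;
        foreach (var header in headers)
        {
            result[header.Key] = DecodeValue(header.Value);
        }
        return result;
    }
}
```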
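5. Convert the Unix Timestamp to UTC time. The publisher above writes milliseconds, whereas the AMQP 0-9-1 specification defines this property in seconds, so other tools may display these values incorrectly; what matters here is that both sides use the same unit.

```csharp
if (ea.BasicProperties.Timestamp.UnixTime > 0)
{
    message.TimestampUnix = ea.BasicProperties.Timestamp.UnixTime;
    // The publisher stored milliseconds, hence FromUnixTimeMilliseconds.
    var offset = DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);
    message.Timestamp = offset.UtcDateTime;
}
```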
6. Save the message to MongoDB

```csharp
_mongoDbContext.Collection<Message>().InsertOne(message, cancellationToken: cancellationToken);
```
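The Message class itself is not shown. A plausible shape, inferred from the fields assigned in steps 3 to 5, is sketched below; the property types are assumptions (Body as byte[] matches RabbitMQ.Client 5.x, whereas in 6.x ea.Body is a ReadOnlyMemory&lt;byte&gt; and would need .ToArray()). SetTimestamp is a hypothetical helper wrapping step 5.

```csharp
using System;
using System.Collections.Generic;

public class Message
{
    public string MessageId { get; set; }
    public byte[] Body { get; set; }
    public string Exchange { get; set; }
    public string Route { get; set; }
    public Dictionary<string, object> Headers { get; set; }
    public long TimestampUnix { get; set; }   // Unix time in milliseconds, as published
    public DateTime Timestamp { get; set; }   // the same instant as a UTC DateTime

    // Step 5: only fill the timestamp fields when the publisher supplied a value.
    public void SetTimestamp(long unixMilliseconds)
    {
        if (unixMilliseconds <= 0) return;
        TimestampUnix = unixMilliseconds;
        Timestamp = DateTimeOffset.FromUnixTimeMilliseconds(unixMilliseconds).UtcDateTime;
    }
}
```

Keeping both TimestampUnix and Timestamp lets the WebApi filter on a plain number or on a date, whichever is more convenient for the query.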
[Screenshot: audit records stored in MongoDB]
[Screenshot: retry log output]
WebApi:
1. Look up a message by its MessageId
2. Search messages by header
3. Resend a message; the resent message gets a new MessageId
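The three WebApi operations can be illustrated with an in-memory list standing in for the MongoDB collection. AuditRecord, AuditStore and its methods are hypothetical names; in the article the records live in MongoDB, and the resend copy would then be published with BasicPublish.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class AuditRecord
{
    public string MessageId { get; set; }
    public string Exchange { get; set; }
    public string Route { get; set; }
    public byte[] Body { get; set; }
    public Dictionary<string, object> Headers { get; set; } = new Dictionary<string, object>();
}

public class AuditStore
{
    private readonly List<AuditRecord> _records = new List<AuditRecord>();

    public void Add(AuditRecord record) => _records.Add(record);

    // 1. Look up a message by its MessageId.
    public AuditRecord FindById(string messageId) =>
        _records.FirstOrDefault(r => r.MessageId == messageId);

    // 2. Find messages whose header `key` equals `value`.
    public List<AuditRecord> FindByHeader(string key, object value) =>
        _records.Where(r => r.Headers.TryGetValue(key, out var v) && Equals(v, value)).ToList();

    // 3. Build the copy to resend: same exchange, route, body and headers,
    //    but a fresh MessageId so the resend can be told apart from the original.
    public static AuditRecord CreateResendCopy(AuditRecord original) => new AuditRecord
    {
        MessageId = Guid.NewGuid().ToString("N"),
        Exchange = original.Exchange,
        Route = original.Route,
        Body = original.Body,
        Headers = new Dictionary<string, object>(original.Headers)
    };
}
```

Because the resent copy gets its own MessageId, the audit consumer stores it as a separate record, so the history of the original failure is preserved.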
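Ack, Nack and Reject:
1. If processing succeeds, call Ack and RabbitMQ removes the message from the queue.
2. If processing fails, call Nack or Reject:
a) with requeue=true, the message goes back into the queue and is redelivered, usually right away; with a single consumer, the current consumer immediately receives the same message again;
b) with requeue=false, if the queue has a dead letter exchange configured (e.g. via the x-dead-letter-exchange argument), the message is routed to that dead letter exchange;
c) with requeue=false, if the queue has no dead letter exchange, the message is removed from the queue, which has the same effect as Ack.
3. The difference between Nack and Reject is that Nack can settle several messages at once (the multiple flag), while Reject handles only one message.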
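The outcomes listed above fit in a small decision function. Settlement, Outcome and SettlementRules are hypothetical names; the sketch covers a single delivery, with the dead letter exchange configured on the queue (the way QueueB_Retry is declared with x-dead-letter-exchange).

```csharp
public enum Settlement { Ack, Nack, Reject }

public enum Outcome { Removed, Requeued, DeadLettered }

public static class SettlementRules
{
    // For a single message Nack and Reject behave the same; Nack differs only in
    // being able to settle several deliveries at once.
    public static Outcome Resolve(Settlement settlement, bool requeue, bool queueHasDeadLetterExchange)
    {
        if (settlement == Settlement.Ack) return Outcome.Removed;
        if (requeue) return Outcome.Requeued;
        return queueHasDeadLetterExchange ? Outcome.DeadLettered : Outcome.Removed;
    }
}
```

In the QueueB consumer, the final BasicNack with requeue: false falls into the Removed case, because QueueB itself has no dead letter exchange.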
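Automatic connection recovery (AutomaticRecoveryEnabled = true) goes through these steps:
1. Reconnect
2. Restore connection listeners
3. Re-open channels
4. Restore channel listeners
5. Restore the channels' basic.qos settings, publisher confirms and transaction settings

Topology recovery (TopologyRecoveryEnabled, on by default) then performs:
1. Re-declare exchanges
2. Re-declare queues
3. Recover all bindings
4. Recover all consumers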
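Summary, by failure type:
1. Transient failures, such as a database network glitch or a temporarily failing HTTP request
Handle them with short-delay retries (e.g. after 1 second); Nack/Reject with requeue can also be used to retry, with lower latency.
2. Ordering failures, where task A depends on task B but runs first because of scheduling or consumer assignment
Handle them with long-delay retries (e.g. 1 minute, 30 minutes, 1 hour, 1 day), waiting for task B to finish first.
3. Business failures, where insufficient testing means certain messages cannot be processed after going live
Once the system is fixed, resend the messages.
4. System failures, where business middleware is unavailable, e.g. the network is down or the database has crashed
Once the system has recovered, resend the messages.
5. Illegal messages, i.e. forged or attack messages
After several failed retries the message is removed from the queue; these messages can also be handled further by dedicated business logic.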
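The summary can be encoded as a lookup from failure type to strategy. FailureKind, Strategy and FailurePlaybook are hypothetical names; the delays are the examples from the text, except that DropAfterRetries reuses the 1 s / 10 s / 30 s schedule of the QueueB consumer, and ResendAfterFix has no automatic schedule because resending is done manually through the WebApi.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

public enum FailureKind { Transient, Ordering, Business, Infrastructure, Malicious }

public enum Strategy { ShortRetry, LongRetry, ResendAfterFix, DropAfterRetries }

public static class FailurePlaybook
{
    public static Strategy StrategyFor(FailureKind kind)
    {
        switch (kind)
        {
            case FailureKind.Transient: return Strategy.ShortRetry;
            case FailureKind.Ordering: return Strategy.LongRetry;
            case FailureKind.Business:
            case FailureKind.Infrastructure: return Strategy.ResendAfterFix;
            case FailureKind.Malicious: return Strategy.DropAfterRetries;
            default: throw new ArgumentOutOfRangeException(nameof(kind));
        }
    }

    public static IReadOnlyList<TimeSpan> RetryDelays(Strategy strategy)
    {
        switch (strategy)
        {
            case Strategy.ShortRetry:
                return new[] { TimeSpan.FromSeconds(1) };
            case Strategy.LongRetry:
                return new[] { TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(30), TimeSpan.FromHours(1), TimeSpan.FromDays(1) };
            case Strategy.DropAfterRetries:
                return new[] { TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30) };
            default:
                return Array.Empty<TimeSpan>(); // no automatic retries
        }
    }

    // Per-message Expiration values are millisecond strings, as in SetupRetry.
    public static string ToExpiration(TimeSpan delay) =>
        ((long)delay.TotalMilliseconds).ToString(CultureInfo.InvariantCulture);
}
```

One design point to keep in mind with long delays: RabbitMQ only expires a message with a per-message TTL when it reaches the head of its queue, so a 1-day message sitting in front of a 1-second one in the same retry queue holds the short one back. Giving each delay its own retry queue is one common way around this.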