RabbitMQ消息可靠性傳輸

做者:threedayman 恆生LIGHT雲社區html

接着上一講 消息中間件之RabbitMQ初識,這筆咱們來說講RabbitMQ中消息丟失的問題。已經怎樣在覈心業務中避免消息丟失。java

血淚故事:商品購物流程中的發貨環節引入了RabbitMQ,某天因爲網絡抖動致使了生產者的消息沒有發送到RabbitMQ中,因爲沒有作消息的可靠性傳輸保證,消息丟失,致使一批客戶遲遲沒收到貨物而引起投訴,給公司形成了不小的損失。網絡

爲了不上述悲劇重演,咱們來了解下在RabbitMQ中咱們須要怎樣保證消息不丟失。異步

消息丟失會發生在何時

消息的傳輸過程大體以下圖ide

1622462671(1).jpg

消息丟失可能發生在優化

  • Producer端 發送到RabbitMQ中因爲網絡異常或者服務異常致使消息發送失敗。
  • RabbitMQ服務端 異常或者重啓致使消息丟失。
  • Consumer端 接收到消息後,消息處理失敗,消息丟失。

固然上一講中有提到在RabbitMQ,生產者發送消息是和Exchange交互,Exchange根據路由規則投遞到具體的Queue中,若是路由規則設置有問題,也會致使消息丟失,但此條不在本文討論重點。url

Producer 消息可靠性保證

爲了不因爲網絡抖動或者RabbitMQ服務端異常致使消息發送失敗的問題。能夠在Producer發送消息的使用引入了一個確認機制(ack),服務端接收到消息以後,會返回給Producer一個成功或者失敗的確認消息。.net

RabbitMQ提供了兩種解決方式:3d

  • 事務機制
  • 發送方確認機制

事務方式,主要方法有如下幾個code

  • channel.txSelect() 將當前的channel設置成事務模式。
  • channel.txCommit()用於提交事務。
  • channel.txRollback()用於事務回滾

下面代碼是簡單示例

try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
//發送失敗後續處理,重發或者持久化異常消息稍後重試
}

信號的流轉過程以下圖

1622700218(1).png

圖片來源 RabbitMQ實戰指南

若是事務可以提交成功,則消息必定到達了RabbitMQ中。

1622700353(1).png

圖片來源 RabbitMQ實戰指南

事務機制可以解決消息生產者和RabbitMQ之間消息 確認的問題,只有消息成功被RabbitMQ接收,事務才能提交成功。但事務機制是同步阻塞進行的,回大大下降RabbitMQ的吞吐量,RabbitMQ提供了一種改進方案,即發送方確認機制。

發送方確認機制:

  • channel.confirmSelect(); 將通道設置確認機制
  • channel.addConfirmListener() 爲通道添加ConfirmListener這個回調接口。
  • com.rabbitmq.client.ConfirmListener#handleAck 回調處理正常被RabbitMQ接收的消息。
  • com.rabbitmq.client.ConfirmListener#handleNack回調處理沒有被RabbitMQ正常接收的消息。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
	public void handleAck(long deliveryTag, boolean multiple) throws IOException {
		if (multiple) {
			confirmSet.headSet(deliveryTag + 1).clear();
		} else {
			confirmSet.remove(deliveryTag);
		}
	}
	public void handleNack(long deliveryTag, boolean multiple) throws IOException {
		System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
		if (multiple) {
			confirmSet.headSet(deliveryTag + 1).clear();
		} else {
			confirmSet.remove(deliveryTag);
		}
		//這裏須要添加消息發送失敗處理的代碼,從新發送或者持久化後補償。
	}
});
//模擬一直髮送消息的場景
while (true) {
	long nextSeqNo = channel.getNextPublishSeqNo();
	channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
	confirmSet.add(nextSeqNo);
}

上面例子演示了異步confirm的形式,在保證生產者消息被RabbitMQ正常接收,又沒有同步阻塞致使明顯下降RabbitMQ吞吐量的問題。

RabbitMQ端

爲避免RabbitMQ服務異常或者重啓致使的消息丟失,須要對作持久化操做,將相關信息保存到磁盤上。要保證消息不丟失須要持久化主隊列、持久化。exchange不持久化,在RabbitMQ服務重啓後,相關的exchange元數據會丟失,不過消息不丟失,但消息不能發送到這個exchange中了。

  • 隊列持久化須要在聲明隊列的時候將durable參數設置爲true。(由於消息是存在與隊列中,若是隊列不持久化,那RabbitMQ重啓後,消息將丟失)
  • 消息持久化經過將投遞模式設置成2(BasicProperties中的deliveryMode)。
channel.queueDeclare(QUEUE_NAME,true,//durable
                     false,false,null);
channel.basicPublish("",QUEUE_NAME, 
                     MessageProperties.PERSISTENT_TEXT_PLAIN,//具體屬性見下面
                     message.getBytes(StandardCharsets.UTF_8));
public static final BasicProperties PERSISTENT_TEXT_PLAIN = 
new BasicProperties("text/plain", 
					null, 
					null, 
					2, //deliveryMode
					0, null, null, null, 
					null, null, null, null, null, null);

Consumer端

爲保證Consumer端不因消費處理異常或消費者應用重啓致使消息丟失。咱們須要以下操做

  • 關閉默認的自動確認。設置爲手動確認模式。

手動確認模式:RabbitMQ會等待消費者回復確認信號後才從刪除消息。

自動確認模式(默認):RabbitMQ會自動把發出去的消息置爲確認,而後刪除,無論消費者有沒有真正消費到這些消息。

當設置爲手動確認模式,對於RabbitMQ服務端而言隊列中的消息分爲了兩種

  • Ready:等待投遞給消費者的消息。
  • Unacked:已經投遞給消費者,但尚未收到消費者確認新號的消息。

對於Unacked消息,會出現下面幾種狀況:

  • RabbitMQ收到持有消息的消費者的ack信號,RabbitMQ服務端將會刪除該消息。
  • RabbitMQ服務端收到持有消息的消費者nack/reject信號,requeue參數爲true,RabbitMQ會從新將這條消息存入隊列。
  • RabbitMQ服務端收到持有消息的消費者nack/reject信號,requeue參數爲false,若是隊列配置了死信隊列,則消息進入死信隊列,若是沒有配置死信隊列,則消息被RabbitMQ從隊列中刪除。
  • RabbitMQ服務端沒有收到消息持有消費者的確認信號,且消費此消息的消費者沒有斷開鏈接,則服務端會一直等待,沒有超時時間。
  • RabbitMQ服務端沒有收到消息持有消費者的確認信號,且消費此消息的消費者已經斷開鏈接,RabbitMQ會安排該消息從新進入隊列。

消息拒絕可使用Channel類中的basicReject或者basicNack方法,下面咱們來看下他們之間的差別。

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • deliveryTag:64位的長整型值,做爲消息的編號。
  • requeue:是否重入隊列配置項。
  • multiple:是否批量處理未被當前消費者確認的消息。

basicReject一次只能拒絕一條消息。

basicNack當multiple爲false時一次拒絕一條編號爲deliveryTag消息,效果和basicReject同樣。當multiple爲true時表示拒絕deliveryTag編號以前全部未被當前消費者確認的消息。

咱們來看一個代碼示例:

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
			 try{
				//消息處理業務邏輯處理
				channel.basicAck(deliveryTag, false);
			 }catch(Exception e){
                 //處理失敗處理邏輯
				channel.basicReject(deliveryTag, false);
			 }
         }
     });

經過手動確認模式,RabbitMQ只有在收到持有消息的Consumer的應答信號時,纔會刪除掉消息,保證消息不因Consumer應用異常而致使消息丟失的問題發生。

看了消費端保證消息不丟失的方案,有小夥伴會有疑問,假如RabbitMQ已經把消息投遞給了Consumer,Consumer正常的處理了消息,可是因爲網絡抖動等緣由,RabbitMQ沒有收到Consumer的ack消息,且認爲Consumer已經斷開鏈接,那麼RabbitMQ會從新將消息放入隊列,並投遞給消費者。這樣會致使某些消息重複投遞給Consumer的問題產生。

在此種方案下RabbitMQ確實有可能產生重複消息的問題,咱們將在接下來的文章中去處理這個問題。

該方案只保證消息至少一次投遞(At least Once)

死信隊列

DLX,全名Dead-Letter-Exchange,死信交換器。當一個消息變爲死信(dead message)後,可以被從新DLX上,綁定DLX的隊列就是死信隊列。

消息變成私信有如下幾種可能

  • 消息被拒絕(basicNack/basicReject),而且設置requeue參數爲false;
  • 消息過時。
  • 隊列超過最大長度。

下面經過一個簡化的代碼示例來演示下死信隊列的使用。詳細說明見註釋

//聲明交換器
channe1.exchangeDeclare("exchange.dlx","direct ",true);
channe1.exchangeDeclare( "exchange.normal "," fanout ",true);
Map<String , Object> args = new HashMap<String, Object>( );
//設置消息超時時間
args.put("x-message-ttl " , 10000);
//經過x-dead-letter-exchange參數來執行DLX
args.put( "x-dead-letter-exchange ","exchange.dlx");
//爲DLX指定路由鍵
args.put( "x-dead-letter-routing-key"," routingkey");
channe1.queueDec1are( "queue.norma1 ",true,fa1se,fa1se,args);
channe1.queueBind( "queue.normal ","exchange .normal", "");
channe1.queueDec1are( "queue.d1x ", true , false , false , null) ;
channe1.queueBind( "queue.dlx","exchange.dlx ", routingkey");
channe1.basicPublish( "exchange.normal" , "rk" ,
MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx".getBytes()) ;

消息流程見下圖

1622719517(1).jpg

對於RabbitMQ來講,經過分析死信隊列中的消息,能夠用於改善和優化系統。

總結:消息丟失可能發生在生產端、服務端、消費端。對於重要業務咱們能夠經過上面介紹的方式來確保消息不丟失。你們也能夠留言討論下,在使用RabbitMQ過程當中遇到過哪些坑。

參考文檔

  1. RabbitMQ實戰指南
  2. https://www.rabbitmq.com/reliability.html#what-can-fail
相關文章
相關標籤/搜索