做者:threedayman 恆生LIGHT雲社區html
接着上一講 消息中間件之RabbitMQ初識,這筆咱們來說講RabbitMQ中消息丟失的問題。已經怎樣在覈心業務中避免消息丟失。java
血淚故事:商品購物流程中的發貨環節引入了RabbitMQ,某天因爲網絡抖動致使了生產者的消息沒有發送到RabbitMQ中,因爲沒有作消息的可靠性傳輸保證,消息丟失,致使一批客戶遲遲沒收到貨物而引起投訴,給公司形成了不小的損失。網絡
爲了不上述悲劇重演,咱們來了解下在RabbitMQ中咱們須要怎樣保證消息不丟失。異步
消息丟失會發生在何時
消息的傳輸過程大體以下圖ide
消息丟失可能發生在優化
- Producer端 發送到RabbitMQ中因爲網絡異常或者服務異常致使消息發送失敗。
- RabbitMQ服務端 異常或者重啓致使消息丟失。
- Consumer端 接收到消息後,消息處理失敗,消息丟失。
固然上一講中有提到在RabbitMQ,生產者發送消息是和Exchange交互,Exchange根據路由規則投遞到具體的Queue中,若是路由規則設置有問題,也會致使消息丟失,但此條不在本文討論重點。url
Producer 消息可靠性保證
爲了不因爲網絡抖動或者RabbitMQ服務端異常致使消息發送失敗的問題。能夠在Producer發送消息的使用引入了一個確認機制(ack),服務端接收到消息以後,會返回給Producer一個成功或者失敗的確認消息。.net
RabbitMQ提供了兩種解決方式:3d
- 事務機制
- 發送方確認機制
事務方式,主要方法有如下幾個code
- channel.txSelect() 將當前的channel設置成事務模式。
- channel.txCommit()用於提交事務。
- channel.txRollback()用於事務回滾
下面代碼是簡單示例
try { channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); channel.txCommit(); } catch (Exception e) { e.printStackTrace(); channel.txRollback(); //發送失敗後續處理,重發或者持久化異常消息稍後重試 }
信號的流轉過程以下圖
圖片來源 RabbitMQ實戰指南
若是事務可以提交成功,則消息必定到達了RabbitMQ中。
圖片來源 RabbitMQ實戰指南
事務機制可以解決消息生產者和RabbitMQ之間消息 確認的問題,只有消息成功被RabbitMQ接收,事務才能提交成功。但事務機制是同步阻塞進行的,回大大下降RabbitMQ的吞吐量,RabbitMQ提供了一種改進方案,即發送方確認機制。
發送方確認機制:
- channel.confirmSelect(); 將通道設置確認機制
- channel.addConfirmListener() 爲通道添加ConfirmListener這個回調接口。
- com.rabbitmq.client.ConfirmListener#handleAck 回調處理正常被RabbitMQ接收的消息。
- com.rabbitmq.client.ConfirmListener#handleNack回調處理沒有被RabbitMQ正常接收的消息。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } //這裏須要添加消息發送失敗處理的代碼,從新發送或者持久化後補償。 } }); //模擬一直髮送消息的場景 while (true) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); confirmSet.add(nextSeqNo); }
上面例子演示了異步confirm的形式,在保證生產者消息被RabbitMQ正常接收,又沒有同步阻塞致使明顯下降RabbitMQ吞吐量的問題。
RabbitMQ端
爲避免RabbitMQ服務異常或者重啓致使的消息丟失,須要對作持久化操做,將相關信息保存到磁盤上。要保證消息不丟失須要持久化主隊列、持久化。exchange不持久化,在RabbitMQ服務重啓後,相關的exchange元數據會丟失,不過消息不丟失,但消息不能發送到這個exchange中了。
- 隊列持久化須要在聲明隊列的時候將durable參數設置爲true。(由於消息是存在與隊列中,若是隊列不持久化,那RabbitMQ重啓後,消息將丟失)
- 消息持久化經過將投遞模式設置成2(BasicProperties中的deliveryMode)。
channel.queueDeclare(QUEUE_NAME,true,//durable false,false,null); channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,//具體屬性見下面 message.getBytes(StandardCharsets.UTF_8));
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, //deliveryMode 0, null, null, null, null, null, null, null, null, null);
Consumer端
爲保證Consumer端不因消費處理異常或消費者應用重啓致使消息丟失。咱們須要以下操做
- 關閉默認的自動確認。設置爲手動確認模式。
手動確認模式:RabbitMQ會等待消費者回復確認信號後才從刪除消息。
自動確認模式(默認):RabbitMQ會自動把發出去的消息置爲確認,而後刪除,無論消費者有沒有真正消費到這些消息。
當設置爲手動確認模式,對於RabbitMQ服務端而言隊列中的消息分爲了兩種
- Ready:等待投遞給消費者的消息。
- Unacked:已經投遞給消費者,但尚未收到消費者確認新號的消息。
對於Unacked消息,會出現下面幾種狀況:
- RabbitMQ收到持有消息的消費者的ack信號,RabbitMQ服務端將會刪除該消息。
- RabbitMQ服務端收到持有消息的消費者nack/reject信號,requeue參數爲true,RabbitMQ會從新將這條消息存入隊列。
- RabbitMQ服務端收到持有消息的消費者nack/reject信號,requeue參數爲false,若是隊列配置了死信隊列,則消息進入死信隊列,若是沒有配置死信隊列,則消息被RabbitMQ從隊列中刪除。
- RabbitMQ服務端沒有收到消息持有消費者的確認信號,且消費此消息的消費者沒有斷開鏈接,則服務端會一直等待,沒有超時時間。
- RabbitMQ服務端沒有收到消息持有消費者的確認信號,且消費此消息的消費者已經斷開鏈接,RabbitMQ會安排該消息從新進入隊列。
消息拒絕可使用Channel類中的basicReject或者basicNack方法,下面咱們來看下他們之間的差別。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; void basicReject(long deliveryTag, boolean requeue) throws IOException;
- deliveryTag:64位的長整型值,做爲消息的編號。
- requeue:是否重入隊列配置項。
- multiple:是否批量處理未被當前消費者確認的消息。
basicReject一次只能拒絕一條消息。
basicNack當multiple爲false時一次拒絕一條編號爲deliveryTag消息,效果和basicReject同樣。當multiple爲true時表示拒絕deliveryTag編號以前全部未被當前消費者確認的消息。
咱們來看一個代碼示例:
boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); try{ //消息處理業務邏輯處理 channel.basicAck(deliveryTag, false); }catch(Exception e){ //處理失敗處理邏輯 channel.basicReject(deliveryTag, false); } } });
經過手動確認模式,RabbitMQ只有在收到持有消息的Consumer的應答信號時,纔會刪除掉消息,保證消息不因Consumer應用異常而致使消息丟失的問題發生。
看了消費端保證消息不丟失的方案,有小夥伴會有疑問,假如RabbitMQ已經把消息投遞給了Consumer,Consumer正常的處理了消息,可是因爲網絡抖動等緣由,RabbitMQ沒有收到Consumer的ack消息,且認爲Consumer已經斷開鏈接,那麼RabbitMQ會從新將消息放入隊列,並投遞給消費者。這樣會致使某些消息重複投遞給Consumer的問題產生。
在此種方案下RabbitMQ確實有可能產生重複消息的問題,咱們將在接下來的文章中去處理這個問題。
該方案只保證消息至少一次投遞(At least Once)
死信隊列
DLX,全名Dead-Letter-Exchange,死信交換器。當一個消息變爲死信(dead message)後,可以被從新DLX上,綁定DLX的隊列就是死信隊列。
消息變成私信有如下幾種可能
- 消息被拒絕(basicNack/basicReject),而且設置requeue參數爲false;
- 消息過時。
- 隊列超過最大長度。
下面經過一個簡化的代碼示例來演示下死信隊列的使用。詳細說明見註釋
//聲明交換器 channe1.exchangeDeclare("exchange.dlx","direct ",true); channe1.exchangeDeclare( "exchange.normal "," fanout ",true); Map<String , Object> args = new HashMap<String, Object>( ); //設置消息超時時間 args.put("x-message-ttl " , 10000); //經過x-dead-letter-exchange參數來執行DLX args.put( "x-dead-letter-exchange ","exchange.dlx"); //爲DLX指定路由鍵 args.put( "x-dead-letter-routing-key"," routingkey"); channe1.queueDec1are( "queue.norma1 ",true,fa1se,fa1se,args); channe1.queueBind( "queue.normal ","exchange .normal", ""); channe1.queueDec1are( "queue.d1x ", true , false , false , null) ; channe1.queueBind( "queue.dlx","exchange.dlx ", routingkey"); channe1.basicPublish( "exchange.normal" , "rk" , MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx".getBytes()) ;
消息流程見下圖
對於RabbitMQ來講,經過分析死信隊列中的消息,能夠用於改善和優化系統。
總結:消息丟失可能發生在生產端、服務端、消費端。對於重要業務咱們能夠經過上面介紹的方式來確保消息不丟失。你們也能夠留言討論下,在使用RabbitMQ過程當中遇到過哪些坑。
參考文檔