分佈式架構學習:RabbitMQ可靠性投遞與生產實踐

本章重點:

可靠性投遞
1.確保消息發送到RabbitMQ服務器
2.確保消息被正確的路由
3.確保消息在隊列正確地存儲
4.確保消息從隊列正確地投遞到消費者
5.消費者回調
6.補償機制
7.消息冪等性
8.消息的順序性
複製代碼

可靠性投遞

首先須要明確,效率和可靠性是沒法兼得的,若是要保證每個環節都成功,勢必會對消息的收發效率形成影響,如過是一些業務實時性要求不是特別高的場合,能夠犧牲可靠性來換取效率。bash

  1. ①表明消息從生產者發送到Exchange
  2. ②表明消息從Exchange路由到Queue
  3. ③ 表明消息在Queue中存儲;
  4. ④ 表明消費者訂閱Queue並消費消息。5.

1.確保消息發送到RabbitMQ服務器

可能由於網絡或者Broker的問題致使①失敗,而生產者是沒法得知消息是否正確發送到Broker的。服務器

有兩種解決方案:網絡

  • 第一種是Transaction事務模式
  • 第二種是Confirm確認模式

1.在經過channel.txSelect方法開啓事務以後,咱們即可以發佈消息給RabbitMQ了,若是事務提交成功,則消息必定 到達了RabbitMQ中,若是在事務提交執行以前因爲RabbitMQ異常崩潰或者其餘緣由拋出異常,這個時候咱們即可以將其捕獲,進而經過執行channel.txRollback方法來實現事務回滾。使用事務機制的話會「吸乾」RabbitMQ的性 能,通常不建議使用。ui

2.生產者經過調用channel.confirmSelect方法(即Confirm.Select命令)將信道設置爲confirm模式。一旦消息被投遞到全部匹配的隊列以後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的惟一ID),這就使得生產者知曉消息已經正確到達了目的地了。spa

2.確保消息被正確的路由

可能由於路由關鍵字錯誤,或者隊列不存在,或者隊列名稱錯誤致使②失敗。日誌

  1. 使用mandatory參數和ReturnListener,能夠實現消息沒法路由的時候返回給生產者。
  2. 另外一種方式就是使用備份交換機(alternate-exchange),沒法路由的消息會發送到這個交換機上。
Map<String,Object> arguments = new HashMap<String,Object>();
// 指定交換機的備份交換機
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
複製代碼

3.確保消息在隊列正確地存儲

可能由於系統宕機、重啓、關閉等等狀況致使存儲在隊列的消息丟失,即③出現問題。code

解決方案:

1.隊列持久化cdn

// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
複製代碼

2.交換機持久化blog

// String exchange, boolean durable
channel.exchangeDeclare("MY_EXCHANGE","true");
複製代碼

3.消息持久化隊列

AMQP.BasicProperties properties = new AMQP.BasicProperties
.Builder()
// 2表明持久化,其餘表明瞬態
.deliveryMode(2)
.build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
複製代碼

4.確保消息從隊列正確地投遞到消費者

若是消費者收到消息後將來得及處理即發生異常,或者處理過程當中發生異常,會致使④失敗。 爲了保證消息從隊列可靠性到達消費者,RabbitMQ提供了消息確認機制(message acknowledgement),消費者在訂閱隊列時,能夠指定autoAck參數,當autoAck等於false時,RabbitMQ會等待消費者顯示地回覆確認消息才從隊列中刪除該消息。

若是消息消費失敗,也能夠調用Basic.Reject或者BasicNack來拒絕當前消息而不是確認,若是requere參數爲true,能夠把這條消息從新存入隊列,以便發送給下一個消費者。

5.消費者回調

消費者處理消息以後,能夠再發送一條消息給生產者,或者調用生產者地API,告知消息處理完畢。

6.補償機制

對於必定時間沒有響應地消息,能夠設置一個定時重發地機制,可是要控制次數,好比最多重複三次,不然會形成消息堆積。

7.消息冪等性

服務端是沒有這種控制的,只能在消費端控制。

如何避免消息的重複消費?

消息重複消費可能會有兩個緣由:

  1. 生產者的問題。環節①重複發送消息,好比在開啓Confirm模式但未收到確認
  2. 環節④出了問題,因爲消費者未發送ACK或者其它緣由,消息重複投遞

對於重複發送的消息,能夠對每一條消息生成一個惟一的業務id,經過日誌或者建表來作重複控制。

8.消息的順序性

消息的順序性是指消費者消費消息的順序跟生產者投遞消息的順序是一致的。

在RabbitMQ中,一個隊列有多個消費者時,因爲不一樣的消費者消費消息的速度是不同的,順序沒法保證

相關文章
相關標籤/搜索