默認狀況下,exchange、queue、message 等數據都是存儲在內存中的,這意味着若是 RabbitMQ 重啓、關閉、宕機時全部的信息都將丟失。bash
RabbitMQ 提供了持久化來解決這個問題,持久化後,若是 RabbitMQ 發送 重啓、關閉、宕機,下次起到時 RabbitMQ 會從硬盤中恢復exchange、queue、message 等數據。服務器
RabbitMQ 持久化包含3個部分post
queue 的持久化能保證自己的元數據不會因異常而丟失,可是不能保證內部的 message 不會丟失。要確保 message 不丟失,還須要將 message 也持久化性能
若是 exchange 和 queue 都是持久化的,那麼它們之間的 binding 也是持久化的。ui
若是 exchange 和 queue 二者之間有一個持久化,一個非持久化,就不容許創建綁定。spa
注意:一旦肯定了 exchange 和 queue 的 durable,就不能修改了。若是非要修改,惟一的辦法就是刪除原來的 exchange 或 queue 後,重現建立 code
若是將全部的消息都進行持久化操做,這樣會嚴重影響 RabbitMQ 的性能。寫入磁盤的速度可比寫入內存的速度要慢不少。因此須要在可靠性和吞吐量之間作權衡。隊列
將 exchange、queue 和 message 都進行持久化操做後,也不能保證消息必定不會丟失,消息存入RabbitMQ 以後,還須要一段時間才能存入硬盤。RabbitMQ 並不會爲每條消息都進行同步存盤,若是在這段時間,服務器宕機或者重啓,消息還沒來得及保存到磁盤當中,就會丟失。對於這種狀況,能夠引入 RabiitMQ 鏡像隊列機制。內存
經過代碼實現 RabbitMQ 持久化路由
原生的 RabbitMQ 客戶端須要完成三個步驟。
第一步,設置交換器的持久化
// 三個參數分別爲 交換器名、交換器類型、是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
複製代碼
第二步,設置隊列的持久化
// 參數1 queue :隊列名
// 參數2 durable :是否持久化
// 參數3 exclusive :僅建立者可使用的私有隊列,斷開後自動刪除
// 參數4 autoDelete : 當全部消費客戶端鏈接斷開後,是否自動刪除隊列
// 參數5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
複製代碼
第三步,設置消息的持久化
// 參數1 exchange :交換器
// 參數2 routingKey : 路由鍵
// 參數3 props : 消息的其餘參數,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
// 參數4 body : 消息體
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
複製代碼
Spring RabbitMQ 是對原生的 RabbitMQ 客戶端的封裝。通常狀況下,咱們只須要定義 exchange 和 queue 的持久化。
配置交換機持久化
/**
* value 交換機名稱
* type 交換機類型,默認 direct
* durable 是否持久化,默認 true
* autoDelete 是否自動刪除,默認 false
* internal 是否爲內部交換機,默認爲 false
*/
@Exchange(value = "exchangeName", type = "direct", durable = "true", autoDelete = "false", internal = "false")
複製代碼
配置隊列持久化
/**
* value 隊列名稱
* durable 是否持久化
* exclusive 否爲獨佔隊列
* autoDelete 是否自動刪除
*/
@Queue(value = "queryName", durable = "true", exclusive = "false", autoDelete = "false")
複製代碼
一個用例
/**
* 消費消息
*/
@Component
public class ConsumerMessageListener {
/**
* 監聽指定隊列
*
* @param message 消息體
* @param headers 消息頭
* @param channel 通道
* @return
* @RabbitListener 指定了 exchange 、key、Queue 後,若是 Rabbitmq 沒有會去建立
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "exchangeName", type = "direct", durable = "true", autoDelete = "false", internal = "false"),
key = "routingKeyValue",
value = @Queue(value = "queryName", durable = "true", exclusive = "false", autoDelete = "false")
))
public void listenerMessage(String message, @Headers Map<String, Object> headers, Channel channel)
throws IOException {
System.out.println(message);
System.out.println(headers);
//手動 ack
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
}
}
複製代碼
正常思路想 exchange 和 queue 的持久化應該在消息發送端配置,其實也能夠配置在消息消費端,RabbitListener 回去檢查 exchange 和 queue,若是不存在則建立