RabbitMQ 持久化

默認狀況下,exchange、queue、message 等數據都是存儲在內存中的,這意味着若是 RabbitMQ 重啓、關閉、宕機時全部的信息都將丟失。bash

RabbitMQ 提供了持久化來解決這個問題,持久化後,若是 RabbitMQ 發送 重啓、關閉、宕機,下次起到時 RabbitMQ 會從硬盤中恢復exchange、queue、message 等數據。服務器

持久化

RabbitMQ 持久化包含3個部分post

  • exchange 持久化,在聲明時指定 durable 爲 true
  • queue 持久化,在聲明時指定 durable 爲 true
  • message 持久化,在投遞時指定 delivery_mode=2(1是非持久化)

queue 的持久化能保證自己的元數據不會因異常而丟失,可是不能保證內部的 message 不會丟失。要確保 message 不丟失,還須要將 message 也持久化性能

若是 exchange 和 queue 都是持久化的,那麼它們之間的 binding 也是持久化的。ui

若是 exchange 和 queue 二者之間有一個持久化,一個非持久化,就不容許創建綁定。spa

注意:一旦肯定了 exchange 和 queue 的 durable,就不能修改了。若是非要修改,惟一的辦法就是刪除原來的 exchange 或 queue 後,重現建立 code

拓展

若是將全部的消息都進行持久化操做,這樣會嚴重影響 RabbitMQ 的性能。寫入磁盤的速度可比寫入內存的速度要慢不少。因此須要在可靠性和吞吐量之間作權衡。隊列

將 exchange、queue 和 message 都進行持久化操做後,也不能保證消息必定不會丟失,消息存入RabbitMQ 以後,還須要一段時間才能存入硬盤。RabbitMQ 並不會爲每條消息都進行同步存盤,若是在這段時間,服務器宕機或者重啓,消息還沒來得及保存到磁盤當中,就會丟失。對於這種狀況,能夠引入 RabiitMQ 鏡像隊列機制。內存

代碼實現

經過代碼實現 RabbitMQ 持久化路由

原生的實現方式

原生的 RabbitMQ 客戶端須要完成三個步驟。

第一步,設置交換器的持久化

// 三個參數分別爲 交換器名、交換器類型、是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);  
複製代碼

第二步,設置隊列的持久化

// 參數1 queue :隊列名  
// 參數2 durable :是否持久化  
// 參數3 exclusive :僅建立者可使用的私有隊列,斷開後自動刪除  
// 參數4 autoDelete : 當全部消費客戶端鏈接斷開後,是否自動刪除隊列  
// 參數5 arguments  
channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
複製代碼

第三步,設置消息的持久化

// 參數1 exchange :交換器  
// 參數2 routingKey : 路由鍵  
// 參數3 props : 消息的其餘參數,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化  
// 參數4 body : 消息體  
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  
複製代碼

Spring RabbitMQ 的實現方式

Spring RabbitMQ 是對原生的 RabbitMQ 客戶端的封裝。通常狀況下,咱們只須要定義 exchange 和 queue 的持久化。

配置交換機持久化

/**
 * value      交換機名稱
 * type       交換機類型,默認 direct
 * durable    是否持久化,默認 true
 * autoDelete 是否自動刪除,默認 false
 * internal   是否爲內部交換機,默認爲 false
 */
@Exchange(value = "exchangeName", type = "direct", durable = "true", autoDelete = "false", internal = "false")
複製代碼

配置隊列持久化

/**
 * value      隊列名稱
 * durable    是否持久化
 * exclusive  否爲獨佔隊列
 * autoDelete 是否自動刪除
 */
@Queue(value = "queryName", durable = "true", exclusive = "false", autoDelete = "false")
複製代碼

一個用例

/**
 * 消費消息
 */
@Component
public class ConsumerMessageListener {
    /**
     * 監聽指定隊列
     *
     * @param message 消息體
     * @param headers 消息頭
     * @param channel 通道
     * @return
     * @RabbitListener 指定了 exchange 、key、Queue 後,若是 Rabbitmq 沒有會去建立
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "exchangeName", type = "direct", durable = "true", autoDelete = "false", internal = "false"),
            key = "routingKeyValue",
            value = @Queue(value = "queryName", durable = "true", exclusive = "false", autoDelete = "false")
    ))
    public void listenerMessage(String message, @Headers Map<String, Object> headers, Channel channel)
            throws IOException {
        System.out.println(message);
        System.out.println(headers);
        //手動 ack
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
    }
}
複製代碼

正常思路想 exchange 和 queue 的持久化應該在消息發送端配置,其實也能夠配置在消息消費端,RabbitListener 回去檢查 exchange 和 queue,若是不存在則建立

相關文章

RabbitMQ 之消息的可靠性投遞

RabbitMQ 之消息的可靠性消費

相關文章
相關標籤/搜索