RabbitMQ使用教程(四)如何經過持久化保證消息99.99%不丟失?

1. 前情回顧

RabbitMQ使用教程(一)RabbitMQ環境安裝配置及Hello World示例html

RabbitMQ使用教程(二)RabbitMQ用戶管理,角色管理及權限設置java

RabbitMQ使用教程(三)如何保證消息99.99%被髮送成功?git

在上一篇博客中,咱們講解了如何經過RabbitMQ的生產者確認機制,保證消息儘量的成功的發送到RabbitMQ服務器,這只是從源頭下降了消息丟失的概率,並無真正解決以前提到的問題:如何保證RabbitMQ異常狀況(人爲重啓、異常宕機等)下,隊列和消息不丟失?github

2. 本篇概要

要解決該問題,就要用到RabbitMQ中持久化的概念,所謂持久化,就是RabbitMQ會將內存中的數據(Exchange 交換器,Queue 隊列,Message 消息)固化到磁盤,以防異常狀況發生時,數據丟失。spring

其中,RabblitMQ的持久化分爲三個部分:springboot

  1. 交換器(Exchange)的持久化
  2. 隊列(Queue)的持久化
  3. 消息(Message)的持久化

3. 交換器(Exchange)的持久化

在上篇博客中,咱們聲明Exchange的代碼是這樣的:服務器

private final static String EXCHANGE_NAME = "normal-confirm-exchange";

// 建立一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

這種狀況下聲明的Exchange是非持久化的,在RabbitMQ出現異常狀況(重啓,宕機)時,該Exchange會丟失,會影響後續的消息寫入該Exchange,那麼如何設置Exchange爲持久化的呢?答案是設置durable參數。app

durable:設置是否持久化。durable設置爲true表示持久化,反之是非持久化。性能

持久化能夠將交換器存盤,在服務器重啓的時候不會丟失相關信息。ui

設置Exchange持久化:

channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

此時調用的重載方法爲:

public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException {
    return this.exchangeDeclare(exchange, (String)type, durable, false, (Map)null);
}

爲了能更好的理解,咱們新建個生產類以下:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立鏈接
        ConnectionFactory factory = new ConnectionFactory();
        // 設置 RabbitMQ 的主機名
        factory.setHost("localhost");
        // 建立一個鏈接
        Connection connection = factory.newConnection();
        // 建立一個通道
        Channel channel = connection.createChannel();
        // 建立一個Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 發送消息
        String message = "durable exchange test";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

        // 關閉頻道和鏈接
        channel.close();
        connection.close();
    }
}

示例代碼中,咱們新建了1個非持久化的Exchange,1個非持久化的Queue,並將它們作了綁定,此時運行代碼,Exchange和Queue新建成功,消息‘durable exchange test’也被正確地投遞到了隊列中:

此時重啓下RabbitMQ服務,會發現Exchange丟失了:

修改下代碼,將durable參數設置爲ture:

// 建立一個Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);

此時運行完代碼,而後重啓下RabbitMQ服務,會發現Exchange再也不丟失:

4. 隊列(Queue)的持久化

細心的網友可能會發現,雖然如今重啓RabbitMQ服務後,Exchange不丟失了,可是隊列和消息丟失了,那麼如何解決隊列不丟失呢?答案也是設置durable參數。

durable:設置是否持久化。爲true則設置隊列爲持久化。

持久化的隊列會存盤,在服務器重啓的時候能夠保證不丟失相關信息。

簡單修改下上面聲明Queue的代碼,將durable參數設置爲true:

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

此時調用的重載方法以下:

public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
    validateQueueNameLength(queue);
    return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod();
}

運行代碼,而後重啓RabbitMQ服務,會發現隊列如今不丟失了:

5. 消息(Message)的持久化

雖然如今RabbitMQ重啓後,Exchange和Queue都不丟失了,可是存儲在Queue裏的消息卻仍然會丟失,那麼如何保證消息不丟失呢?答案是設置消息的投遞模式爲2,即表明持久化。

修改發送消息的代碼爲:

// 發送消息
String message = "durable exchange test";
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

調用的重載方法爲:

public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException {
    this.basicPublish(exchange, routingKey, false, props, body);
}

運行代碼,而後重啓RabbitMQ服務,發現此時Exchange,Queue,消息都不丟失了:

至此,咱們完美的解決了RabbitMQ重啓後,消息丟失的問題。

最終的代碼以下,你也能夠經過文末的源碼連接下載本文用到的全部源碼:

package com.zwwhnly.springbootaction.rabbitmq.durable;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DurableProducer {
    private final static String EXCHANGE_NAME = "durable-exchange";
    private final static String QUEUE_NAME = "durable-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 建立鏈接
        ConnectionFactory factory = new ConnectionFactory();
        // 設置 RabbitMQ 的主機名
        factory.setHost("localhost");
        // 建立一個鏈接
        Connection connection = factory.newConnection();
        // 建立一個通道
        Channel channel = connection.createChannel();
        // 建立一個Exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 發送消息
        String message = "durable exchange test";
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build();
        channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());

        // 關閉頻道和鏈接
        channel.close();
        connection.close();
    }
}

6. 注意事項

1)理論上能夠將全部的消息都設置爲持久化,可是這樣會嚴重影響RabbitMQ的性能。由於寫入磁盤的速度比寫入內存的速度慢得不止一點點。對於可靠性不是那麼高的消息能夠不採用持久化處理以提升總體的吞吐量。在選擇是否要將消息持久化時,須要在可靠性和吞吐量之間作一個權衡。

2)將交換器、隊列、消息都設置了持久化以後仍然不能百分之百保證數據不丟失,由於當持久化的消息正確存入RabbitMQ以後,還須要一段時間(雖然很短,可是不可忽視)才能存入磁盤之中。若是在這段時間內RabbitMQ服務節點發生了宕機、重啓等異常狀況,消息還沒來得及落盤,那麼這些消息將會丟失。

3)單單隻設置隊列持久化,重啓以後消息會丟失;單單隻設置消息的持久化,重啓以後隊列消失,繼而消息也丟失。單單設置消息持久化而不設置隊列的持久化顯得毫無心義。

7. 源碼

源碼地址:https://github.com/zwwhnly/springboot-action.git,歡迎下載。

8. 參考

《RabbitMQ實戰指南》

相關文章
相關標籤/搜索