RabbitMQ 可靠投遞

RabbitMQ 可靠投遞

標籤: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投遞node


  • 背景
  • confirmCallback 確認模式
  • returnCallback 未投遞到 queue 退回模式
  • shovel-plugin 跨機房可靠投遞

背景

在使用 RabbitMQ 的時候,做爲消息發送方但願杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 爲咱們提供了兩個選項用來控制消息的投遞可靠性模式。服務器

rabbitmq 整個消息投遞的路徑爲:
producer->rabbitmq broker cluster->exchange->queue->consumer網絡

messageproducerrabbitmq broker cluster 則會返回一個 confirmCallback
messageexchange->queue 投遞失敗則會返回一個 returnCallback 。咱們將利用這兩個 callback 控制消息的最終一致性和部分糾錯能力。架構

confirmCallback 確認模式

在建立 connectionFactory 的時候設置 PublisherConfirms(true) 選項,開啓 confirmcallbacktcp

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//開啓confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息發送失敗!" + cause + data.toString());
        } else {
            log.info("消息發送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

咱們來看下 ConfirmCallback 接口。插件

public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(CorrelationData correlationData, boolean ack, String cause);

    }

重點是 CorrelationData 對象,每一個發送的消息都須要配備一個 CorrelationData 相關數據對象,CorrelationData 對象內部只有一個 id 屬性,用來表示當前消息惟一性。code

發送的時候建立一個 CorrelationData 對象。orm

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

這裏將 user ID 設置爲當前消息 CorrelationData id 。固然這裏是純粹 demo,真實場景是須要作業務無關消息 ID 生成,同時要記錄下這個 id 用來糾錯和對帳。server

消息只要被 rabbitmq broker 接收到就會執行 confirmCallback,若是是 cluster 模式,須要全部 broker 接收到纔會調用 confirmCallback對象

broker 接收到只能表示 message 已經到達服務器,並不能保證消息必定會被投遞到目標 queue 裏。因此須要用到接下來的 returnCallback

returnCallback 未投遞到queue退回模式

confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 裏。在有些業務場景下,咱們須要保證消息必定要投遞到目標 queue 裏,此時就須要用到 return 退回模式。

一樣建立 ConnectionFactory 到時候須要設置 PublisherReturns(true) 選項。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//開啓return模式
rabbitTemplate.setMandatory(true);//開啓強制委託模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息發送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

這樣若是未能投遞到目標 queue 裏將調用 returnCallback ,能夠記錄下詳細到投遞數據,按期的巡檢或者自動糾錯都須要這些數據。

shovel-plugin 跨機房可靠投遞

RabbitMQ 在跨機房集成提供了一個不錯的插件 shovel 。使用 shovel-plugin 插件很是方便,shovel 能夠接受機房之間的網絡斷開、機器下線等不穩定因素。

這裏有兩個 broker

10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2

咱們但願將發送給 rabbit_node1 plen.queue 的消息傳輸到 rabbit_node2 plen.queue 中。咱們先開啓 rabbit_node1shovel-plugin

先看下當前 RabbitMQ 版本是否安裝了 shovel-plugin,若是有的話直接開啓。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

而後就能夠在 Admin 面板裏看到這個設置選項,怎麼設置這裏就不介紹了。主要就是配置下 amqp 協議地址,amqp://user:password@server-name/my-vhost

若是配置沒有問題的話,應該是這樣的一個狀態,說明已經順利鏈接到 rabbit_node2 broker


咱們來看下 rabbit_node1rabbit_node2Connections 面板。
rabbit_node1(10.211.55.3):

rabbit_node2(10.211.55.4):

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 建立了兩個 tcp 鏈接,端口 39544 鏈接是用來消費 plen.queue 裏的消息,端口 55706 鏈接是用來推送消息給 rabbit_node2

咱們來看下 rabbit_node1 tcp 鏈接狀態:

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

rabbit_node2 tcp 鏈接狀態:

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

爲了驗證 shovel-plugin 穩定性,咱們將 rabbit_node2 下線。

而後再發送消息,發現消息會如今 rabbit_node1 plen.queue 裏待着,一旦 shovel-plugin 鏈接恢復將消費 rabbit_node1 plen.queue 消息,而後投遞給 rabbit_node2 plen.queue

做者:王清培 (滬江集團資深JAVA架構師)

相關文章
相關標籤/搜索