標籤: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投遞node
在使用 RabbitMQ 的時候,做爲消息發送方但願杜絕任何消息丟失或者投遞失敗場景。RabbitMQ 爲咱們提供了兩個選項用來控制消息的投遞可靠性模式。服務器
rabbitmq 整個消息投遞的路徑爲:
producer->rabbitmq broker cluster->exchange->queue->consumer網絡
message 從 producer 到 rabbitmq broker cluster 則會返回一個 confirmCallback 。
message 從 exchange->queue 投遞失敗則會返回一個 returnCallback 。咱們將利用這兩個 callback 控制消息的最終一致性和部分糾錯能力。架構
在建立 connectionFactory 的時候設置 PublisherConfirms(true) 選項,開啓 confirmcallback 。tcp
CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setPublisherConfirms(true);//開啓confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); rabbitTemplate.setConfirmCallback((data, ack, cause) -> { if (!ack) { log.error("消息發送失敗!" + cause + data.toString()); } else { log.info("消息發送成功,消息ID:" + (data != null ? data.getId() : null)); } });
咱們來看下 ConfirmCallback 接口。插件
public interface ConfirmCallback { /** * Confirmation callback. * @param correlationData correlation data for the callback. * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ void confirm(CorrelationData correlationData, boolean ack, String cause); }
重點是 CorrelationData 對象,每一個發送的消息都須要配備一個 CorrelationData 相關數據對象,CorrelationData 對象內部只有一個 id 屬性,用來表示當前消息惟一性。code
發送的時候建立一個 CorrelationData 對象。orm
User user = new User(); user.setID(1010101L); user.setUserName("plen"); rabbitTemplate.convertAndSend(exchange, routing, user, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; }, new CorrelationData(user.getID().toString()));
這裏將 user ID 設置爲當前消息 CorrelationData id 。固然這裏是純粹 demo,真實場景是須要作業務無關消息 ID 生成,同時要記錄下這個 id 用來糾錯和對帳。server
消息只要被 rabbitmq broker 接收到就會執行 confirmCallback,若是是 cluster 模式,須要全部 broker 接收到纔會調用 confirmCallback。對象
被 broker 接收到只能表示 message 已經到達服務器,並不能保證消息必定會被投遞到目標 queue 裏。因此須要用到接下來的 returnCallback 。
confrim 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 裏。在有些業務場景下,咱們須要保證消息必定要投遞到目標 queue 裏,此時就須要用到 return 退回模式。
一樣建立 ConnectionFactory 到時候須要設置 PublisherReturns(true) 選項。
CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setPublisherReturns(true);//開啓return模式
rabbitTemplate.setMandatory(true);//開啓強制委託模式 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info(MessageFormat.format("消息發送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));
這樣若是未能投遞到目標 queue 裏將調用 returnCallback ,能夠記錄下詳細到投遞數據,按期的巡檢或者自動糾錯都須要這些數據。
RabbitMQ 在跨機房集成提供了一個不錯的插件 shovel 。使用 shovel-plugin 插件很是方便,shovel 能夠接受機房之間的網絡斷開、機器下線等不穩定因素。
這裏有兩個 broker :
10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2
咱們但願將發送給 rabbit_node1 plen.queue 的消息傳輸到 rabbit_node2 plen.queue 中。咱們先開啓 rabbit_node1 的 shovel-plugin。
先看下當前 RabbitMQ 版本是否安裝了 shovel-plugin,若是有的話直接開啓。
rabbitmq-plugins list rabbitmq-plugins enable rabbitmq_shovel rabbitmq-plugins enable rabbitmq_shovel_management
而後就能夠在 Admin 面板裏看到這個設置選項,怎麼設置這裏就不介紹了。主要就是配置下 amqp 協議地址,amqp://user:password@server-name/my-vhost 。
若是配置沒有問題的話,應該是這樣的一個狀態,說明已經順利鏈接到 rabbit_node2 broker 。
咱們來看下 rabbit_node1 和 rabbit_node2 的 Connections 面板。
rabbit_node1(10.211.55.3):
rabbit_node2(10.211.55.4):
RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 建立了兩個 tcp 鏈接,端口 39544 鏈接是用來消費 plen.queue 裏的消息,端口 55706 鏈接是用來推送消息給 rabbit_node2 。
咱們來看下 rabbit_node1 tcp 鏈接狀態:
tcp6 0 0 10.211.55.3:5672 10.211.55.3:39544 ESTABLISHED tcp 0 0 10.211.55.3:55706 10.211.55.4:5672 ESTABLISHED
rabbit_node2 tcp 鏈接狀態:
tcp6 0 0 10.211.55.4:5672 10.211.55.3:55706 ESTABLISHED
爲了驗證 shovel-plugin 穩定性,咱們將 rabbit_node2 下線。
而後再發送消息,發現消息會如今 rabbit_node1 plen.queue 裏待着,一旦 shovel-plugin 鏈接恢復將消費 rabbit_node1 plen.queue 消息,而後投遞給 rabbit_node2 plen.queue 。
做者:王清培 (滬江集團資深JAVA架構師)