Rabbitmq的可靠消息投遞

1、背景

生產端向rabbitmq發送消息時,因爲網絡等緣由可能致使消息發送失敗。因此,rabbitmq必須有機制確保消息能準確到達mq,若是不能到達,必須反饋給生產端進行重發。redis

RabbitMQ消息的可靠性投遞主要兩種實現:
一、經過實現消費的重試機制,經過@Retryable來實現重試,能夠設置重試次數和重試頻率;
二、生產端實現消息可靠性投遞。spring

兩種方法消費端均可能收到重複消息,要求消費端必須實現冪等性消費。數據庫

2、消息投遞到exchange的確認模式

rabbitmq的消息投遞的過程爲:
producer ——> rabbitmq broker cluster ——> exchange ——> queue ——> consumer緩存

一、生產端發送消息到rabbitmq broker cluster後,異步接受從rabbitmq返回的ack確認信息。安全

二、生產端收到返回的ack確認消息後,根據ack是true仍是false,調用confirmCallback接口進行處理。網絡

 

在application.yml中開啓生產端confirm模式併發

spring: rabbitmq: publisher-confirms: true

 

實現ConfirmCallback接口中的confirm方法,ack爲true表示消息發送成功,ack爲false表示消息發送失敗app

@Component @Slf4j public class RabbitTemplateConfig implements ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this);   // 指定 ConfirmCallback
 } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { //try to resend msg
       } else { //delete msg in db
 } } }

注意:在confirmCallback回調接口中是沒有消息數據的,因此即便消息發送失敗,生產端也沒法在這個回調接口中直接重發,confirmCallback只能起到一個通知的做用。異步

3、消息投遞失敗的重發機制

若是rabbitmq返回ack失敗,生產端也沒法確認消息是否真的發送成功,也會形成數據丟失。最好的辦法是使用rabbitmq的事務機制,可是rabbitmq的事務機制效率極低,每秒鐘處理的消息僅幾百條,不適合併發量大的場景。 ide

 

另一種實現思路:

一、生產端保存每次發送的消息,若是發送成功就刪除消息;

二、若是發送失敗就取出消息從新發送;

三、若是超時尚未收到mq返回的ack,一樣取出消息從新發送。

這樣就能夠避免消息丟失的風險。

 

以使用redis保存消息msg爲例,具體實現方案爲:

一、生產端在發送消息以前,生成ack惟一確認的id;

二、以ackId爲鍵,消息爲value,保存進redis緩存,設置超時時間;

三、redis實現超時觸發接口,當key過時時,重發消息並再次執行第2步;

四、生產端實現ConfirmCallback接口;

五、ConfirmCallback接口觸發時,若ack爲true,則直接刪除這次ackId對應的msg;若ack爲false,則將該ackId對應的msg取出重發;

 

網上另外的實現方案:

不經過設置redis超時時間觸發超時事件進行重發,而是取出消息放入一個ackFailList中,而後開啓定時任務,掃描ackFailList,重發失敗的msg。

網上的這套方案思路上和上一個方案差很少,可是是採用的額外的List來保存發送失敗的消息,因爲List保存在內存中,不具有持久化的功能,因此這樣並不安全,若是生產端程序異常退出將致使消息丟失。能夠考慮保存到數據庫中。

4、消息未投遞到queue的退回模式

生產端經過實現ReturnCallback接口,啓動消息失敗返回,消息路由不到隊列時會觸發該回調接口。

 

在application.yml中開啓return模式

spring: rabbitmq: publisher-returns: true

 

實現ReturnCallback接口,能夠獲取消息主體內容,實現消息重發

@Component @Slf4j public class RabbitTemplateConfig implements ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this);   //指定 ReturnCallback
 } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主體 message : {}", message); log.info("消息主體 message : {}", replyCode); log.info("描述:{}", replyText); log.info("消息使用的交換器 exchange : {}", exchange); log.info("消息使用的路由鍵 routing : {}", routingKey); } }
相關文章
相關標籤/搜索