生產端向rabbitmq發送消息時,因爲網絡等緣由可能致使消息發送失敗。因此,rabbitmq必須有機制確保消息能準確到達mq,若是不能到達,必須反饋給生產端進行重發。redis
RabbitMQ消息的可靠性投遞主要兩種實現:
一、經過實現消費的重試機制,經過@Retryable來實現重試,能夠設置重試次數和重試頻率;
二、生產端實現消息可靠性投遞。spring
兩種方法消費端均可能收到重複消息,要求消費端必須實現冪等性消費。數據庫
rabbitmq的消息投遞的過程爲:
producer ——> rabbitmq broker cluster ——> exchange ——> queue ——> consumer緩存
一、生產端發送消息到rabbitmq broker cluster後,異步接受從rabbitmq返回的ack確認信息。安全
二、生產端收到返回的ack確認消息後,根據ack是true仍是false,調用confirmCallback接口進行處理。網絡
在application.yml中開啓生產端confirm模式併發
spring: rabbitmq: publisher-confirms: true
實現ConfirmCallback接口中的confirm方法,ack爲true表示消息發送成功,ack爲false表示消息發送失敗app
@Component @Slf4j public class RabbitTemplateConfig implements ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); // 指定 ConfirmCallback } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { //try to resend msg } else { //delete msg in db } } }
注意:在confirmCallback回調接口中是沒有消息數據的,因此即便消息發送失敗,生產端也沒法在這個回調接口中直接重發,confirmCallback只能起到一個通知的做用。異步
若是rabbitmq返回ack失敗,生產端也沒法確認消息是否真的發送成功,也會形成數據丟失。最好的辦法是使用rabbitmq的事務機制,可是rabbitmq的事務機制效率極低,每秒鐘處理的消息僅幾百條,不適合併發量大的場景。 ide
另一種實現思路:
一、生產端保存每次發送的消息,若是發送成功就刪除消息;
二、若是發送失敗就取出消息從新發送;
三、若是超時尚未收到mq返回的ack,一樣取出消息從新發送。
這樣就能夠避免消息丟失的風險。
以使用redis保存消息msg爲例,具體實現方案爲:
一、生產端在發送消息以前,生成ack惟一確認的id;
二、以ackId爲鍵,消息爲value,保存進redis緩存,設置超時時間;
三、redis實現超時觸發接口,當key過時時,重發消息並再次執行第2步;
四、生產端實現ConfirmCallback接口;
五、ConfirmCallback接口觸發時,若ack爲true,則直接刪除這次ackId對應的msg;若ack爲false,則將該ackId對應的msg取出重發;
網上另外的實現方案:
不經過設置redis超時時間觸發超時事件進行重發,而是取出消息放入一個ackFailList中,而後開啓定時任務,掃描ackFailList,重發失敗的msg。
網上的這套方案思路上和上一個方案差很少,可是是採用的額外的List來保存發送失敗的消息,因爲List保存在內存中,不具有持久化的功能,因此這樣並不安全,若是生產端程序異常退出將致使消息丟失。能夠考慮保存到數據庫中。
生產端經過實現ReturnCallback接口,啓動消息失敗返回,消息路由不到隊列時會觸發該回調接口。
在application.yml中開啓return模式
spring: rabbitmq: publisher-returns: true
實現ReturnCallback接口,能夠獲取消息主體內容,實現消息重發
@Component @Slf4j public class RabbitTemplateConfig implements ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主體 message : {}", message); log.info("消息主體 message : {}", replyCode); log.info("描述:{}", replyText); log.info("消息使用的交換器 exchange : {}", exchange); log.info("消息使用的路由鍵 routing : {}", routingKey); } }