RabbitMQ 的消息確認機制以下:
java
從圖中咱們能夠看出:git
這兩個機制都是收到 TCP 協議的啓發,它們對於數據安全相當重要。
下面就分別從生產者、消費者兩個方面結合實例來認識消息確認機制。github
備註:web
咱們先來看一下RabbitMQ 消息投遞和接收的一個完整鏈路以下:
spring
消息投遞的鏈路用文字表示:producer->rabbitmq broker cluster->exchange->queue->consumer
segmentfault
因爲:安全
在編碼時咱們能夠用兩個選項用來控制消息投遞的可靠性:服務器
confirmCallback
;returnCallback
咱們能夠利用這兩個 callback 接口來控制消息的一致性和處理一部分的異常狀況。微信
server.port=10420 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 開啓發送確認 spring.rabbitmq.publisher-confirms=true # 開啓發送失敗退回(消息有沒有找到合適的隊列) spring.rabbitmq.publisher-returns=true
在 RabbitConfig 配置類裏,定義 RabbitTemplate Bean,使用 callback 接口:網絡
/** * RabbitMQ配置 * * @author lyf * @公衆號 全棧在路上 * @GitHub https://github.com/liuyongfei1 * @date 2020-05-17 17:20 **/ @Slf4j @Configuration public class RabbitConfig { @Autowired CachingConnectionFactory cachingConnectionFactory; @Bean RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); // 消息只要被 rabbitmq broker 接收到就會執行 confirmCallback // 若是是 cluster 模式,須要全部 broker 接收到纔會調用 confirmCallback // 被 broker 接收到只能表示 message 已經到達服務器,並不能保證消息必定會被投遞到目標 queue 裏 rabbitTemplate.setConfirmCallback((data, ack, cause) -> { String msgId = data.getId(); if (ack) { log.info(msgId + ": 消息發送成功"); } else { log.info(msgId + ": 消息發送失敗"); } }); // confirm 模式只能保證消息到達 broker,不能保證消息準確投遞到目標 queue 裏。 // 在有些業務場景下,咱們須要保證消息必定要投遞到目標 queue 裏,此時就須要用到 return 退回模式 // 這樣若是未能投遞到目標 queue 裏將調用 returnCallback,能夠記錄下詳細到投遞數據,按期的巡檢或者自動糾錯都須要這些數據 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info(MessageFormat.format("消息發送失敗,ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)); // TODO 作消息發送失敗時的處理邏輯 }); return rabbitTemplate; } /** * 聲明隊列 * 參數說明: * durable 是否持久化,默認是false(持久化隊列則數據會被存儲在磁盤上,當消息代理重啓時數據不會丟失;暫存隊列只對當前鏈接有效) * exclusive 默認是false,只能被當前建立的鏈接使用,並且當鏈接關閉後隊列即被刪除。此參考優先級高於durable * autoDelete 默認是false,是否自動刪除,當沒有生產者或者消費者使用此隊列,該隊列會自動刪除。 * 通常設置一下隊列的持久化就好,其他兩個就是默認false * * @return Queue **/ @Bean Queue myQueue() { return new Queue(QueueConstants.QUEUE\_NAME, true); } // 設置交換機,類型爲 direct @Bean DirectExchange myExchange() { return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false); } // 綁定:將交換機和隊列綁定,並設置路由匹配鍵 @Bean Binding queueBinding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with(QueueConstants.QUEUE\_ROUTING\_KEY\_NAME); }
在 ProducerController 裏,主要乾了如下幾件事:
sendDirectMessage
,經過請求該接口,能夠實現生產者發送消息的功能;CorrelationData
,該對象內部只有一個 id 屬性,用來表示消息的惟一性;/** * 消息生產端 * @公衆號 全棧在路上 * @GitHub https://github.com/liuyongfei1 * @author lyf * @date 2020-05-17 18:30 **/ @RestController public class ProducerController { /\*\* \* RabbitTemplate提供了發送/接收消息的方法 \*/ @Autowired RabbitTemplate rabbitTemplate; /** * 生產消息 * * @Author Liuyongfei * @Date 上午12:12 2020/5/20 * @param test * @param test2 * @return java.lang.String **/ @GetMapping("/sendDirectMessage") public String sendDirectMessage(String test,Integer test2) { // 生成消息的惟一id String msgId = UUID.randomUUID().toString(); String messageData = "hello,this is rabbitmq demo message"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 定義要發送的消息對象 Map<String,Object\> messageObj = new HashMap<>(); messageObj.put("msgId",msgId); messageObj.put("messageData",messageData); messageObj.put("createTime",createTime); rabbitTemplate.convertAndSend(QueueConstants.QUEUE\_EXCHANGE\_NAME,QueueConstants.QUEUE\_ROUTING\_KEY\_NAME, messageObj,new CorrelationData(msgId)); return "message send ok"; } }
setConfirmCallback
方法內部打上斷點;至此,生產者消息確認結束,且經過運行的實例,咱們可以得出結論:本次生產的消息已經正確無誤的投遞到了隊列中去。
消費者確認指的就是 RabbitMQ 須要確認消息到底有沒有被收到,來肯定要不要將該條消息從隊列中刪除掉。這就須要消費者來告訴 RabbitMQ,有如下兩種方式:
消費者在消費消息的時候,若是設定應答模式爲自動,則消費者收到消息後,消息就會當即被 RabbitMQ 從 隊列中刪除掉。
所以,在實際開發者,咱們基本上是將消費應答模式設置爲手動確認更爲穩當一些。
消費者在收到消息後:
server.port=10421 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 開啓 ACK(消費者接收到消息時手動確認) spring.rabbitmq.listener.simple.acknowledge-mode=manual
ConsumerController 裏主要乾了如下幾件事兒:
@RabbitListener
來監聽隊列;deliveryTag
;channel.basicAck
來確認消息已經消費;channel.basicNack
把消費失敗的消息從新放入到隊列中去。/** * 消息消費端 * @公衆號 全棧在路上 * @GitHub https://github.com/liuyongfei1 * @author Liuyongfei * @date 2020-05-21 18:00 **/ @Component public class ConsumerController { @RabbitListener(queues = {QueueConstants.QUEUE\_NAME}) public void handler(Message message, Channel channel) throws IOException { System.out.println("收到消息:" + message.toString()); MessageHeaders headers = message.getHeaders(); Long tag = (Long) headers.get(AmqpHeaders.DELIVERY\_TAG); try { // 手動確認消息已消費 channel.basicAck(tag,false); } catch (IOException e) { // 把消費失敗的消息從新放入到隊列 channel.basicNack(tag, false, true); e.printStackTrace(); } } }
至此,消費者消息確認結束。你們能夠在 ConsumerController 裏添加一些測試代碼來觸發異常,體驗一下 channel.basicNack
的做用。這裏我就再也不一一測試。