以前探討了springboot 集成 rabbitmq 以及開啓ack模式 java
傳送門:http://www.javashuo.com/article/p-ujhmvzoi-kt.htmlgit
接着該篇 搞一下 死信隊列spring
死信隊列 聽上去像 消息「死」了 其實也有點這個意思,死信隊列 是 當消息在一個隊列 由於下列緣由:springboot
變成了 「死信」 後 被從新投遞(publish)到另外一個Exchange 該Exchange 就是DLX 而後該Exchange 根據綁定規則 轉發到對應的 隊列上 監聽該隊列 就能夠從新消費 說白了 就是 沒有被消費的消息 換個地方從新被消費app
生產者 --> 消息 --> 交換機 --> 隊列 --> 變成死信 --> DLX交換機 -->隊列 --> 消費者dom
下面咱們模擬一個死信隊列的應用場景 消息延時處理測試
仍是以這個項目爲基礎: https://gitee.com/felord/springboot-messageui
項目中 RabbitConfig 死信相關片斷:編碼
/** * 死信隊列跟交換機類型沒有關係 不必定爲directExchange 不影響該類型交換機的特性. * * @return the exchange */ @Bean("deadLetterExchange") public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build(); } /** * 聲明一個死信隊列. * x-dead-letter-exchange 對應 死信交換機 * x-dead-letter-routing-key 對應 死信隊列 * * @return the queue */ @Bean("deadLetterQueue") public Queue deadLetterQueue() { Map<String, Object> args = new HashMap<>(2); // x-dead-letter-exchange 聲明 死信交換機 args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE"); // x-dead-letter-routing-key 聲明 死信路由鍵 args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R"); return QueueBuilder.durable("DL_QUEUE").withArguments(args).build(); } /** * 定義死信隊列轉發隊列. * * @return the queue */ @Bean("redirectQueue") public Queue redirectQueue() { return QueueBuilder.durable("REDIRECT_QUEUE").build(); } /** * 死信路由經過 DL_KEY 綁定鍵綁定到死信隊列上. * * @return the binding */ @Bean public Binding deadLetterBinding() { return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null); } /** * 死信路由經過 KEY_R 綁定鍵綁定到死信隊列上. * * @return the binding */ @Bean public Binding redirectBinding() { return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null); }
說明: spa
deadLetterExchange()聲明瞭一個Direct 類型的Exchange (死信隊列跟交換機沒有關係)
deadLetterQueue() 聲明瞭一個隊列 這個隊列 跟前面咱們聲明的隊列不同 注入了 Map<String,Object> 參數 下面的概念很是重要
x-dead-letter-exchange 來標識一個交換機 x-dead-letter-routing-key 來標識一個綁定鍵(RoutingKey) 這個綁定鍵 是分配給 標識的交換機的 若是沒有特殊指定 聲明隊列的原routingkey , 若是有隊列經過此綁定鍵 綁定到交換機 那麼死信會被該交換機轉發到 該隊列上 經過監聽 可對消息進行消費
能夠打個比方 這個是爲主力隊員 設置了一個替補 若是主力 「死」了 他的活 替補接手 這樣更好理解
deadLetterBinding() 對這個帶參隊列 進行了 和交換機的規則綁定 等下 消費者 先把消息經過交換機投遞到該隊列中去 而後製造條件發生「死信」
redirectBinding() 咱們須要給標識的交換機 以及對其指定的routingkey 來綁定一個所謂的「替補」隊列 用來監聽
流程具體是 消息投遞到 DL_QUEUE 10秒後消息過時 生成死信 而後轉發到 REDIRECT_QUEUE 經過對其的監聽 來消費消息
SendController 增長消費發送接口
/** * 測試死信隊列. * * @param p the p * @return the response entity */ @RequestMapping("/dead") public ResponseEntity deadLetter(String p) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 聲明消息處理器 這個對消息進行處理 能夠設置一些參數 對消息進行一些定製化處理 咱們這裏 來設置消息的編碼 以及消息的過時時間 由於在.net 以及其餘版本過時時間不一致 這裏的時間毫秒值 爲字符串 MessagePostProcessor messagePostProcessor = message -> { MessageProperties messageProperties = message.getMessageProperties(); // 設置編碼 messageProperties.setContentEncoding("utf-8"); // 設置過時時間10*1000毫秒 messageProperties.setExpiration("10000"); return message; }; // 向DL_QUEUE 發送消息 10*1000毫秒後過時 造成死信 rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor, correlationData); return ResponseEntity.ok(); }
監聽 REDIRECT_QUEUE
/** * 監聽替補隊列 來驗證死信. * * @param message the message * @param channel the channel * @throws IOException the io exception 這裏異常須要處理 */ @RabbitListener(queues = {"REDIRECT_QUEUE"}) public void redirect(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.debug("dead message 10s 後 消費消息 {}",new String (message.getBody())); }
測試死信隊列接口
不出意外 消息會在發出10秒後 才被消費 一下信息證明了這一猜想
相關源碼:https://gitee.com/felord/springboot-message 但願你們點個贊