springboot rabbitmq 之死信隊列(延遲消費消息)

以前探討了springboot 集成 rabbitmq  以及開啓ack模式   java

傳送門:http://www.javashuo.com/article/p-ujhmvzoi-kt.htmlgit

接着該篇 搞一下 死信隊列spring

  • 概念

死信隊列 聽上去像 消息「死」了     其實也有點這個意思,死信隊列  是 當消息在一個隊列 由於下列緣由:springboot

  1. 消息被拒絕(basic.reject/ basic.nack)而且再也不從新投遞 requeue=false
  2. 消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration()) 
  3. 隊列超載

變成了 「死信」 後    被從新投遞(publish)到另外一個Exchange   該Exchange 就是DLX     而後該Exchange 根據綁定規則 轉發到對應的 隊列上  監聽該隊列  就能夠從新消費     說白了 就是  沒有被消費的消息  換個地方從新被消費app

              生產者   -->  消息 --> 交換機  --> 隊列  --> 變成死信  --> DLX交換機 -->隊列 --> 消費者dom

  • springboot rabbitmq 死信隊列實踐

下面咱們模擬一個死信隊列的應用場景   消息延時處理測試

仍是以這個項目爲基礎: https://gitee.com/felord/springboot-messageui

 項目中 RabbitConfig  死信相關片斷:編碼

/**
     * 死信隊列跟交換機類型沒有關係 不必定爲directExchange  不影響該類型交換機的特性.
     *
     * @return the exchange
     */
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }

    /**
     * 聲明一個死信隊列.
     * x-dead-letter-exchange   對應  死信交換機
     * x-dead-letter-routing-key  對應 死信隊列
     *
     * @return the queue
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    聲明  死信交換機
        args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
//       x-dead-letter-routing-key    聲明 死信路由鍵
        args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    /**
     * 定義死信隊列轉發隊列.
     *
     * @return the queue
     */
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("REDIRECT_QUEUE").build();
    }

    /**
     * 死信路由經過 DL_KEY 綁定鍵綁定到死信隊列上.
     *
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

    }

    /**
     * 死信路由經過 KEY_R 綁定鍵綁定到死信隊列上.
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
    }

說明: spa

deadLetterExchange()聲明瞭一個Direct 類型的Exchange (死信隊列跟交換機沒有關係

deadLetterQueue() 聲明瞭一個隊列   這個隊列 跟前面咱們聲明的隊列不同    注入了 Map<String,Object> 參數    下面的概念很是重要   

x-dead-letter-exchange 來標識一個交換機  x-dead-letter-routing-key  來標識一個綁定鍵(RoutingKey)  這個綁定鍵 是分配給 標識的交換機的   若是沒有特殊指定 聲明隊列的原routingkey , 若是有隊列經過此綁定鍵 綁定到交換機    那麼死信會被該交換機轉發到 該隊列上  經過監聽 可對消息進行消費  

能夠打個比方  這個是爲主力隊員 設置了一個替補   若是主力 「死」了   他的活 替補接手  這樣更好理解

deadLetterBinding() 對這個帶參隊列 進行了 和交換機的規則綁定   等下 消費者 先把消息經過交換機投遞到該隊列中去   而後製造條件發生「死信」 

redirectBinding() 咱們須要給標識的交換機  以及對其指定的routingkey 來綁定一個所謂的「替補」隊列 用來監聽

 

流程具體是  消息投遞到  DL_QUEUE  10秒後消息過時 生成死信    而後轉發到 REDIRECT_QUEUE  經過對其的監聽  來消費消息     

SendController 增長消費發送接口 

/**
     * 測試死信隊列.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/dead")
    public ResponseEntity deadLetter(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//        聲明消息處理器  這個對消息進行處理  能夠設置一些參數   對消息進行一些定製化處理   咱們這裏  來設置消息的編碼  以及消息的過時時間  由於在.net 以及其餘版本過時時間不一致   這裏的時間毫秒值 爲字符串
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
//            設置編碼
            messageProperties.setContentEncoding("utf-8");
//            設置過時時間10*1000毫秒
            messageProperties.setExpiration("10000");
            return message;
        };
//         向DL_QUEUE 發送消息  10*1000毫秒後過時 造成死信
        rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor, correlationData);
        return ResponseEntity.ok();
    }

 

監聽 REDIRECT_QUEUE 

/**
     * 監聽替補隊列 來驗證死信.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  這裏異常須要處理
     */
    @RabbitListener(queues = {"REDIRECT_QUEUE"})
    public void redirect(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.debug("dead message  10s 後 消費消息 {}",new String (message.getBody()));
    }

 

測試死信隊列接口

不出意外  消息會在發出10秒後 才被消費     一下信息證明了這一猜想

 

相關源碼:https://gitee.com/felord/springboot-message      但願你們點個贊

相關文章
相關標籤/搜索