rabbitmq利用死信隊列+TTL 實現延遲隊列

適用場景:訂單超時未支付,假若適用定時器的話,那麼數據量大的話,輪詢查詢數據,首先IO開銷大,其次任務時間要求高,掃描越頻繁性能可能就越低。
延遲隊列顧名思義延遲消費數據,那麼先解釋下延遲隊列涉及的關鍵概念
一、消息的TTL(Time To Live)
RabbitMQ容許爲消息和隊列設置TTL(生存時間),若對消息設置了ttl,若是超過了ttl配置則消息死了,稱之爲死信.請注意,路由到多個隊列的消息可能會在其所在的每一個隊列中的不一樣時間或根本不會消亡(不一樣隊列分別設置ttl)。故一個隊列中的消息死亡對其餘隊列中相同消息的生命沒有影響。對隊列設置就是隊列沒有消費者連着的保留時間。java

1.1 對消息設置統一TTLgit

@Bean
    public Queue delayTTLQueue() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
        paramMap.put("x-message-ttl",3000);
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME,true,false,false,paramMap);
    }

1.2 對消息分別設置TTL,對比上一個少了個x-messgae-ttl參數設置github

/**
     * 延遲隊列,超時時間不一致
     * @return
     */
    @Bean
    public Queue delayTTLQueue2() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME2,true,false,false,paramMap);
    }

 注意:生產者發送消息時,給他們設置了不一樣時間的過時app

/**
     * 延遲時間不一致
     * @param message
     * @throws InterruptedException
     */
    public void sendPerQueueTTL2(Object message) throws InterruptedException {
        DelayConsumer.latch = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            long expiration = i * 1000;
            rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2,
                    message+" the expiration time is "+expiration,new ExpirationMessagePostProcessor(expiration));
        }
        DelayConsumer.latch.await();
    }

二、死信交換機DLX(Dead Letter Exchanges)ide

一個消息在知足以下條件下,會進死信交換機,記住這裏是交換機而不是隊列,一個交換機能夠對應不少隊列。
2.1 消息被Consumer拒收了,而且reject方法的參數裏requeue是false。也就是說不會被再次放在隊列裏,被其餘消費者使用。
2.2 上面的消息的TTL到了,消息過時了。
2.3 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。
死信交換機就是普通的交換機,只是由於咱們把過時的消息扔進去,因此叫死信交換機,並非說死信交換機是某種特定的交換機性能

大體流程圖測試

三、代碼示例ui

3.1 queue配置code

/**
     * 實際消費隊列
     * @return
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(RabbitMqConfig.DELAY_PROCESS_QUEUE_NAME,true,false,false);
    }

    /**
     * 延遲隊列
     * @return
     */
    @Bean
    public Queue delayTTLQueue() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
        paramMap.put("x-message-ttl",3000);
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME,true,false,false,paramMap);
    }

    /**
     * 延遲隊列,超時時間不一致
     * @return
     */
    @Bean
    public Queue delayTTLQueue2() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME2,true,false,false,paramMap);
    }

3.2 exchange 配置blog

/**
     * 死信交換機
     * @return
     */
    @Bean
    public DirectExchange  delayExchange(){
        DirectExchange directExchange = new DirectExchange(RabbitMqConfig.DELAY_EXCHANGE_NAME,true,false);
        return directExchange;
    }

    /**
     * 延遲交換機
     * @return
     */
    @Bean
    public DirectExchange  perQueueTTLExchange(){
        DirectExchange directExchange = new DirectExchange(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,true,false);
        return directExchange;
    }

3.3 exchange和queue綁定關係

/**
     * 綁定死信隊列
     * @return
     */
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(queueConfig.delayQueue()).to(exchangeConfig.delayExchange()).with(RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
    }

    /**
     * 綁定延遲隊列1
     * @return
     */
    @Bean
    public Binding queueTTLBinding() {
        return BindingBuilder.bind(queueConfig.delayTTLQueue()).to(exchangeConfig.perQueueTTLExchange()).with(RabbitMqConfig.QUEUE_TTL_ROUTING_KEY);
    }

    /**
     * 綁定延遲隊列2
     * @return
     */
    @Bean
    public Binding queueTTLBinding2() {
        return BindingBuilder.bind(queueConfig.delayTTLQueue2()).to(exchangeConfig.perQueueTTLExchange()).with(RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2);
    }

3.4 生產者發送消息

public void sendPerQueueTTL(Object message) throws InterruptedException {
        DelayConsumer.latch = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY, message);
        }
        DelayConsumer.latch.await();
    }


    /**
     * 延遲時間不一致
     * @param message
     * @throws InterruptedException
     */
    public void sendPerQueueTTL2(Object message) throws InterruptedException {
        DelayConsumer.latch = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            long expiration = i * 1000;
            rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2,
                    message+" the expiration time is "+expiration,new ExpirationMessagePostProcessor(expiration));
        }
        DelayConsumer.latch.await();
    }

3.5 消費者消費對應消息

@Component
public class DelayConsumer implements ChannelAwareMessageListener {
    public static CountDownLatch latch;
    public static final String FAIL_MESSAGE = "This message will fail";
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            processMessage(message);
        }
        catch (Exception e) {
            // 若是發生了異常,則將該消息重定向到緩衝隊列,會在必定延遲以後自動重作
            channel.basicPublish(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME, RabbitMqConfig.QUEUE_TTL_ROUTING_KEY, null,
                    "The failed message will auto retry after a certain delay".getBytes());
        }

        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * 模擬消息處理。若是當消息內容爲FAIL_MESSAGE的話,則須要拋出異常
     *
     * @param message
     * @throws Exception
     */
    public void processMessage(Message message) throws Exception {
        String realMessage = new String(message.getBody());
        System.out.println("Received <" + realMessage + ">");
        if (Objects.equals(realMessage, FAIL_MESSAGE)) {
            throw new Exception("Some exception happened");
        }
    }

}

測試結果

設置消息超時一致的測試結果如圖:

設置消息超時時間一致

本文示例代碼 詳見 GitHub 死信隊列+TTL實現延遲隊列

相關文章
相關標籤/搜索