Rabbit MQ 延遲隊列

問題描述

在訂單系統,當用戶下單後須要在10分鐘內完成支付,不然取消訂單。java

解決方案

  1. 若是咱們使用定時任務來作,那這個失效時間對不許確,當時能夠提升定時任務的執行頻率來減少這個偏差。
  2. 使用延遲隊列,咱們這裏主要將這種方式。

基本概念

所謂的‘延遲隊列「就是消息被髮送之後,不直接被消費者消費,而是等到特定時間後消費者才能拿到消息消費。git

延遲隊列模型

延遲隊列模型.png

詳細說明

RabbitMQ自己不支持延遲隊列,可是咱們可使用死信隊列(DLX)和設置有效時間(TTL)兩個特性來實現延遲隊列。github

先新建隊列order_query並設置消息有效時間是10分鐘,而後綁定一個死信隊列order_dead_query,消費者消費order_dead_query隊列的消息。生成訂單的時候往隊列order_query發一條消息,當10分鐘後這條消息會進入死信隊列order_dead_query裏面並被咱們消費者消費,這時咱們去查詢一下該訂單的支付狀態,若是是已支付不作任何操做,若是是未支付就取消訂單。消息的發送端參考Spring Boot RabbitMQ實踐spring

聲明隊列 RabbitConfig

/**
 * RabbitMQ 配置類
 *
 * @author yuhao.wang
 */
@Configuration
public class RabbitConfig {

    /**
     * 方法rabbitAdmin的功能描述:動態聲明queue、exchange、routing
     *
     * @param connectionFactory
     * @return
     * @author : yuhao.wang
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        //聲明死信隊列(Fanout類型的exchange)
        Queue deadQueue = new Queue(RabbitConstants.QUEUE_NAME_DEAD_QUEUE);
        // 死信隊列交換機
        FanoutExchange deadExchange = new FanoutExchange(RabbitConstants.MQ_EXCHANGE_DEAD_QUEUE);
        rabbitAdmin.declareQueue(deadQueue);
        rabbitAdmin.declareExchange(deadExchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(deadQueue).to(deadExchange));

        // 發放獎勵隊列交換機
        DirectExchange exchange = new DirectExchange(RabbitConstants.MQ_EXCHANGE_SEND_AWARD);

        //聲明發送優惠券的消息隊列(Direct類型的exchange)
        Queue couponQueue = queue(RabbitConstants.QUEUE_NAME_SEND_COUPON);
        rabbitAdmin.declareQueue(couponQueue);
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareBinding(BindingBuilder.bind(couponQueue).to(exchange).with(RabbitConstants.MQ_ROUTING_KEY_SEND_COUPON));

        return rabbitAdmin;
    }

    public Queue queue(String name) {
        Map<String, Object> args = new HashMap<>();
        // 設置死信隊列
        args.put("x-dead-letter-exchange", RabbitConstants.MQ_EXCHANGE_DEAD_QUEUE);
        args.put("x-dead-letter-routing-key", RabbitConstants.MQ_ROUTING_KEY_DEAD_QUEUE);
        // 設置消息的過時時間, 單位是毫秒
        args.put("x-message-ttl", 5000);

        // 是否持久化
        boolean durable = true;
        // 僅建立者可使用的私有隊列,斷開後自動刪除
        boolean exclusive = false;
        // 當全部消費客戶端鏈接斷開後,是否自動刪除隊列
        boolean autoDelete = false;
        return new Queue(name, durable, exclusive, autoDelete, args);
    }
}

設置消息的過時時間, 單位是毫秒 args.put("x-message-ttl", 5000);數據庫

消費者消費死信隊列 DeadMessageListener

/**
 * 延遲隊列消費
 *
 * @author yuhao.wang
 */
@Service
public class DeadMessageListener {

    private final Logger logger = LoggerFactory.getLogger(DeadMessageListener.class);

    @RabbitListener(queues = RabbitConstants.QUEUE_NAME_DEAD_QUEUE)
    public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception {
        logger.info("[{}]處理延遲隊列消息隊列接收數據,消息體:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON, JSON.toJSONString(sendMessage));

        System.out.println(message.getMessageProperties().getDeliveryTag());

        try {
            // 參數校驗
            Assert.notNull(sendMessage, "sendMessage 消息體不能爲NULL");

            // TODO 處理消息

            // 確認消息已經消費成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("MQ消息處理異常,消息體:{}", message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage), e);

            try {
                // TODO 保存消息到數據庫

                // 確認消息已經消費成功
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception dbe) {
                logger.error("保存異常MQ消息到數據庫異常,放到死性隊列,消息體:{}", JSON.toJSONString(sendMessage), dbe);
                // 確認消息將消息放到死信隊列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
}

消費端直接監聽死信隊列,達到延遲消費消息的效果spring-boot

源碼

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releasesui

spring-boot-student-rabbitmq 工程code

相關文章
相關標籤/搜索