在訂單系統,當用戶下單後須要在10分鐘內完成支付,不然取消訂單。java
所謂的‘延遲隊列「就是消息被髮送之後,不直接被消費者消費,而是等到特定時間後消費者才能拿到消息消費。git
RabbitMQ自己不支持延遲隊列,可是咱們可使用死信隊列(DLX)和設置有效時間(TTL)兩個特性來實現延遲隊列。github
先新建隊列order_query並設置消息有效時間是10分鐘,而後綁定一個死信隊列order_dead_query,消費者消費order_dead_query隊列的消息。生成訂單的時候往隊列order_query發一條消息,當10分鐘後這條消息會進入死信隊列order_dead_query裏面並被咱們消費者消費,這時咱們去查詢一下該訂單的支付狀態,若是是已支付不作任何操做,若是是未支付就取消訂單。消息的發送端參考Spring Boot RabbitMQ實踐 。spring
/** * RabbitMQ 配置類 * * @author yuhao.wang */ @Configuration public class RabbitConfig { /** * 方法rabbitAdmin的功能描述:動態聲明queue、exchange、routing * * @param connectionFactory * @return * @author : yuhao.wang */ @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); //聲明死信隊列(Fanout類型的exchange) Queue deadQueue = new Queue(RabbitConstants.QUEUE_NAME_DEAD_QUEUE); // 死信隊列交換機 FanoutExchange deadExchange = new FanoutExchange(RabbitConstants.MQ_EXCHANGE_DEAD_QUEUE); rabbitAdmin.declareQueue(deadQueue); rabbitAdmin.declareExchange(deadExchange); rabbitAdmin.declareBinding(BindingBuilder.bind(deadQueue).to(deadExchange)); // 發放獎勵隊列交換機 DirectExchange exchange = new DirectExchange(RabbitConstants.MQ_EXCHANGE_SEND_AWARD); //聲明發送優惠券的消息隊列(Direct類型的exchange) Queue couponQueue = queue(RabbitConstants.QUEUE_NAME_SEND_COUPON); rabbitAdmin.declareQueue(couponQueue); rabbitAdmin.declareExchange(exchange); rabbitAdmin.declareBinding(BindingBuilder.bind(couponQueue).to(exchange).with(RabbitConstants.MQ_ROUTING_KEY_SEND_COUPON)); return rabbitAdmin; } public Queue queue(String name) { Map<String, Object> args = new HashMap<>(); // 設置死信隊列 args.put("x-dead-letter-exchange", RabbitConstants.MQ_EXCHANGE_DEAD_QUEUE); args.put("x-dead-letter-routing-key", RabbitConstants.MQ_ROUTING_KEY_DEAD_QUEUE); // 設置消息的過時時間, 單位是毫秒 args.put("x-message-ttl", 5000); // 是否持久化 boolean durable = true; // 僅建立者可使用的私有隊列,斷開後自動刪除 boolean exclusive = false; // 當全部消費客戶端鏈接斷開後,是否自動刪除隊列 boolean autoDelete = false; return new Queue(name, durable, exclusive, autoDelete, args); } }
設置消息的過時時間, 單位是毫秒 args.put("x-message-ttl", 5000);數據庫
/** * 延遲隊列消費 * * @author yuhao.wang */ @Service public class DeadMessageListener { private final Logger logger = LoggerFactory.getLogger(DeadMessageListener.class); @RabbitListener(queues = RabbitConstants.QUEUE_NAME_DEAD_QUEUE) public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception { logger.info("[{}]處理延遲隊列消息隊列接收數據,消息體:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON, JSON.toJSONString(sendMessage)); System.out.println(message.getMessageProperties().getDeliveryTag()); try { // 參數校驗 Assert.notNull(sendMessage, "sendMessage 消息體不能爲NULL"); // TODO 處理消息 // 確認消息已經消費成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { logger.error("MQ消息處理異常,消息體:{}", message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage), e); try { // TODO 保存消息到數據庫 // 確認消息已經消費成功 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception dbe) { logger.error("保存異常MQ消息到數據庫異常,放到死性隊列,消息體:{}", JSON.toJSONString(sendMessage), dbe); // 確認消息將消息放到死信隊列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } } } }
消費端直接監聽死信隊列,達到延遲消費消息的效果spring-boot
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releasesui
spring-boot-student-rabbitmq 工程code