用戶下單後,若是30min未支付,則刪除該訂單,這時候就要能夠用延遲隊列html
利用rabbitmq_delayed_message_exchange插件;spring
首先下載該插件:https://www.rabbitmq.com/community-plugins.htmlapp
而後把該插件放到rabbitmq安裝目錄plugins下;spring-boot
進入到sbin目錄下,執行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";測試
關閉RabbitMQ服務,而後再啓動(直接重啓該插件可能會不生效)。ui
application.properties配置文件spa
spring.application.name=spring-boot-rabbitmq spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=nut spring.rabbitmq.password=nut
配置類.net
注意這裏的"x-delayed-type"和"x-delayed-message"插件
/** * 延遲隊列配置exchange */ @Configuration public class DelayQueueConfig { public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE"; public static final String DELAY_QUEUE = "DELAY_QUEUE"; public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY"; @Bean("delayExchange") public Exchange delayExchange() { Map<String, Object> args = new HashMap<>(1); // x-delayed-type 聲明 延遲隊列Exchange的類型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message",true, false,args); } @Bean("delayQueue") public Queue delayQueue() { return QueueBuilder.durable(DELAY_QUEUE).build(); } /** * 將延遲隊列經過routingKey綁定到延遲交換器 * * @return */ @Bean public Binding delayQueueBindExchange() { return new Binding(DELAY_QUEUE, Binding.DestinationType.QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY, null); } }
生產者code
發送消息時,指定延遲的毫秒
/** * 延遲隊列發送者 */ @Component @Slf4j public class DelayQueueSender { @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayQueue(int number) { log.warn("延遲隊列發送 : {} milliseconds", number); // 這裏的Exchange能夠是業務的Exchange,爲了方便測試這裏直接往死信Exchange裏投遞消息 rabbitTemplate.convertAndSend( DelayQueueConfig.DELAY_EXCHANGE, DelayQueueConfig.DELAY_ROUTING_KEY, number, (message) -> { // 設置延遲的毫秒數 message.getMessageProperties().setDelay(number); log.info("Now : {}", ZonedDateTime.now()); return message; }); } }
消費者
/** * 延遲隊列消費者 */ @Component @Slf4j @RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE) public class DelayQueueConsumer { @RabbitHandler public void receiveDelayMessage(Integer milliseconds){ log.warn("DelayQueueConsumer Time : {}, and the millis : {}", ZonedDateTime.now(), milliseconds); } }
先啓動項目;
而後在測試類中發送消息;
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitmqApplicationTests { @Autowired private DelayQueueSender delayQueueSender; @Test public void testDelayQueueSender(){ delayQueueSender.sendDelayQueue(5000); } }
發送消息窗口:
消費者受到消息:
時間間隔證實延遲隊列發送完成!
參考:
http://www.javashuo.com/article/p-bchvtvaj-mv.html
https://blog.csdn.net/youjin/article/details/82586888
https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange
https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/