SpringBoot RabbitMQ 延遲隊列代碼實現

場景

用戶下單後,若是30min未支付,則刪除該訂單,這時候就要能夠用延遲隊列html

 

準備

利用rabbitmq_delayed_message_exchange插件;spring

首先下載該插件:https://www.rabbitmq.com/community-plugins.htmlapp

而後把該插件放到rabbitmq安裝目錄plugins下;spring-boot

進入到sbin目錄下,執行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";測試

 

關閉RabbitMQ服務,而後再啓動(直接重啓該插件可能會不生效)。ui

 

SpringBoot RabbitMQ代碼

application.properties配置文件spa

spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=nut
spring.rabbitmq.password=nut

 

配置類.net

注意這裏的"x-delayed-type"和"x-delayed-message"插件

/**
 * 延遲隊列配置exchange
 */
@Configuration
public class DelayQueueConfig {

    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
    public static final String DELAY_QUEUE = "DELAY_QUEUE";
    public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";

    @Bean("delayExchange")
    public Exchange delayExchange() {
        Map<String, Object> args = new HashMap<>(1);
//       x-delayed-type    聲明 延遲隊列Exchange的類型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message",true, false,args);
    }

    @Bean("delayQueue")
    public Queue delayQueue() {
        return QueueBuilder.durable(DELAY_QUEUE).build();
    }

    /**
     * 將延遲隊列經過routingKey綁定到延遲交換器
     *
     * @return
     */
    @Bean
    public Binding delayQueueBindExchange() {
        return new Binding(DELAY_QUEUE, Binding.DestinationType.QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY, null);
    }

}

 

生產者code

發送消息時,指定延遲的毫秒

/**
 * 延遲隊列發送者
 */
@Component
@Slf4j
public class DelayQueueSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayQueue(int number) {
        log.warn("延遲隊列發送 : {} milliseconds", number);
        // 這裏的Exchange能夠是業務的Exchange,爲了方便測試這裏直接往死信Exchange裏投遞消息
        rabbitTemplate.convertAndSend(
                DelayQueueConfig.DELAY_EXCHANGE,
                DelayQueueConfig.DELAY_ROUTING_KEY,
                number, (message) -> {
                    // 設置延遲的毫秒數
                    message.getMessageProperties().setDelay(number);
                    log.info("Now : {}", ZonedDateTime.now());
                    return message; });
    }
}

 

消費者

/**
 * 延遲隊列消費者
 */
@Component
@Slf4j
@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE)
public class DelayQueueConsumer {

    @RabbitHandler
    public void receiveDelayMessage(Integer milliseconds){
        log.warn("DelayQueueConsumer Time : {}, and the millis : {}", ZonedDateTime.now(), milliseconds);

    }

}

 

測試

先啓動項目;

而後在測試類中發送消息;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests {

    @Autowired
    private DelayQueueSender delayQueueSender;


    @Test
    public void testDelayQueueSender(){
        delayQueueSender.sendDelayQueue(5000);
    }
}

 

發送消息窗口:

 

消費者受到消息:

 

時間間隔證實延遲隊列發送完成!

 

參考:

http://www.javashuo.com/article/p-bchvtvaj-mv.html

https://blog.csdn.net/youjin/article/details/82586888

https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange

https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

相關文章
相關標籤/搜索