Rabbitmq 延時消息

原理網上一大堆,比我說的清楚,你們能夠去看看,這裏就不說了。html

安裝配置

注意 plugins 的處理,由於須要安裝一個 rabbitmq_delayed_message_exchange 插件,在 3.7.13 版本中沒法使用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 進行安裝,會提示找不到,因此只能本身上 https://www.rabbitmq.com/plugins.html 去下載。java

version: '3.1'
services:
  rabbitmq:
    restart: always
    image: rabbitmq:management
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      TZ: Asia/Shanghai
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: admin
    volumes:
      - ./data:/var/lib/rabbitmq
      - ./data:/etc/rabbitmq
      - ./plugins:/plugins

在處理plugins以後,注意配置一下 enabled_pluginsui

[rabbitmq_management,rabbitmq_delayed_message_exchange].

使用

配置delayExchange,隊列,而且綁定。插件

@Configuration
public class RabbitConfig {

    @Bean
    public FanoutExchange delayExchange() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        FanoutExchange topicExchange = new FanoutExchange("delay_exchange", true, false, args);
        topicExchange.setDelayed(true);
        return topicExchange;
    }

    @Bean
    public Queue queue() {
        Queue queue = new Queue("delay_queue");
        return queue;
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(delayExchange());
    }
}

發送消息rest

rabbitTemplate.convertAndSend("delay_exchange", queue, msg, message -> {
    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    // 這裏的時間用來設置延時多久
    message.getMessageProperties().setHeader("x-delay", 30000);
    return message;
});

接收消息code

@Component
public class MessageReceiver {
    @RabbitListener(queues = "delay_queue")
    public void receive(String msg) {
        System.out.println("接收到的消息:"+msg);
    }
}
相關文章
相關標籤/搜索