原理網上一大堆,比我說的清楚,你們能夠去看看,這裏就不說了。html
注意 plugins 的處理,由於須要安裝一個 rabbitmq_delayed_message_exchange
插件,在 3.7.13
版本中沒法使用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
進行安裝,會提示找不到,因此只能本身上 https://www.rabbitmq.com/plugins.html
去下載。java
version: '3.1' services: rabbitmq: restart: always image: rabbitmq:management container_name: rabbitmq ports: - 5672:5672 - 15672:15672 environment: TZ: Asia/Shanghai RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: admin volumes: - ./data:/var/lib/rabbitmq - ./data:/etc/rabbitmq - ./plugins:/plugins
在處理plugins以後,注意配置一下 enabled_plugins
ui
[rabbitmq_management,rabbitmq_delayed_message_exchange].
配置delayExchange,隊列,而且綁定。插件
@Configuration public class RabbitConfig { @Bean public FanoutExchange delayExchange() { HashMap<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); FanoutExchange topicExchange = new FanoutExchange("delay_exchange", true, false, args); topicExchange.setDelayed(true); return topicExchange; } @Bean public Queue queue() { Queue queue = new Queue("delay_queue"); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(delayExchange()); } }
發送消息rest
rabbitTemplate.convertAndSend("delay_exchange", queue, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 這裏的時間用來設置延時多久 message.getMessageProperties().setHeader("x-delay", 30000); return message; });
接收消息code
@Component public class MessageReceiver { @RabbitListener(queues = "delay_queue") public void receive(String msg) { System.out.println("接收到的消息:"+msg); } }