一、須要用到插件 rabbitmq_delayed_message_exchange 來實現,插件下載地址:https://www.rabbitmq.com/community-plugins.htmlhtml
二、下載後把插件放到 plugins 裏面,而後到 sbin裏面打開cmd,執行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令java
三、插件裝好後,從新啓動mq,而後集成mq。spring
首先,導包app
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
而後,配置文件配置鏈接信息:ide
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.listener.simple.acknowledge-mode=manual
mq 配置:spring-boot
package com.rrg.gz.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * mq配置 * @author huangsz 2019/4/25 0025 */ @Configuration public class RabbitPluginConfig { /** * 延時隊列交換機 * 注意這裏的交換機類型:CustomExchange * @return */ @Bean public CustomExchange delayExchange(){ Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("rrg_delay_exchange","x-delayed-message",true, false,args); } /** * 延時隊列 * @return */ @Bean public Queue delayQueue(){ return new Queue("rrg_delay_queue",true); } /** * 給延時隊列綁定交換機 * @return */ @Bean public Binding cfgDelayBinding(Queue cfgDelayQueue, CustomExchange cfgUserDelayExchange){ return BindingBuilder.bind(cfgDelayQueue).to(cfgUserDelayExchange).with("rrg_delay_key").noargs(); } }
發送消息類、接收類和信息類,信息類是咱們本身時間業務封裝須要消費的信息。測試
package com.rrg.gz.mq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息發送者 * * @author huangsz 2019/3/7 0007 */ @Component public class Sender { private static Logger log = LoggerFactory.getLogger(Sender.class); @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayMessage(MqEntity entity, long time) { // 這裏的消息能夠是任意對象,無需額外配置,直接傳便可 log.info("延時隊列生產消息"); this.rabbitTemplate.convertAndSend( "rrg_delay_exchange", "rrg_delay_key", entity, message -> { // 注意這裏時間能夠使long,並且是設置header message.getMessageProperties().setHeader("x-delay",time); return message; } ); log.info("{}ms後執行", time); } }
package com.rrg.gz.mq; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 接受者 * * @author huangsz 2019/3/7 0007 */ @Component public class Receiver { private static Logger log = LoggerFactory.getLogger(Receiver.class); @Autowired private Sender sender; @RabbitListener(queues = "rrg_delay_queue") public void cfgUserReceiveDealy(MqEntity entity, Message message, Channel channel) throws Exception{ log.info("開始接受消息!"); // 通知 MQ 消息已被接收,能夠ACK(從隊列中刪除)了 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("接收消息並打印"); System.out.println(entity); } }
package com.rrg.gz.mq; import java.io.Serializable; /** * 必定要實現 Serializable * @author huangsz 2019/3/7 0007 */ public class MqEntity implements Serializable { private Integer userId; private String msg; public MqEntity() { } public MqEntity(Integer userId, String msg) { this.userId = userId; this.msg = msg; } public Integer getUserId() { return userId; } public void setUserId(Integer userId) { this.userId = userId; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } @Override public String toString() { return "MqEntity{" + "userId=" + userId + ", msg='" + msg + '\'' + '}'; } }
四、寫一個controller測試:ui
@RequestMapping("/test1") public void test(){ MqEntity mqEntity = new MqEntity(1,"30秒後消費"); sender.sendDelayMessage(mqEntity,30000); } @RequestMapping("/test2") public void test2(){ MqEntity mqEntity = new MqEntity(1,"10秒後消費"); sender.sendDelayMessage(mqEntity,10000); }
先執行test1,而後執行test2,這個時候,不須要等test1消費完以後,test2才消費信息。this