目前常見的應用軟件都有消息的延遲推送的影子,應用也極爲普遍,例如:java
在上面兩種場景中,若是咱們使用下面兩種傳統解決方案無疑大大下降了系統的總體性能和吞吐量:redis
在 RabbitMQ 3.6.x 以前咱們通常採用死信隊列+TTL過時時間來實現延遲隊列,咱們這裏不作過多介紹,能夠參考以前文章來了解:TTL、死信隊列spring
在 RabbitMQ 3.6.x 開始,RabbitMQ 官方提供了延遲隊列的插件,能夠下載放置到 RabbitMQ 根目錄下的 plugins 下。延遲隊列插件下載數據庫
首先咱們建立交換機和消息隊列,application.properties 中配置與上一篇文章相同。api
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class MQConfig { public static final String LAZY_EXCHANGE = "Ex.LazyExchange"; public static final String LAZY_QUEUE = "MQ.LazyQueue"; public static final String LAZY_KEY = "lazy.#"; @Bean public TopicExchange lazyExchange(){ //Map<String, Object> pros = new HashMap<>(); //設置交換機支持延遲消息推送 //pros.put("x-delayed-message", "topic"); TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros); exchange.setDelayed(true); return exchange; } @Bean public Queue lazyQueue(){ return new Queue(LAZY_QUEUE, true); } @Bean public Binding lazyBinding(){ return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY); } }
咱們在 Exchange 的聲明中能夠設置exchange.setDelayed(true)
來開啓延遲隊列,也能夠設置爲如下內容傳入交換機聲明的方法中,由於第一種方式的底層就是經過這種方式來實現的。app
//Map<String, Object> pros = new HashMap<>(); //設置交換機支持延遲消息推送 //pros.put("x-delayed-message", "topic"); TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
發送消息時咱們須要指定延遲推送的時間,咱們這裏在發送消息的方法中傳入參數 new MessagePostProcessor()
是爲了得到 Message
對象,由於須要藉助 Message
對象的api 來設置延遲時間。jvm
import com.anqi.mq.config.MQConfig; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; @Component public class MQSender { @Autowired private RabbitTemplate rabbitTemplate; //confirmCallback returnCallback 代碼省略,請參照上一篇 public void sendLazy(Object message){ rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 時間戳 全局惟一 CorrelationData correlationData = new CorrelationData("12345678909"+new Date()); //發送消息時指定 header 延遲時間 rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //設置消息持久化 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //message.getMessageProperties().setHeader("x-delay", "6000"); message.getMessageProperties().setDelay(6000); return message; } }, correlationData); } }
咱們能夠觀察 setDelay(Integer i)
底層代碼,也是在 header 中設置 x-delay。等同於咱們手動設置 headeride
message.getMessageProperties().setHeader("x-delay", "6000");
post
/** * Set the x-delay header. * @param delay the delay. * @since 1.6 */ public void setDelay(Integer delay) { if (delay == null || delay < 0) { this.headers.remove(X_DELAY); } else { this.headers.put(X_DELAY, delay); } }
消費端進行消費
import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; @Component public class MQReceiver { @RabbitListener(queues = "MQ.LazyQueue") @RabbitHandler public void onLazyMessage(Message msg, Channel channel) throws IOException{ long deliveryTag = msg.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, true); System.out.println("lazy receive " + new String(msg.getBody())); }
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @SpringBootTest @RunWith(SpringRunner.class) public class MQSenderTest { @Autowired private MQSender mqSender; @Test public void sendLazy() throws Exception { String msg = "hello spring boot"; mqSender.sendLazy(msg + ":"); } }
果真在 6 秒後收到了消息 lazy receive hello spring boot:
轉載請註明出處,謝謝。https://www.cnblogs.com/haixiang/p/10966985.html