目前常見的應用軟件都有消息的延遲推送的影子,應用也極爲普遍,例如:java
在上面兩種場景中,若是咱們使用下面兩種傳統解決方案無疑大大下降了系統的總體性能和吞吐量:面試
首先咱們建立交換機和消息隊列redis
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {
public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
public static final String LAZY_QUEUE = "MQ.LazyQueue";
public static final String LAZY_KEY = "lazy.#";
@Bean
public TopicExchange lazyExchange(){
//Map<String, Object> pros = new HashMap<>();
//設置交換機支持延遲消息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Queue lazyQueue(){
return new Queue(LAZY_QUEUE, true);
}
@Bean
public Binding lazyBinding(){
return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
}
}
複製代碼
咱們在 Exchange 的聲明中能夠設置exchange.setDelayed(true)來開啓延遲隊列,也能夠設置爲如下內容傳入交換機聲明的方法中,由於第一種方式的底層就是經過這種方式來實現的。spring
//Map<String, Object> pros = new HashMap<>();
//設置交換機支持延遲消息推送
//pros.put("x-delayed-message", "topic");
TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
複製代碼
發送消息時咱們須要指定延遲推送的時間,咱們這裏在發送消息的方法中傳入參數 new MessagePostProcessor() 是爲了得到 Message對象,由於須要藉助 Message對象的api 來設置延遲時間。數據庫
import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class MQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
//confirmCallback returnCallback 代碼省略,請參照上一篇
public void sendLazy(Object message){
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 時間戳 全局惟一
CorrelationData correlationData = new CorrelationData("12345678909"+new Date());
//發送消息時指定 header 延遲時間
rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//設置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//message.getMessageProperties().setHeader("x-delay", "6000");
message.getMessageProperties().setDelay(6000);
return message;
}
}, correlationData);
}
}
複製代碼
咱們能夠觀察 setDelay(Integer i)底層代碼,也是在 header 中設置 x-delay。等同於咱們手動設置 headerapi
message.getMessageProperties().setHeader("x-delay", "6000");bash
/**
* Set the x-delay header.
* @param delay the delay.
* @since 1.6
*/
public void setDelay(Integer delay) {
if (delay == null || delay < 0) {
this.headers.remove(X_DELAY);
}
else {
this.headers.put(X_DELAY, delay);
}
}
複製代碼
消費端進行消費jvm
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class MQReceiver {
@RabbitListener(queues = "MQ.LazyQueue")
@RabbitHandler
public void onLazyMessage(Message msg, Channel channel) throws IOException{
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, true);
System.out.println("lazy receive " + new String(msg.getBody()));
}
複製代碼
測試結果ide
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {
@Autowired
private MQSender mqSender;
@Test
public void sendLazy() throws Exception {
String msg = "hello spring boot";
mqSender.sendLazy(msg + ":");
}
}
複製代碼
果真在 6 秒後收到了消息 lazy receive hello spring boot:post