RabbitMQ 延遲隊列,消息延遲推送

應用場景

image

目前常見的應用軟件都有消息的延遲推送的影子,應用也極爲普遍,例如:java

  • 淘寶七天自動確認收貨。在咱們簽收商品後,物流系統會在七天後延時發送一個消息給支付系統,通知支付系統將款打給商家,這個過程持續七天,就是使用了消息中間件的延遲推送功能。
  • 12306 購票支付確認頁面。咱們在選好票點擊肯定跳轉的頁面中每每都會有倒計時,表明着 30 分鐘內訂單不確認的話將會自動取消訂單。其實在下訂單那一刻開始購票業務系統就會發送一個延時消息給訂單系統,延時30分鐘,告訴訂單系統訂單未完成,若是咱們在30分鐘內完成了訂單,則能夠經過邏輯代碼判斷來忽略掉收到的消息。

在上面兩種場景中,若是咱們使用下面兩種傳統解決方案無疑大大下降了系統的總體性能和吞吐量:面試

  • 使用 redis 給訂單設置過時時間,最後經過判斷 redis 中是否還有該訂單來決定訂單是否已經完成。這種解決方案相較於消息的延遲推送性能較低,由於咱們知道 redis 都是存儲於內存中,咱們遇到惡意下單或者刷單的將會給內存帶來巨大壓力。
  • 使用傳統的數據庫輪詢來判斷數據庫表中訂單的狀態,這無疑增長了IO次數,性能極低。
  • 使用 jvm 原生的 DelayQueue ,也是大量佔用內存,並且沒有持久化策略,系統宕機或者重啓都會丟失訂單信息。

消息延遲推送的實現

image

首先咱們建立交換機和消息隊列redis

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {

 public static final String LAZY_EXCHANGE = "Ex.LazyExchange";
 public static final String LAZY_QUEUE = "MQ.LazyQueue";
 public static final String LAZY_KEY = "lazy.#";

 @Bean
 public TopicExchange lazyExchange(){
 //Map<String, Object> pros = new HashMap<>();
 //設置交換機支持延遲消息推送
 //pros.put("x-delayed-message", "topic");
 TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);
 exchange.setDelayed(true);
 return exchange;
 }

 @Bean
 public Queue lazyQueue(){
 return new Queue(LAZY_QUEUE, true);
 }

 @Bean
 public Binding lazyBinding(){
 return BindingBuilder.bind(lazyQueue()).to(lazyExchange()).with(LAZY_KEY);
 }
}

複製代碼

咱們在 Exchange 的聲明中能夠設置exchange.setDelayed(true)來開啓延遲隊列,也能夠設置爲如下內容傳入交換機聲明的方法中,由於第一種方式的底層就是經過這種方式來實現的。spring

//Map<String, Object> pros = new HashMap<>();
 //設置交換機支持延遲消息推送
 //pros.put("x-delayed-message", "topic");
 TopicExchange exchange = new TopicExchange(LAZY_EXCHANGE, true, false, pros);

複製代碼

發送消息時咱們須要指定延遲推送的時間,咱們這裏在發送消息的方法中傳入參數 new MessagePostProcessor() 是爲了得到 Message對象,由於須要藉助 Message對象的api 來設置延遲時間。數據庫

import com.anqi.mq.config.MQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class MQSender {

 @Autowired
 private RabbitTemplate rabbitTemplate;

 //confirmCallback returnCallback 代碼省略,請參照上一篇

 public void sendLazy(Object message){
 rabbitTemplate.setMandatory(true);
 rabbitTemplate.setConfirmCallback(confirmCallback);
 rabbitTemplate.setReturnCallback(returnCallback);
 //id + 時間戳 全局惟一
 CorrelationData correlationData = new CorrelationData("12345678909"+new Date());

 //發送消息時指定 header 延遲時間
 rabbitTemplate.convertAndSend(MQConfig.LAZY_EXCHANGE, "lazy.boot", message,
 new MessagePostProcessor() {
 @Override
 public Message postProcessMessage(Message message) throws AmqpException {
 //設置消息持久化
 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
 //message.getMessageProperties().setHeader("x-delay", "6000");
 message.getMessageProperties().setDelay(6000);
 return message;
 }
 }, correlationData);
 }
}

複製代碼

咱們能夠觀察 setDelay(Integer i)底層代碼,也是在 header 中設置 x-delay。等同於咱們手動設置 headerapi

message.getMessageProperties().setHeader("x-delay", "6000");bash

/**
 * Set the x-delay header.
 * @param delay the delay.
 * @since 1.6
 */
public void setDelay(Integer delay) {
 if (delay == null || delay < 0) {
 this.headers.remove(X_DELAY);
 }
 else {
 this.headers.put(X_DELAY, delay);
 }
}

複製代碼

消費端進行消費jvm

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

@Component
public class MQReceiver {

 @RabbitListener(queues = "MQ.LazyQueue")
 @RabbitHandler
 public void onLazyMessage(Message msg, Channel channel) throws IOException{
 long deliveryTag = msg.getMessageProperties().getDeliveryTag();
 channel.basicAck(deliveryTag, true);
 System.out.println("lazy receive " + new String(msg.getBody()));

 }

複製代碼

測試結果ide

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class MQSenderTest {

 @Autowired
 private MQSender mqSender;

 @Test
 public void sendLazy() throws Exception {
 String msg = "hello spring boot";

 mqSender.sendLazy(msg + ":");
 }
}

複製代碼

果真在 6 秒後收到了消息 lazy receive hello spring boot:post

Java學習、面試;文檔、視頻資源免費獲取

相關文章
相關標籤/搜索