SpringBoot+RabbitMq實現延時消息隊列

背景:
在一些應用場景中,程序並不須要同步執行,例如用戶註冊以後的郵件或者短信通知提醒。這種場景的實現則是在當前線程,開啓一個新線
 程,當前線程在開啓新線程以後會繼續往下執行,無需等待新線程執行完成。
   但例如一些須要延時的場景則不僅是開啓新線程執行如此簡單了。譬如提交訂單後在15分鐘內沒有完成支付,訂單須要關閉,這種情
 況,是否只開啓一個異步線程就不適用了呢。

那麼就單單實如今提交訂單後的15分鐘內,若是沒有完成支付,系統關閉訂單。有哪些可行的方案呢。html


方案:
  1. 使用定時任務輪詢訂單表,查詢出訂單建立了15分鐘以上而且未支付的訂單,若是有查詢出此類訂單則執行關閉。java

    缺點:假設每1分鐘輪詢一次,則會存在秒級偏差,若是秒級輪詢,則會極其消耗性能,影響程序的健壯性。
  2. 提交訂單時開啓一個新線程,而新線程直接休眠15分鐘,休眠結束後開始執行訂單關閉web

    缺點:若是在線程休眠時,重啓了整個服務,那麼會怎樣呢?
  3. 使用延時消息隊列spring

    缺點:須要額外部署消息中間件

綜上考慮:使用延時消息隊列則成爲最佳選擇,消息延時發佈以後,保存在消息中間件中,在15分鐘後纔會正式發佈至隊列,延時隊列監聽器在15分鐘後監聽到消息時,纔開始執行,而這期間,即便項目重啓也沒有關係。json


以springboot爲基礎框架,集成rabbitmq實現延時隊列

注意:這裏不採用網上流傳的死信隊列轉發,而是採用rabbitmq3.7+版本的延時隊列插件,因此務必安裝3.7+版本並啓用延時隊列插件。

增長amqp依賴

<parent>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-parent</artifactId>
             <version>1.5.4.RELEASE</version>
     </parent>
     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

修改application.yml文件,配置rabbitmq,而且開啓消息的手動應答

spring:
    rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: admin
        password: admin
        listener:
            direct:
                acknowledge-mode: MANUAL
            simple:
                acknowledge-mode: MANUAL

配置隊列,路由,交換機

package cn.rongyuan.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



/**
 * @title rabbitmq配置類
 * @author zengzp
 * @time 2018年8月20日 上午10:46:43
 * @Description 
 */
@Configuration
public class RabbitConfig {
    
    // 支付超時延時交換機
    public static final String Delay_Exchange_Name = "delay.exchange";

    // 超時訂單關閉隊列
    public static final String Timeout_Trade_Queue_Name = "close_trade";

    
    @Bean
    public Queue delayPayQueue() {
        return new Queue(RabbitConfig.Timeout_Trade_Queue_Name, true);
    }
    
    
    // 定義廣播模式的延時交換機 無需綁定路由
    @Bean
    FanoutExchange delayExchange(){
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        FanoutExchange topicExchange = new FanoutExchange(RabbitConfig.Delay_Exchange_Name, true, false, args);
        topicExchange.setDelayed(true);
        return topicExchange;
    }
    
    // 綁定延時隊列與交換機
    @Bean  
    public Binding delayPayBind() {  
        return BindingBuilder.bind(delayPayQueue()).to(delayExchange());  
    }
    
    // 定義消息轉換器
    @Bean
    Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    // 定義消息模板用於發佈消息,而且設置其消息轉換器
    @Bean
    RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }
    @Bean
    RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

}

在提交訂單時發佈消息至延時隊列而且指定延時時長

@Autowired
    RabbitTemplate rabbitTemplate;
    // 經過廣播模式發佈延時消息 延時30分鐘 持久化消息 消費後銷燬 這裏無需指定路由,會廣播至每一個綁定此交換機的隊列
    rabbitTemplate.convertAndSend(RabbitConfig.Delay_Exchange_Name, "", trade.getTradeCode(), message ->{
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        message.getMessageProperties().setDelay(30 * (60*1000));   // 毫秒爲單位,指定此消息的延時時長
        return message;
    });

消費端監聽延時隊列

package cn.rongyuan.mq.consumer;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

import cn.rongyuan.config.RabbitConfig;
import cn.rongyuan.service.TradeService;
import cn.rongyuan.util.ExceptionUtil;


/**
 * @title 消息消費端
 * @author zengzp
 * @time 2018年8月20日 上午11:00:26
 * @Description 
 */
@Component
public class PayTimeOutConsumer {
    
    @Autowired
    TradeService tradeService;
    
    private Logger logger = LoggerFactory.getLogger(getClass());
    
    @RabbitListener(queues = RabbitConfig.Timeout_Trade_Queue_Name)
    public void process(String tradeCode, Message message, Channel channel) throws IOException{
        try {
            logger.info("開始執行訂單[{}]的支付超時訂單關閉......", tradeCode);
            tradeService.cancelTrade(tradeCode);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            logger.info("超時訂單處理完畢");
        } catch (Exception e) {
            logger.error("超時訂單處理失敗:{}", ExceptionUtil.getMessage(e));
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
        } 
    }

}

參考資料:
一、spring amqp 官方文檔:https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange
  二、rabbitmq 官方文檔:http://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
相關文章
相關標籤/搜索