背景:
在一些應用場景中,程序並不須要同步執行,例如用戶註冊以後的郵件或者短信通知提醒。這種場景的實現則是在當前線程,開啓一個新線 程,當前線程在開啓新線程以後會繼續往下執行,無需等待新線程執行完成。 但例如一些須要延時的場景則不僅是開啓新線程執行如此簡單了。譬如提交訂單後在15分鐘內沒有完成支付,訂單須要關閉,這種情 況,是否只開啓一個異步線程就不適用了呢。
那麼就單單實如今提交訂單後的15分鐘內,若是沒有完成支付,系統關閉訂單。有哪些可行的方案呢。html
方案:
使用定時任務輪詢訂單表,查詢出訂單建立了15分鐘以上而且未支付的訂單,若是有查詢出此類訂單則執行關閉。java
缺點:假設每1分鐘輪詢一次,則會存在秒級偏差,若是秒級輪詢,則會極其消耗性能,影響程序的健壯性。
提交訂單時開啓一個新線程,而新線程直接休眠15分鐘,休眠結束後開始執行訂單關閉web
缺點:若是在線程休眠時,重啓了整個服務,那麼會怎樣呢?
使用延時消息隊列spring
缺點:須要額外部署消息中間件
綜上考慮:使用延時消息隊列則成爲最佳選擇,消息延時發佈以後,保存在消息中間件中,在15分鐘後纔會正式發佈至隊列,延時隊列監聽器在15分鐘後監聽到消息時,纔開始執行,而這期間,即便項目重啓也沒有關係。json
注意:這裏不採用網上流傳的死信隊列轉發,而是採用rabbitmq3.7+版本的延時隊列插件,因此務必安裝3.7+版本並啓用延時隊列插件。
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin listener: direct: acknowledge-mode: MANUAL simple: acknowledge-mode: MANUAL
package cn.rongyuan.config; import java.util.HashMap; import java.util.Map; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @title rabbitmq配置類 * @author zengzp * @time 2018年8月20日 上午10:46:43 * @Description */ @Configuration public class RabbitConfig { // 支付超時延時交換機 public static final String Delay_Exchange_Name = "delay.exchange"; // 超時訂單關閉隊列 public static final String Timeout_Trade_Queue_Name = "close_trade"; @Bean public Queue delayPayQueue() { return new Queue(RabbitConfig.Timeout_Trade_Queue_Name, true); } // 定義廣播模式的延時交換機 無需綁定路由 @Bean FanoutExchange delayExchange(){ Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); FanoutExchange topicExchange = new FanoutExchange(RabbitConfig.Delay_Exchange_Name, true, false, args); topicExchange.setDelayed(true); return topicExchange; } // 綁定延時隊列與交換機 @Bean public Binding delayPayBind() { return BindingBuilder.bind(delayPayQueue()).to(delayExchange()); } // 定義消息轉換器 @Bean Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // 定義消息模板用於發佈消息,而且設置其消息轉換器 @Bean RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } }
@Autowired RabbitTemplate rabbitTemplate; // 經過廣播模式發佈延時消息 延時30分鐘 持久化消息 消費後銷燬 這裏無需指定路由,會廣播至每一個綁定此交換機的隊列 rabbitTemplate.convertAndSend(RabbitConfig.Delay_Exchange_Name, "", trade.getTradeCode(), message ->{ message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(30 * (60*1000)); // 毫秒爲單位,指定此消息的延時時長 return message; });
package cn.rongyuan.mq.consumer; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import cn.rongyuan.config.RabbitConfig; import cn.rongyuan.service.TradeService; import cn.rongyuan.util.ExceptionUtil; /** * @title 消息消費端 * @author zengzp * @time 2018年8月20日 上午11:00:26 * @Description */ @Component public class PayTimeOutConsumer { @Autowired TradeService tradeService; private Logger logger = LoggerFactory.getLogger(getClass()); @RabbitListener(queues = RabbitConfig.Timeout_Trade_Queue_Name) public void process(String tradeCode, Message message, Channel channel) throws IOException{ try { logger.info("開始執行訂單[{}]的支付超時訂單關閉......", tradeCode); tradeService.cancelTrade(tradeCode); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); logger.info("超時訂單處理完畢"); } catch (Exception e) { logger.error("超時訂單處理失敗:{}", ExceptionUtil.getMessage(e)); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } } }
參考資料:
一、spring amqp 官方文檔:https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange 二、rabbitmq 官方文檔:http://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/