在電商、支付等領域,每每會有這樣的場景,用戶下單後放棄支付了,那這筆訂單會在指定的時間段後進行關閉操做,細心的你必定發現了像某寶、某東都有這樣的邏輯,並且時間很準確,偏差在1s內;那他們是怎麼實現的呢?redis
通常的作法有以下幾種算法
定時任務關閉訂單spring
rocketmq延遲隊列windows
rabbitmq死信隊列數組
時間輪算法服務器
redis過時監聽網絡
通常狀況下,最不推薦的方式就是關單方式就是定時任務方式,緣由咱們能夠看下面的圖來講明mybatis
咱們假設,關單時間爲下單後10分鐘,定時任務間隔也是10分鐘;經過上圖咱們看出,若是在第1分鐘下單,在第20分鐘的時候才能被掃描到執行關單操做,這樣偏差達到10分鐘,這在不少場景下是不可接受的,另外須要頻繁掃描主訂單號形成網絡IO和磁盤IO的消耗,對實時交易形成必定的衝擊,因此PASSapp
延遲消息 生產者把消息發送到消息服務器後,並不但願被當即消費,而是等待指定時間後才能夠被消費者消費,這類消息一般被稱爲延遲消息。 在RocketMQ開源版本中,支持延遲消息,可是不支持任意時間精度的延遲消息,只支持特定級別的延遲消息。 消息延遲級別分別爲1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18個級別。dom
發送延遲消息(生產者)
/** * 推送延遲消息 * @param topic * @param body * @param producerGroup * @return boolean */ public boolean sendMessage(String topic, String body, String producerGroup) { try { Message recordMsg = new Message(topic, body.getBytes()); producer.setProducerGroup(producerGroup); //設置消息延遲級別,我這裏設置14,對應就是延時10分鐘 // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" recordMsg.setDelayTimeLevel(14); // 發送消息到一個Broker SendResult sendResult = producer.send(recordMsg); // 經過sendResult返回消息是否成功送達 log.info("發送延遲消息結果:======sendResult:{}", sendResult); DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("發送時間:{}", format.format(new Date())); return true; } catch (Exception e) { e.printStackTrace(); log.error("延遲消息隊列推送消息異常:{},推送內容:{}", e.getMessage(), body); } return false; }
消費延遲消息(消費者)
/** * 接收延遲消息 * * @param topic * @param consumerGroup * @param messageHandler */ public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler) { ThreadPoolUtil.execute(() -> { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup(consumerGroup); consumer.setVipChannelEnabled(false); consumer.setNamesrvAddr(address); //設置消費者拉取消息的策略,*表示消費該topic下的全部消息,也能夠指定tag進行消息過濾 consumer.subscribe(topic, "*"); //消費者端啓動消息監聽,一旦生產者發送消息被監聽到,就打印消息,和rabbitmq中的handlerDelivery相似 consumer.registerMessageListener(messageHandler); consumer.start(); log.info("啓動延遲消息隊列監聽成功:" + topic); } catch (MQClientException e) { log.error("啓動延遲消息隊列監聽失敗:{}", e.getErrorMessage()); System.exit(1); } }); }
實現監聽類,處理具體邏輯
/** * 延遲消息監聽 * */ @Component public class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent> { @Resource private MQUtil mqUtil; @Resource private CourseOrderTimeoutHandler courseOrderTimeoutHandler; @Override public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) { // 訂單超時監聽 mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler); } }
/** * 實現監聽 */ @Slf4j @Component public class CourseOrderTimeoutHandler implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { // 獲得消息體 String body = new String(msg.getBody()); JSONObject userJson = JSONObject.parseObject(body); TCourseBuy courseBuyDetails = JSON.toJavaObject(userJson, TCourseBuy.class); // 處理具體的業務邏輯,,,,, DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); log.info("消費時間:{}", format.format(new Date())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
這種方式相比定時任務好了不少,可是有一個致命的缺點,就是延遲等級只有18種(商業版本支持自定義時間),若是咱們想把關閉訂單時間設置在15分鐘該如何處理呢?顯然不夠靈活。
Rabbitmq自己是沒有延遲隊列的,只能經過Rabbitmq自己隊列的特性來實現,想要Rabbitmq實現延遲隊列,須要使用Rabbitmq的死信交換機(Exchange)和消息的存活時間TTL(Time To Live)
死信交換機 一個消息在知足以下條件下,會進死信交換機,記住這裏是交換機而不是隊列,一個交換機能夠對應不少隊列。
一個消息被Consumer拒收了,而且reject方法的參數裏requeue是false。也就是說不會被再次放在隊列裏,被其餘消費者使用。 上面的消息的TTL到了,消息過時了。
隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。 死信交換機就是普通的交換機,只是由於咱們把過時的消息扔進去,因此叫死信交換機,並非說死信交換機是某種特定的交換機
消息TTL(消息存活時間) 消息的TTL就是消息的存活時間。RabbitMQ能夠對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也能夠對每個單獨的消息作單獨的設置。超過了這個時間,咱們認爲這個消息就死了,稱之爲死信。若是隊列設置了,消息也設置了,那麼會取值較小的。因此一個消息若是被路由到不一樣的隊列中,這個消息死亡的時間有可能不同(不一樣的隊列設置)。這裏單講單個消息的TTL,由於它纔是實現延遲任務的關鍵。
byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
能夠經過設置消息的expiration字段或者x-message-ttl屬性來設置時間,二者是同樣的效果。只是expiration字段是字符串參數,因此要寫個int類型的字符串:當上面的消息扔到隊列中後,過了60秒,若是沒有被消費,它就死了。不會被消費者消費到。這個消息後面的,沒有「死掉」的消息對頂上來,被消費者消費。死信在隊列中並不會被刪除和釋放,它會被統計到隊列的消息數中去
處理流程圖
建立交換機(Exchanges)和隊列(Queues)
建立死信交換機
如圖所示,就是建立一個普通的交換機,這裏爲了方便區分,把交換機的名字取爲:delay
建立自動過時消息隊列 這個隊列的主要做用是讓消息定時過時的,好比咱們須要2小時候關閉訂單,咱們就須要把消息放進這個隊列裏面,把消息過時時間設置爲2小時
建立一個一個名爲delay_queue1的自動過時的隊列,固然圖片上面的參數並不會讓消息自動過時,由於咱們並無設置x-message-ttl參數,若是整個隊列的消息有消息都是相同的,能夠設置,這裏爲了靈活,因此並無設置,另外兩個參數x-dead-letter-exchange表明消息過時後,消息要進入的交換機,這裏配置的是delay,也就是死信交換機,x-dead-letter-routing-key是配置消息過時後,進入死信交換機的routing-key,跟發送消息的routing-key一個道理,根據這個key將消息放入不一樣的隊列
建立消息處理隊列 這個隊列纔是真正處理消息的隊列,全部進入這個隊列的消息都會被處理
消息隊列的名字爲delay_queue2 消息隊列綁定到交換機 進入交換機詳情頁面,將建立的2個隊列(delayqueue1和delayqueue2)綁定到交換機上面
自動過時消息隊列的routing key 設置爲delay 綁定delayqueue2
delayqueue2 的key要設置爲建立自動過時的隊列的x-dead-letter-routing-key參數,這樣當消息過時的時候就能夠自動把消息放入delay_queue2這個隊列中了 綁定後的管理頁面以下圖:
固然這個綁定也可使用代碼來實現,只是爲了直觀表現,因此本文使用的管理平臺來操做 發送消息
String msg = "hello word"; MessageProperties messageProperties = newMessageProperties(); messageProperties.setExpiration("6000"); messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes()); Message message = newMessage(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("delay", "delay",message);
設置了讓消息6秒後過時 注意:由於要讓消息自動過時,因此必定不能設置delay_queue1的監聽,不能讓這個隊列裏面的消息被接受到,不然消息一旦被消費,就不存在過時了
接收消息 接收消息配置好delay_queue2的監聽就行了
package wang.raye.rabbitmq.demo1; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration publicclassDelayQueue{ /** 消息交換機的名字*/ publicstaticfinalString EXCHANGE = "delay"; /** 隊列key1*/ publicstaticfinalString ROUTINGKEY1 = "delay"; /** 隊列key2*/ publicstaticfinalString ROUTINGKEY2 = "delay_key"; /** * 配置連接信息 * @return */ @Bean publicConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672); connectionFactory.setUsername("kberp"); connectionFactory.setPassword("kberp"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); // 必需要設置 return connectionFactory; } /** * 配置消息交換機 * 針對消費者配置 FanoutExchange: 將消息分發到全部的綁定隊列,無routingkey的概念 HeadersExchange :經過添加屬性key-value匹配 DirectExchange:按照routingkey分發到指定隊列 TopicExchange:多關鍵字匹配 */ @Bean publicDirectExchange defaultExchange() { returnnewDirectExchange(EXCHANGE, true, false); } /** * 配置消息隊列2 * 針對消費者配置 * @return */ @Bean publicQueue queue() { returnnewQueue("delay_queue2", true); //隊列持久 } /** * 將消息隊列2與交換機綁定 * 針對消費者配置 * @return */ @Bean @Autowired publicBinding binding() { returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2); } /** * 接受消息的監聽,這個監聽會接受消息隊列1的消息 * 針對消費者配置 * @return */ @Bean @Autowired publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設置確認模式手工確認 container.setMessageListener(newChannelAwareMessageListener() { publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{ byte[] body = message.getBody(); System.out.println("delay_queue2 收到消息 : "+ newString(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認消息成功消費 } }); return container; } }
這種方式能夠自定義進入死信隊列的時間;是否是很完美,可是有的小夥伴的狀況是消息中間件就是rocketmq,公司也不可能會用商業版,怎麼辦?那就進入下一節
(1)建立環形隊列,例如能夠建立一個包含3600個slot的環形隊列(本質是個數組)
(2)任務集合,環上每個slot是一個Set 同時,啓動一個timer,這個timer每隔1s,在上述環形隊列中移動一格,有一個Current Index指針來標識正在檢測的slot。
Task結構中有兩個很重要的屬性: (1)Cycle-Num:當Current Index第幾圈掃描到這個Slot時,執行任務 (2)訂單號,要關閉的訂單號(也能夠是其餘信息,好比:是一個基於某個訂單號的任務)
假設當前Current Index指向第0格,例如在3610秒以後,有一個訂單須要關閉,只需: (1)計算這個訂單應該放在哪個slot,當咱們計算的時候如今指向1,3610秒以後,應該是第10格,因此這個Task應該放在第10個slot的Set中 (2)計算這個Task的Cycle-Num,因爲環形隊列是3600格(每秒移動一格,正好1小時),這個任務是3610秒後執行,因此應該繞3610/3600=1圈以後再執行,因而Cycle-Num=1
Current Index不停的移動,每秒移動到一個新slot,這個slot中對應的Set,每一個Task看Cycle-Num是否是0: (1)若是不是0,說明還須要多移動幾圈,將Cycle-Num減1 (2)若是是0,說明立刻要執行這個關單Task了,取出訂單號執行關單(能夠用單獨的線程來執行Task),並把這個訂單信息從Set中刪除便可。 (1)無需再輪詢所有訂單,效率高 (2)一個訂單,任務只執行一次 (3)時效性好,精確到秒(控制timer移動頻率能夠控制精度)
1.修改redis.windows.conf配置文件中notify-keyspace-events的值 默認配置notify-keyspace-events的值爲 "" 修改成 notify-keyspace-events Ex 這樣便開啓了過時事件
2. 建立配置類RedisListenerConfig(配置RedisMessageListenerContainer這個Bean)
package com.zjt.shop.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisListenerConfig { @Autowired private RedisTemplate redisTemplate; /** * @return */ @Bean public RedisTemplate redisTemplateInit() { // key序列化 redisTemplate.setKeySerializer(new StringRedisSerializer()); //val實例化 redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); return redisTemplate; } @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); return container; } }
3.繼承KeyExpirationEventMessageListener建立redis過時事件的監聽類
package com.zjt.shop.common.util; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.zjt.shop.modules.order.service.OrderInfoService; import com.zjt.shop.modules.product.entity.OrderInfoEntity; import com.zjt.shop.modules.product.mapper.OrderInfoMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; @Slf4j @Component public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener { public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Autowired private OrderInfoMapper orderInfoMapper; /** * 針對redis數據失效事件,進行數據處理 * @param message * @param pattern */ @Override public void onMessage(Message message, byte[] pattern) { try { String key = message.toString(); //從失效key中篩選表明訂單失效的key if (key != null && key.startsWith("order_")) { //截取訂單號,查詢訂單,若是是未支付狀態則爲-取消訂單 String orderNo = key.substring(6); QueryWrapper<OrderInfoEntity> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("order_no",orderNo); OrderInfoEntity orderInfo = orderInfoMapper.selectOne(queryWrapper); if (orderInfo != null) { if (orderInfo.getOrderState() == 0) { //待支付 orderInfo.setOrderState(4); //已取消 orderInfoMapper.updateById(orderInfo); log.info("訂單號爲【" + orderNo + "】超時未支付-自動修改成已取消狀態"); } } } } catch (Exception e) { e.printStackTrace(); log.error("【修改支付訂單過時狀態異常】:" + e.getMessage()); } } }
4:測試 經過redis客戶端存一個有效時間爲3s的訂單:
結果:
總結: 以上方法只是我的對於關單的一些想法,可能有些地方有疏漏,請在公衆號直接留言進行指出,固然若是你有更好的關單方式也能夠隨時溝通交流