在咱們的工做中,不少地方使用延遲隊列,好比訂單到期沒有付款取消訂單,制訂一個提醒的任務等都須要延遲隊列,那麼咱們須要實現延遲隊列。咱們本文的梗概以下,同窗們能夠選擇性閱讀。java
1. 實現一個簡單的延遲隊列。git
咱們知道目前JAVA能夠有DelayedQueue,咱們首先開一個DelayQueue的結構類圖。DelayQueue實現了Delay、BlockingQueue接口。也就是DelayQueue是一種阻塞隊列。github
咱們在看一下Delay的類圖。Delayed接口也實現了Comparable接口,也就是咱們使用Delayed的時候須要實現CompareTo方法。由於隊列中的數據須要排一下前後,根據咱們本身的實現。Delayed接口裏邊有一個方法就是getDelay方法,用於獲取延遲時間,判斷是否時間已經到了延遲的時間,若是到了延遲的時間就能夠從隊列裏邊獲取了。web
咱們建立一個Message類,實現了Delayed接口,咱們主要把getDelay和compareTo進行實現。在Message的構造方法的地方傳入延遲的時間,單位是毫秒,計算好觸發時間fireTime。同時按照延遲時間的升序進行排序。我重寫了裏邊的toString方法,用於將Message按照我寫的方法進行輸出。redis
package com.hqs.delayQueue.bean; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @author huangqingshi * @Date 2020-04-18 */ public class Message implements Delayed { private String body; private long fireTime; public String getBody() { return body; } public long getFireTime() { return fireTime; } public Message(String body, long delayTime) { this.body = body; this.fireTime = delayTime + System.currentTimeMillis(); } public long getDelay(TimeUnit unit) { return unit.convert(this.fireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { return System.currentTimeMillis() + ":" + body; } public static void main(String[] args) throws InterruptedException { System.out.println(System.currentTimeMillis() + ":start"); BlockingQueue<Message> queue = new DelayQueue<>(); Message message1 = new Message("hello", 1000 * 5L); Message message2 = new Message("world", 1000 * 7L); queue.put(message1); queue.put(message2); while (queue.size() > 0) { System.out.println(queue.take()); } } }
裏邊的main方法裏邊聲明瞭兩個Message,一個延遲5秒,一個延遲7秒,時間到了以後會將接取出而且打印。輸出的結果以下,正是咱們所指望的。spring
1587218430786:start 1587218435789:hello 1587218437793:world
這個方法實現起來真的很是簡單。可是缺點也是很明顯的,就是數據在內存裏邊,數據比較容易丟失。那麼咱們須要採用Redis實現分佈式的任務處理。docker
2. 使用Redis的list實現分佈式延遲隊列。數據庫
本地須要安裝一個Redis,我本身是使用Docker構建一個Redis,很是快速,命令也沒多少。咱們直接啓動Redis而且暴露6379端口。進入以後直接使用客戶端命令便可查看和調試數據。apache
docker pull redis docker run -itd --name redisLocal -p 6379:6379 redis docker exec -it redisLocal /bin/bash redis-cli
我本地採用spring-boot的方式鏈接redis,pom文件列一下,供你們參考。緩存
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.hqs</groupId> <artifactId>delayQueue</artifactId> <version>0.0.1-SNAPSHOT</version> <name>delayQueue</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
加上Redis的配置放到application.properties裏邊便可實現Redis鏈接,很是的方便。
# redis
redis.host=127.0.0.1
redis.port=6379
redis.password=
redis.maxIdle=100
redis.maxTotal=300
redis.maxWait=10000
redis.testOnBorrow=true
redis.timeout=100000
接下來實現一個基於Redis的list數據類型進行實現的一個類。咱們使用RedisTemplate操做Redis,這個裏邊封裝好咱們所須要的Redis的一些方法,用起來很是方便。這個類容許延遲任務作多有10W個,也是避免數據量過大對Redis形成影響。若是在線上使用的時候也須要考慮延遲任務的多少。太多幾百萬幾千萬的時候可能數據量很是大,咱們須要計算Redis的空間是否夠。這個代碼也是很是的簡單,一個用於存放須要延遲的消息,採用offer的方法。另一個是啓動一個線程, 若是消息時間到了,那麼就將數據lpush到Redis裏邊。
package com.hqs.delayQueue.cache; import com.hqs.delayQueue.bean.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import java.util.concurrent.BlockingQueue; /** * @author huangqingshi * @Date 2020-04-18 */ @Slf4j public class RedisListDelayedQueue{ private static final int MAX_SIZE_OF_QUEUE = 100000; private RedisTemplate<String, String> redisTemplate; private String queueName; private BlockingQueue<Message> delayedQueue; public RedisListDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) { this.redisTemplate = redisTemplate; this.queueName = queueName; this.delayedQueue = delayedQueue; init(); } public void offerMessage(Message message) { if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) { throw new IllegalStateException("超過隊列要求最大值,請檢查"); } try { log.info("offerMessage:" + message); delayedQueue.offer(message); } catch (Exception e) { log.error("offMessage異常", e); } } public void init() { new Thread(() -> { while(true) { try { Message message = delayedQueue.take(); redisTemplate.opsForList().leftPush(queueName, message.toString()); } catch (InterruptedException e) { log.error("取消息錯誤", e); } } }).start(); } }
接下來咱們看一下,咱們寫一個測試的controller。你們看一下這個請求/redis/listDelayedQueue的代碼位置。咱們也是生成了兩個消息,而後把消息放到隊列裏邊,另外咱們在啓動一個線程任務,用於將數據從Redis的list中獲取。方法也很是簡單。
package com.hqs.delayQueue.controller; import com.hqs.delayQueue.bean.Message; import com.hqs.delayQueue.cache.RedisListDelayedQueue; import com.hqs.delayQueue.cache.RedisZSetDelayedQueue; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.ResponseBody; import java.util.Set; import java.util.concurrent.*; /** * @author huangqingshi * @Date 2020-04-18 */ @Slf4j @Controller public class DelayQueueController { private static final int CORE_SIZE = Runtime.getRuntime().availableProcessors(); //注意RedisTemplate用的String,String,後續全部用到的key和value都是String的 @Autowired RedisTemplate<String, String> redisTemplate; private static ThreadPoolExecutor taskExecPool = new ThreadPoolExecutor(CORE_SIZE, CORE_SIZE, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); @GetMapping("/redisTest") @ResponseBody public String redisTest() { redisTemplate.opsForValue().set("a","b",60L, TimeUnit.SECONDS); System.out.println(redisTemplate.opsForValue().get("a")); return "s"; } @GetMapping("/redis/listDelayedQueue") @ResponseBody public String listDelayedQueue() { Message message1 = new Message("hello", 1000 * 5L); Message message2 = new Message("world", 1000 * 7L); String queueName = "list_queue"; BlockingQueue<Message> delayedQueue = new DelayQueue<>(); RedisListDelayedQueue redisListDelayedQueue = new RedisListDelayedQueue(redisTemplate, queueName, delayedQueue); redisListDelayedQueue.offerMessage(message1); redisListDelayedQueue.offerMessage(message2); asyncListTask(queueName); return "success"; } @GetMapping("/redis/zSetDelayedQueue") @ResponseBody public String zSetDelayedQueue() { Message message1 = new Message("hello", 1000 * 5L); Message message2 = new Message("world", 1000 * 7L); String queueName = "zset_queue"; BlockingQueue<Message> delayedQueue = new DelayQueue<>(); RedisZSetDelayedQueue redisZSetDelayedQueue = new RedisZSetDelayedQueue(redisTemplate, queueName, delayedQueue); redisZSetDelayedQueue.offerMessage(message1); redisZSetDelayedQueue.offerMessage(message2); asyncZSetTask(queueName); return "success"; } public void asyncListTask(String queueName) { taskExecPool.execute(() -> { for(;;) { String message = redisTemplate.opsForList().rightPop(queueName); if(message != null) { log.info(message); } } }); } public void asyncZSetTask(String queueName) { taskExecPool.execute(() -> { for(;;) { Long nowTimeInMs = System.currentTimeMillis(); System.out.println("nowTimeInMs:" + nowTimeInMs); Set<String> messages = redisTemplate.opsForZSet().rangeByScore(queueName, 0, nowTimeInMs); if(messages != null && messages.size() != 0) { redisTemplate.opsForZSet().removeRangeByScore(queueName, 0, nowTimeInMs); for (String message : messages) { log.info("asyncZSetTask:" + message + " " + nowTimeInMs); } log.info(redisTemplate.opsForZSet().zCard(queueName).toString()); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }); } }
我就不把運行結果寫出來了,感興趣的同窗本身自行試驗。固然這個方法也是從內存中拿出數據,到時間以後放到Redis裏邊,仍是會存在程序啓動的時候,任務進行丟失。咱們繼續看另一種方法更好的進行這個問題的處理。
3. 使用Redis的zSet實現分佈式延遲隊列。
咱們須要再寫一個ZSet的隊列處理。下邊的offerMessage主要是把消息直接放入緩存中。採用Redis的ZSET的zadd方法。zadd(key, value, score) 即將key=value的數據賦予一個score, 放入緩存中。score就是計算出來延遲的毫秒數。
package com.hqs.delayQueue.cache; import com.hqs.delayQueue.bean.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import java.util.concurrent.BlockingQueue; /** * @author huangqingshi * @Date 2020-04-18 */ @Slf4j public class RedisZSetDelayedQueue { private static final int MAX_SIZE_OF_QUEUE = 100000; private RedisTemplate<String, String> redisTemplate; private String queueName; private BlockingQueue<Message> delayedQueue; public RedisZSetDelayedQueue(RedisTemplate<String, String> redisTemplate, String queueName, BlockingQueue<Message> delayedQueue) { this.redisTemplate = redisTemplate; this.queueName = queueName; this.delayedQueue = delayedQueue; } public void offerMessage(Message message) { if(delayedQueue.size() > MAX_SIZE_OF_QUEUE) { throw new IllegalStateException("超過隊列要求最大值,請檢查"); } long delayTime = message.getFireTime() - System.currentTimeMillis(); log.info("zset offerMessage" + message + delayTime); redisTemplate.opsForZSet().add(queueName, message.toString(), message.getFireTime()); } }
上邊的Controller方法已經寫好了測試的方法。/redis/zSetDelayedQueue,裏邊主要使用ZSet的zRangeByScore(key, min, max)。主要是從score從0,當前時間的毫秒數獲取。取出數據後再採用removeRangeByScore,將數據刪除。這樣數據能夠直接寫到Redis裏邊,而後取出數據後直接處理。這種方法比前邊的方法稍微好一些,可是實際上還存在一些問題,由於依賴Redis,若是Redis內存不足或者連不上的時候,系統將變得不可用。
4. 總結一下,另外還有哪些能夠延遲隊列。
上面的方法其實仍是存在問題的,好比系統重啓的時候仍是會形成任務的丟失。因此咱們在生產上使用的時候,咱們還須要將任務保存起來,好比放到數據庫和文件存儲系統將數據存儲起來,這樣作到double-check,雙重檢查,最終達到任務的99.999%可以處理。
其實還有不少東西能夠實現延遲隊列。
1) RabbitMQ就能夠實現此功能。這個消息隊列能夠把數據保存起來而且進行處理。
2)Kafka也能夠實現這個功能。
3)Netty的HashedWheelTimer也能夠實現這個功能。
有興趣的同窗能夠進一步研究這些內容的實現。
最後放上個人代碼: https://github.com/stonehqs/delayQueue