延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。那麼,是在什麼場景下我才須要這樣的隊列呢?git
咱們先看看如下業務場景:redis
設計主要包含如下幾點:spring
仍是基於有讚的延遲隊列設計,進行優化改造及代碼實現。有贊設計json
ZING:DELAY_QUEUE:JOB_POOL
是一個Hash_Table結構,裏面存儲了全部延遲隊列的信息。KV結構:K=prefix+projectName field = topic+jobId V=CONENT;V由客戶端傳入的數據,消費的時候回傳ZING:DELAY_QUEUE:BUCKET
延遲隊列的有序集合ZSET,存放K=ID和須要的執行時間戳,根據時間戳排序ZING:DELAY_QUEUE:QUEUE
LIST結構,每一個Topic一個LIST,list存放的都是當前須要被消費的JOB
圖片僅供參考,基本能夠描述整個流程的執行過程,圖片源於文末的參考博客中數據結構
ZING:DELAY_QUEUE:JOB_POOL
中插入一條數據,記錄了業務方消費方。ZING:DELAY_QUEUE:BUCKET
也會插入一條記錄,記錄執行的時間戳ZING:DELAY_QUEUE:BUCKET
中查找哪些執行時間戳的RunTimeMillis比如今的時間小,將這些記錄所有刪除;同時會解析出每一個任務的Topic是什麼,而後將這些任務PUSH到TOPIC對應的列表ZING:DELAY_QUEUE:QUEUE
中ZING:DELAY_QUEUE:JOB_POOL
查找數據結構,返回給回調結構,執行回調方法。每一個JOB必須包含如下幾個屬性異步
ZING:DELAY_QUEUE:QUEUE
最簡單的實現方式就是使用定時器進行秒級掃描,爲了保證消息執行的時效性,能夠設置每1S請求Redis一次,判斷隊列中是否有待消費的JOB。可是這樣會存在一個問題,若是queue中一直沒有可消費的JOB,那頻繁的掃描就失去了意義,也浪費了資源,幸虧LIST中有一個BLPOP阻塞原語
,若是list中有數據就會立馬返回,若是沒有數據就會一直阻塞在那裏,直到有數據返回,能夠設置阻塞的超時時間,超時會返回NULL;具體的實現方式及策略會在代碼中進行具體的實現介紹分佈式
技術棧:SpringBoot,Redisson,Redis,分佈式鎖,定時器ide
注意:本項目沒有實現設計方案中的多Queue消費,只開啓了一個QUEUE,這個待之後優化spring-boot
/** * 消息結構 * * @author 睜眼看世界 * @date 2020年1月15日 */ @Data public class Job implements Serializable { private static final long serialVersionUID = 1L; /** * Job的惟一標識。用來檢索和刪除指定的Job信息 */ @NotBlank private String jobId; /** * Job類型。能夠理解成具體的業務名稱 */ @NotBlank private String topic; /** * Job須要延遲的時間。單位:秒。(服務端會將其轉換爲絕對時間) */ private Long delay; /** * Job的內容,供消費者作具體的業務處理,以json格式存儲 */ @NotBlank private String body; /** * 失敗重試次數 */ private int retry = 0; /** * 通知URL */ @NotBlank private String url; }
/** * 消息結構 * * @author 睜眼看世界 * @date 2020年1月15日 */ @Data public class JobDie implements Serializable { private static final long serialVersionUID = 1L; /** * Job的惟一標識。用來檢索和刪除指定的Job信息 */ @NotBlank private String jobId; /** * Job類型。能夠理解成具體的業務名稱 */ @NotBlank private String topic; }
/** * 搬運線程 * * @author 睜眼看世界 * @date 2020年1月17日 */ @Slf4j @Component public class CarryJobScheduled { @Autowired private RedissonClient redissonClient; /** * 啓動定時開啓搬運JOB信息 */ @Scheduled(cron = "*/1 * * * * *") public void carryJobToQueue() { System.out.println("carryJobToQueue --->"); RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE); long now = System.currentTimeMillis(); Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true); List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList()); RList<String> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE); readyQueue.addAll(jobList); bucketSet.removeAllAsync(jobList); } catch (InterruptedException e) { log.error("carryJobToQueue error", e); } finally { if (lock != null) { lock.unlock(); } } } }
@Slf4j @Component public class ReadyQueueContext { @Autowired private RedissonClient redissonClient; @Autowired private ConsumerService consumerService; /** * TOPIC消費線程 */ @PostConstruct public void startTopicConsumer() { TaskManager.doTask(this::runTopicThreads, "開啓TOPIC消費線程"); } /** * 開啓TOPIC消費線程 * 將全部可能出現的異常所有catch住,確保While(true)可以不中斷 */ @SuppressWarnings("InfiniteLoopStatement") private void runTopicThreads() { while (true) { RLock lock = null; try { lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK); } catch (Exception e) { log.error("runTopicThreads getLock error", e); } try { if (lock == null) { continue; } // 分佈式鎖時間比Blpop阻塞時間多1S,避免出現釋放鎖的時候,鎖已經超時釋放,unlock報錯 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { continue; } // 1. 獲取ReadyQueue中待消費的數據 RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE); String topicId = queue.poll(60, TimeUnit.SECONDS); if (StringUtils.isEmpty(topicId)) { continue; } // 2. 獲取job元信息內容 RMap<String, Job> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY); Job job = jobPoolMap.get(topicId); // 3. 消費 FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消費JobId-->" + job.getJobId()); if (taskResult.get()) { // 3.1 消費成功,刪除JobPool和DelayBucket的job信息 jobPoolMap.remove(topicId); } else { int retrySum = job.getRetry() + 1; // 3.2 消費失敗,則根據策略從新加入Bucket // 若是重試次數大於5,則將jobPool中的數據刪除,持久化到DB if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) { jobPoolMap.remove(topicId); continue; } job.setRetry(retrySum); long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000; log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime)); RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(nextTime, topicId); // 3.3 更新元信息失敗次數 jobPoolMap.put(topicId, job); } } catch (Exception e) { log.error("runTopicThreads error", e); } finally { if (lock != null) { try { lock.unlock(); } catch (Exception e) { log.error("runTopicThreads unlock error", e); } } } } } }
/** * 提供給外部服務的操做接口 * * @author why * @date 2020年1月15日 */ @Slf4j @Service public class RedisDelayQueueServiceImpl implements RedisDelayQueueService { @Autowired private RedissonClient redissonClient; /** * 添加job元信息 * * @param job 元信息 */ @Override public void addJob(Job job) { RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId()); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId()); // 1. 將job添加到 JobPool中 RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); if (jobPool.get(topicId) != null) { throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST); } jobPool.put(topicId, job); // 2. 將job添加到 DelayBucket中 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.add(job.getDelay(), topicId); } catch (InterruptedException e) { log.error("addJob error", e); } finally { if (lock != null) { lock.unlock(); } } } /** * 刪除job信息 * * @param job 元信息 */ @Override public void deleteJob(JobDie jobDie) { RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId()); try { boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS); if (!lockFlag) { throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL); } String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId()); RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY); jobPool.remove(topicId); RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE); delayBucket.remove(topicId); } catch (InterruptedException e) { log.error("addJob error", e); } finally { if (lock != null) { lock.unlock(); } } } }
更多詳細源碼請在下面地址中獲取oop
RedisDelayQueue實現
zing-delay-queue RedissonStarter
redisson-spring-boot-starter 項目應用
zing-pay