redis實現簡單的分佈式任務調度

實現方法:java

一、經過zset數據結構實現任務排序,value=任務信息,score=任務觸發時間(unix時間戳)redis

二、ZRANGEBYSCORE task_key -inf  currentTIme,  獲取當前時間之前的全部任務 (消費者定時每隔1秒執行一次獲取)。spring

三、經過redis事務或者腳本實現 獲取任務後同時刪除任務ZREMRANGEBYSCORE task_kety -inf  currentTIme, 解決多臺機器重複消費任務的問題springboot

 

value格式本身設計,舉例子:  70#eating_job,  獲取到value解析爲userid=70的用戶執行eating_job任務數據結構

 

附上獲取任務redis代碼(springboot)ui

public Set<String> zgetAndRemove(String key,Double min,Double max){
        List<Object> txResults = redisTemplate.execute(new SessionCallback<List<Object>>() {
            public List<Object> execute(RedisOperations operations) throws DataAccessException {
                operations.multi();
                operations.opsForZSet().rangeByScore(key,min,max);
                operations.opsForZSet().removeRangeByScore(key,min,max);
                return operations.exec();
            }
        });
        Set<String> rs = null;
        if(txResults != null && txResults.size() == 2){
            rs = (Set<String>)txResults.get(0);
        }
        return rs;
    }

 

/**
     * 每秒從redis中獲取當前要執行的任務
     */
    @Scheduled(cron = "*/1 * * * * ?")
    private void configureTasks() {
        double min = 0d;
        double max = System.currentTimeMillis() / 1000;
        Set<String> task = redisUtil.zgetAndRemove(GlobalConst.DOG_TIMER_REDIS_KEY,min,max);
        for(String member : task){
            try {
                logger.info("{}準備執行",member);
                String[] data = member.split(GlobalConst.DOG_ACTION_REDIS_MEMBER_SPLIT);
                if (data != null && data.length >= 2) {
                    String uid = data[0];
                    String action = data[1];
                    triggerAction(uid, action);
                }
                else{
                    logger.error("{}發現異常的定時任務數據",member);
                }
            }catch (Exception e){
                logger.error("{}動做觸發異常",member);
            }
        }

    }
相關文章
相關標籤/搜索