DelayQueue實現Java延時任務

最近公司須要實現一個訂單超時自動關閉的功能,由Java這塊來實現 java

一開始我覺得就是定時任務,深刻了解了以後發現並非,官方名稱應該叫延時任務,到時間以後 執行傳過來的回調函數redis

這個功能我一共前先後後寫了三版,寫完第三版以後回頭看初版寫的,簡直就是****(髒話)json

第二版是採用多線程輪詢機制實現的 可是針對到時併發執行有很大問題 雖然實際項目中尚未達到高併發 但仍是一直想實現一個完美的方案 因而有了第三版緩存

第三版使用了DelayQueue 其實實現原理是同樣的 只是到時執行的入口任務的結構改了一下 這兩版我都有使用redis作備份 以防項目掛掉任務丟失 解決了多任務同一時間併發問題 和redis儲存結構問題安全

若是有更好的方案 但願您能夠不吝嗇的與我分享一下多線程

PS:RedisService的方案本身寫就能夠了 我使用的是 StringRedisTemplate + Jedis併發

/**
 * @Author : Yanqiang
 * @Date : 2019/1/18
 * @Param : [db]
 * @return : void
 * @Description : 切換不一樣db
 */
public void  switchRedisDb( int db){
    JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) stringRedisTemplate.getConnectionFactory();
    jedisConnectionFactory.setDatabase(db);
    stringRedisTemplate.setConnectionFactory(jedisConnectionFactory);
    ValueOperations valueOperations = stringRedisTemplate.opsForValue();
}
/**
 *  寫入緩存
 */
public void setStringRedis(final String key, String value){
    ValueOperations<String, String> operations = stringRedisTemplate.opsForValue();
    operations.set(key, value);
}
/**
 * 獲取全部key
 * @return
 */
public Set<String> getKeys(){
    Set<String> keys = redisTemplate.keys("*");
    return keys;
}

下面是第二版dom

  參數:  key:時間戳    callback:回調函數ide

  接口:    executeTimer(String date, String callback) 添加任務函數

              running() 執行任務

/**
 * @ClassName : MyTask
 * @Author : Yanqiang
 * @Date : 2019/1/16
 * @Description : 任務實體類
 */
public class MyTask implements Runnable {

    private String  date;//到時時間戳 
    private String  callback;//到時回調函數 

    public MyTask(String date, String callback) {
        this.date = date;
        this.callback = callback;
    }

    /**
     * @Author : Yanqiang
     * @Date : 2019/1/17
     * @Param : []
     * @return : void
     * @Description : 任務
     */
    @Override
    public void run() {
        String result = HttpClientUtil.httpGetMethodString(callback);//httpclient調用
        Thread t1 = new Thread(() -> {
            //把重試交給子線程處理 防止佔用線程池
            retryHttpClient(date, result, callback);
        });
        t1.start();
        System.out.println("已執行 ===編號:"+date+" 路徑:"+callback+"====");
    }

    /**
     * @Author : Yanqiang
     * @Date : 2019/1/17
     * @Param : [firstResult, httpUrl]
     * @return : void
     * @Description : 重試機制 三次 5秒 10秒 15秒 若是還返回失敗 跳過不作處理
     */
    public void retryHttpClient(String time,String firstResult, String httpUrl) {
        RedisService redisService = ApplicationContextProvider.getBean(RedisService.class);
        JSONObject jsonResult = JSONObject.fromObject(firstResult);
        if (null != jsonResult) {
            int errno = (int) jsonResult.get("errno");
            //取結果判斷是否執行成功
            if (errno != 0) {
                try {
                    //5秒 10秒 15秒
                    for (int i = 1; i <= 3; i++) {
                        Thread.sleep(5000 * i);
                        String result = HttpClientUtil.httpGetMethodString(httpUrl);
                        //logger.info("=====執行重試 地址: " + httpUrl + "  返回:" + result + "======");
                        JSONObject retryResult = JSONObject.fromObject(result);
                        if (null != retryResult) {
                            int retryErrno = (int) retryResult.get("errno");
                            //若是執行成功 跳出重試
                            if (retryErrno == 0) {
                                break;
                            }else {
                                //把錯誤的存入redis DB 3庫
                                redisService.switchRedisDb(3);
                                redisService.setStringRedis(time,callback);
                                //切回redis DB 2
                                redisService.switchRedisDb(2);
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
@Component
@Service
public class TimerReviseService extends Thread implements CommandLineRunner {
    private final static Logger logger = LoggerFactory.getLogger(TimerReviseService.class);
    @Resource
    RedisService redisService;

//線程安全 而且有序的map (key大小排序)
ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>();

/**
 * 超出核心線程的任務會建立新的線程來執行,
 * 最多線程爲最大線程數,核心線程爲核心線程數,
 * 在任務完成後空閒超過5秒就會被回收
 */
ThreadPoolExecutor executor = new ThreadPoolExecutor(
        3,//核心線程數
        100,//最大線程數
        10,//空閒時間爲5秒
        TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());

/**
 *   添加任務
 */
@Transactional
public BaseResult executeTimer(String date, String callback) {
    BaseResult baseResult = new BaseResult();
    map.put(date, callback);//扔進任務
    redisService.set(date, callback);//扔進redis
    String log = "key:" + date + " value:" + callback + " 加入 隊列與redis; ";
    logger.info(log);
    baseResult.setData(log);
    return baseResult;
}


@Override
public void run(String... strings) {
  this.running();
}


public void running() {
    //項目啓動先從redis讀取緩存數據
    ConcurrentSkipListMap<String, String> redisMap = new ConcurrentSkipListMap<>();
    Set<String> rediskeys = redisService.getKeys();
    if (null != rediskeys && rediskeys.size()>0){
        for (String key:rediskeys) {
            String value = (String) redisService.get(key);
            redisMap.put(key,value);
        }
        map.putAll(redisMap);
    }
//問題就在這裏 若是兩個甚至幾十個任務同一時間 沒辦法同時執行 並且使用毫秒級的時間戳作key不能保證徹底不重複
    Thread t1 = new Thread(() -> {
        logger.info("===開始輪詢是否有延遲任務===");
        while (true) {
            try {
                if (map.size()>0){
                    Map.Entry<String, String> stringStringEntry = map.firstEntry();//取出第一個 也就是key最小 最先應該執行的任務
                    String key = stringStringEntry.getKey();
                    if (stampToMillis(key) < 0) {//與當前時間相比較 算出相差幾秒 <=0 執行
                        String value = stringStringEntry.getValue();
                        executor.execute(new MyTask(key,value));//使用線程池執行
                        map.remove(key);//任務隊列中刪除
                        redisService.remove(key);//redis中刪除
                    }
                }
                sleep(1000);//1秒查一次
            } catch (Exception e) {
                logger.error(StatusCode.TIMER_SERVER_RUN_ERROR.getMsg());
                e.printStackTrace();
            }
        }
    });
    t1.start();
}

/**
 * @return : long
 * @Author : Yanqiang
 * @Date : 2019/1/8
 * @Param : [stampStr]
 * @Description : 計算兩個時間戳相差秒數
 */
public long stampToMillis(String stampStr) {
    long thisTime = new Long(stampStr);
    long systemTime = new Date().getTime();
    long c = 0;
    if (stampStr.length() == 13) {
        c = (thisTime - systemTime) / 1000;
    } else {
        long systemTime10 = new Date().getTime() / 1000;
        c = (thisTime - systemTime10);
    }
    return c;
}

 如下是使用了DelayQueue的方案

     網上的方案都拆分的比較細 好多個類  我把它們簡化集合到一塊兒了

    1.優化了key方案

    2.內存中使用對象 而不是map 多任務分離 到時互不影響

/**
 * @Author : Yanqiang
 * @Date : 2019/1/24
 * @Param :
 * @return :
 * @Description : 任務調度系統
 *                與上面相同 只是重寫了Delayed 的 compareTo(),getDelay(),hashCode(),equals()方法 
 */
public class Task implements Runnable,Delayed {
    private final static Logger logger = LoggerFactory.getLogger(Task.class);

    //到期時間
    private final long time;
    //redisKey
    private final String redisKey;
    //問題對象
    private final String task;

    private static final AtomicLong atomic = new AtomicLong(0);

    private final long n;

    public Task(long timeout, String t,String redisKey) {
        this.time = timeout;
        this.task = t;
        this.n = atomic.getAndIncrement();
        this.redisKey = redisKey;
    }

    /**
     * 返回與此對象相關的剩餘延遲時間,以給定的時間單位表示
     */
    @Override
    public long getDelay(TimeUnit unit) {
        String thisTime = String.valueOf(time);
        long systemTime = new Date().getTime();
        long c = 0;
        if (thisTime.length() == 13) {
            c = (time - systemTime) / 1000;
        } else {
            long systemTime10 = new Date().getTime() / 1000;
            c = (time - systemTime10);
        }
        return unit.convert(c, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other == this)
            return 0;
        if (other instanceof Task) {
            Task x = (Task) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (n < x.n)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    public String getTask() {
        return this.task;
    }

    @Override
    public int hashCode() {
        return task.hashCode();
    }

    @Override
    public boolean equals(Object object) {
        if (object instanceof Task) {
            return object.hashCode() == hashCode() ? true : false;
        }
        return false;
    }


    @Override
    public void run() {
        String result = HttpClientUtil.httpGetMethodString(task);
        RedisService redisService = ApplicationContextProvider.getBean(RedisService.class);
        redisService.remove(redisKey);
        Thread t1 = new Thread(() -> {
            //把重試交給子線程處理 防止佔用線程池
            retryHttpClient(result, task);
        });
        t1.start();
        logger.info("已執行 ===編號:"+time+" 路徑:"+task+"====");
    }

    /**
     * @Author : Yanqiang
     * @Date : 2019/1/17
     * @Param : [firstResult, httpUrl]
     * @return : void
     * @Description : 重試機制 三次 5秒 10秒 15秒 若是還返回失敗 跳過不作處理
     */
    public void retryHttpClient(String firstResult, String httpUrl) {
        RedisService redisService = ApplicationContextProvider.getBean(RedisService.class);
        JSONObject jsonResult = JSONObject.fromObject(firstResult);
        if (null != jsonResult) {
            int errno = (int) jsonResult.get("errno");
            //取結果判斷是否執行成功
            if (errno != 0) {
                try {
                    //5秒 10秒 15秒
                    for (int i = 1; i <= 3; i++) {
                        Thread.sleep(5000 * i);
                        String result = HttpClientUtil.httpGetMethodString(httpUrl);
                        //logger.info("=====執行重試 地址: " + httpUrl + "  返回:" + result + "======");
                        JSONObject retryResult = JSONObject.fromObject(result);
                        if (null != retryResult) {
                            int retryErrno = (int) retryResult.get("errno");
                            //若是執行成功 跳出重試
                            if (retryErrno == 0) {
                                break;
                            }else {
                                if (i==3) {
                                    redisService.switchRedisDb(3);
                                    redisService.setStringRedis(redisKey, task);
                                    redisService.switchRedisDb(2);
                                }
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public long getTime() {
        return time;
    }
}
/**
 * @ClassName : TaskQueueDaemonThread
 * @Author : Yanqiang
 * @Date : 2019/1/24
 * @Description : 
 */
@Service
public class TaskQueueDaemonThread implements CommandLineRunner {
    private final static Logger logger = LoggerFactory.getLogger(TaskQueueDaemonThread.class);
    @Resource
    RedisService redisService;

    //建立一個DelayQueue
    private DelayQueue<Task> queue = new DelayQueue<>();

    private TaskQueueDaemonThread() {
    }


    private static class LazyHolder {
        private static TaskQueueDaemonThread taskQueueDaemonThread = new TaskQueueDaemonThread();
    }

    public static TaskQueueDaemonThread getInstance() {
        return LazyHolder.taskQueueDaemonThread;
    }

    Executor executor = Executors.newFixedThreadPool(100);
    //守護線程
    private Thread daemonThread;

    //初始化守護線程
    public void init() {
        //項目啓動先從redis讀取緩存數據
        Set<String> rediskeys = redisService.getKeys();
        if (null != rediskeys && rediskeys.size() > 0) {
            for (String key : rediskeys) {
                String value = (String) redisService.get(key);
                //拿到key解析 去除 -後四位隨機數
                StringBuffer buffer = new StringBuffer(key);
                String substring = buffer.substring(0, key.length() - 5);
                //放入隊列
                queue.put(new Task(Long.valueOf(substring), value, key));
            }
        }
        daemonThread = new Thread(() -> execute());
        daemonThread.setDaemon(true);
        //daemonThread.setName("任務隊列守護進程線程");
        daemonThread.start();
    }

    private void execute() {
        while (true) {
            try {
                //從queue中取值 取出以後 queue中會刪除此值
                Task task = queue.take();
                if (task != null) {
                    executor.execute(task);//多線程執行任務
                    redisService.remove(String.valueOf(task.getTime()));//redis中刪除
                }
            } catch (Exception e) {
                e.printStackTrace();
                continue;//發生異常 跳過這次
            }
        }
    }


    /**
     * 添加任務,
     * time 延遲時間
     * task 任務
     * 用戶爲問題設置延遲時間
     */
    public BaseResult executeTimer(String time, String callback) {
        BaseResult baseResult = new BaseResult();
        //key =  原key + 四位隨機數
        String randomNum = String.valueOf((long) (Math.random() * 8999) + 1000);
        String randomAndKey = time + "+" + randomNum;
        //建立一個任務,將任務放在延遲的隊列中
        queue.put(new Task(Long.valueOf(time), callback, randomAndKey));
        redisService.set(randomAndKey, callback);
        String log = "key:" + time + " value:" + callback + " 加入隊列與redis key:" + randomAndKey;
        logger.info(log);
        baseResult.setData(log);
        return baseResult;
    }


    /**
     * @Author : Yanqiang
     * @Date : 2019/1/24
     * @Param : [strings]
     * @return : void
     * @Description : 啓動入口
     */
    @Override
    public void run(String... strings) {
        //this.init();
    }
}
相關文章
相關標籤/搜索