最近公司須要實現一個訂單超時自動關閉的功能,由Java這塊來實現 java
一開始我覺得就是定時任務,深刻了解了以後發現並非,官方名稱應該叫延時任務,到時間以後 執行傳過來的回調函數redis
這個功能我一共前先後後寫了三版,寫完第三版以後回頭看初版寫的,簡直就是****(髒話)json
第二版是採用多線程輪詢機制實現的 可是針對到時併發執行有很大問題 雖然實際項目中尚未達到高併發 但仍是一直想實現一個完美的方案 因而有了第三版緩存
第三版使用了DelayQueue 其實實現原理是同樣的 只是到時執行的入口 和 任務的結構改了一下 這兩版我都有使用redis作備份 以防項目掛掉任務丟失 解決了多任務同一時間併發問題 和redis儲存結構問題安全
若是有更好的方案 但願您能夠不吝嗇的與我分享一下多線程
PS:RedisService的方案本身寫就能夠了 我使用的是 StringRedisTemplate + Jedis併發
/** * @Author : Yanqiang * @Date : 2019/1/18 * @Param : [db] * @return : void * @Description : 切換不一樣db */ public void switchRedisDb( int db){ JedisConnectionFactory jedisConnectionFactory = (JedisConnectionFactory) stringRedisTemplate.getConnectionFactory(); jedisConnectionFactory.setDatabase(db); stringRedisTemplate.setConnectionFactory(jedisConnectionFactory); ValueOperations valueOperations = stringRedisTemplate.opsForValue(); } /** * 寫入緩存 */ public void setStringRedis(final String key, String value){ ValueOperations<String, String> operations = stringRedisTemplate.opsForValue(); operations.set(key, value); }
/** * 獲取全部key * @return */ public Set<String> getKeys(){ Set<String> keys = redisTemplate.keys("*"); return keys; }
下面是第二版dom
參數: key:時間戳 callback:回調函數ide
接口: executeTimer(String date, String callback) 添加任務函數
running() 執行任務
/** * @ClassName : MyTask * @Author : Yanqiang * @Date : 2019/1/16 * @Description : 任務實體類 */ public class MyTask implements Runnable { private String date;//到時時間戳 private String callback;//到時回調函數 public MyTask(String date, String callback) { this.date = date; this.callback = callback; } /** * @Author : Yanqiang * @Date : 2019/1/17 * @Param : [] * @return : void * @Description : 任務 */ @Override public void run() { String result = HttpClientUtil.httpGetMethodString(callback);//httpclient調用 Thread t1 = new Thread(() -> { //把重試交給子線程處理 防止佔用線程池 retryHttpClient(date, result, callback); }); t1.start(); System.out.println("已執行 ===編號:"+date+" 路徑:"+callback+"===="); } /** * @Author : Yanqiang * @Date : 2019/1/17 * @Param : [firstResult, httpUrl] * @return : void * @Description : 重試機制 三次 5秒 10秒 15秒 若是還返回失敗 跳過不作處理 */ public void retryHttpClient(String time,String firstResult, String httpUrl) { RedisService redisService = ApplicationContextProvider.getBean(RedisService.class); JSONObject jsonResult = JSONObject.fromObject(firstResult); if (null != jsonResult) { int errno = (int) jsonResult.get("errno"); //取結果判斷是否執行成功 if (errno != 0) { try { //5秒 10秒 15秒 for (int i = 1; i <= 3; i++) { Thread.sleep(5000 * i); String result = HttpClientUtil.httpGetMethodString(httpUrl); //logger.info("=====執行重試 地址: " + httpUrl + " 返回:" + result + "======"); JSONObject retryResult = JSONObject.fromObject(result); if (null != retryResult) { int retryErrno = (int) retryResult.get("errno"); //若是執行成功 跳出重試 if (retryErrno == 0) { break; }else { //把錯誤的存入redis DB 3庫 redisService.switchRedisDb(3); redisService.setStringRedis(time,callback); //切回redis DB 2 redisService.switchRedisDb(2); } } } } catch (InterruptedException e) { e.printStackTrace(); } } } } }
@Component @Service public class TimerReviseService extends Thread implements CommandLineRunner { private final static Logger logger = LoggerFactory.getLogger(TimerReviseService.class); @Resource RedisService redisService; //線程安全 而且有序的map (key大小排序) ConcurrentSkipListMap<String, String> map = new ConcurrentSkipListMap<>(); /** * 超出核心線程的任務會建立新的線程來執行, * 最多線程爲最大線程數,核心線程爲核心線程數, * 在任務完成後空閒超過5秒就會被回收 */ ThreadPoolExecutor executor = new ThreadPoolExecutor( 3,//核心線程數 100,//最大線程數 10,//空閒時間爲5秒 TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); /** * 添加任務 */ @Transactional public BaseResult executeTimer(String date, String callback) { BaseResult baseResult = new BaseResult(); map.put(date, callback);//扔進任務 redisService.set(date, callback);//扔進redis String log = "key:" + date + " value:" + callback + " 加入 隊列與redis; "; logger.info(log); baseResult.setData(log); return baseResult; } @Override public void run(String... strings) { this.running(); } public void running() { //項目啓動先從redis讀取緩存數據 ConcurrentSkipListMap<String, String> redisMap = new ConcurrentSkipListMap<>(); Set<String> rediskeys = redisService.getKeys(); if (null != rediskeys && rediskeys.size()>0){ for (String key:rediskeys) { String value = (String) redisService.get(key); redisMap.put(key,value); } map.putAll(redisMap); } //問題就在這裏 若是兩個甚至幾十個任務同一時間 沒辦法同時執行 並且使用毫秒級的時間戳作key不能保證徹底不重複 Thread t1 = new Thread(() -> { logger.info("===開始輪詢是否有延遲任務==="); while (true) { try { if (map.size()>0){ Map.Entry<String, String> stringStringEntry = map.firstEntry();//取出第一個 也就是key最小 最先應該執行的任務 String key = stringStringEntry.getKey(); if (stampToMillis(key) < 0) {//與當前時間相比較 算出相差幾秒 <=0 執行 String value = stringStringEntry.getValue(); executor.execute(new MyTask(key,value));//使用線程池執行 map.remove(key);//任務隊列中刪除 redisService.remove(key);//redis中刪除 } } sleep(1000);//1秒查一次 } catch (Exception e) { logger.error(StatusCode.TIMER_SERVER_RUN_ERROR.getMsg()); e.printStackTrace(); } } }); t1.start(); } /** * @return : long * @Author : Yanqiang * @Date : 2019/1/8 * @Param : [stampStr] * @Description : 計算兩個時間戳相差秒數 */ public long stampToMillis(String stampStr) { long thisTime = new Long(stampStr); long systemTime = new Date().getTime(); long c = 0; if (stampStr.length() == 13) { c = (thisTime - systemTime) / 1000; } else { long systemTime10 = new Date().getTime() / 1000; c = (thisTime - systemTime10); } return c; }
如下是使用了DelayQueue的方案
網上的方案都拆分的比較細 好多個類 我把它們簡化集合到一塊兒了
1.優化了key方案
2.內存中使用對象 而不是map 多任務分離 到時互不影響
/** * @Author : Yanqiang * @Date : 2019/1/24 * @Param : * @return : * @Description : 任務調度系統 * 與上面相同 只是重寫了Delayed 的 compareTo(),getDelay(),hashCode(),equals()方法 */ public class Task implements Runnable,Delayed { private final static Logger logger = LoggerFactory.getLogger(Task.class); //到期時間 private final long time; //redisKey private final String redisKey; //問題對象 private final String task; private static final AtomicLong atomic = new AtomicLong(0); private final long n; public Task(long timeout, String t,String redisKey) { this.time = timeout; this.task = t; this.n = atomic.getAndIncrement(); this.redisKey = redisKey; } /** * 返回與此對象相關的剩餘延遲時間,以給定的時間單位表示 */ @Override public long getDelay(TimeUnit unit) { String thisTime = String.valueOf(time); long systemTime = new Date().getTime(); long c = 0; if (thisTime.length() == 13) { c = (time - systemTime) / 1000; } else { long systemTime10 = new Date().getTime() / 1000; c = (time - systemTime10); } return unit.convert(c, TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed other) { if (other == this) return 0; if (other instanceof Task) { Task x = (Task) other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (n < x.n) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0) ? 0 : ((d < 0) ? -1 : 1); } public String getTask() { return this.task; } @Override public int hashCode() { return task.hashCode(); } @Override public boolean equals(Object object) { if (object instanceof Task) { return object.hashCode() == hashCode() ? true : false; } return false; } @Override public void run() { String result = HttpClientUtil.httpGetMethodString(task); RedisService redisService = ApplicationContextProvider.getBean(RedisService.class); redisService.remove(redisKey); Thread t1 = new Thread(() -> { //把重試交給子線程處理 防止佔用線程池 retryHttpClient(result, task); }); t1.start(); logger.info("已執行 ===編號:"+time+" 路徑:"+task+"===="); } /** * @Author : Yanqiang * @Date : 2019/1/17 * @Param : [firstResult, httpUrl] * @return : void * @Description : 重試機制 三次 5秒 10秒 15秒 若是還返回失敗 跳過不作處理 */ public void retryHttpClient(String firstResult, String httpUrl) { RedisService redisService = ApplicationContextProvider.getBean(RedisService.class); JSONObject jsonResult = JSONObject.fromObject(firstResult); if (null != jsonResult) { int errno = (int) jsonResult.get("errno"); //取結果判斷是否執行成功 if (errno != 0) { try { //5秒 10秒 15秒 for (int i = 1; i <= 3; i++) { Thread.sleep(5000 * i); String result = HttpClientUtil.httpGetMethodString(httpUrl); //logger.info("=====執行重試 地址: " + httpUrl + " 返回:" + result + "======"); JSONObject retryResult = JSONObject.fromObject(result); if (null != retryResult) { int retryErrno = (int) retryResult.get("errno"); //若是執行成功 跳出重試 if (retryErrno == 0) { break; }else { if (i==3) { redisService.switchRedisDb(3); redisService.setStringRedis(redisKey, task); redisService.switchRedisDb(2); } } } } } catch (InterruptedException e) { e.printStackTrace(); } } } } public long getTime() { return time; } }
/** * @ClassName : TaskQueueDaemonThread * @Author : Yanqiang * @Date : 2019/1/24 * @Description : */ @Service public class TaskQueueDaemonThread implements CommandLineRunner { private final static Logger logger = LoggerFactory.getLogger(TaskQueueDaemonThread.class); @Resource RedisService redisService; //建立一個DelayQueue private DelayQueue<Task> queue = new DelayQueue<>(); private TaskQueueDaemonThread() { } private static class LazyHolder { private static TaskQueueDaemonThread taskQueueDaemonThread = new TaskQueueDaemonThread(); } public static TaskQueueDaemonThread getInstance() { return LazyHolder.taskQueueDaemonThread; } Executor executor = Executors.newFixedThreadPool(100); //守護線程 private Thread daemonThread; //初始化守護線程 public void init() { //項目啓動先從redis讀取緩存數據 Set<String> rediskeys = redisService.getKeys(); if (null != rediskeys && rediskeys.size() > 0) { for (String key : rediskeys) { String value = (String) redisService.get(key); //拿到key解析 去除 -後四位隨機數 StringBuffer buffer = new StringBuffer(key); String substring = buffer.substring(0, key.length() - 5); //放入隊列 queue.put(new Task(Long.valueOf(substring), value, key)); } } daemonThread = new Thread(() -> execute()); daemonThread.setDaemon(true); //daemonThread.setName("任務隊列守護進程線程"); daemonThread.start(); } private void execute() { while (true) { try { //從queue中取值 取出以後 queue中會刪除此值 Task task = queue.take(); if (task != null) { executor.execute(task);//多線程執行任務 redisService.remove(String.valueOf(task.getTime()));//redis中刪除 } } catch (Exception e) { e.printStackTrace(); continue;//發生異常 跳過這次 } } } /** * 添加任務, * time 延遲時間 * task 任務 * 用戶爲問題設置延遲時間 */ public BaseResult executeTimer(String time, String callback) { BaseResult baseResult = new BaseResult(); //key = 原key + 四位隨機數 String randomNum = String.valueOf((long) (Math.random() * 8999) + 1000); String randomAndKey = time + "+" + randomNum; //建立一個任務,將任務放在延遲的隊列中 queue.put(new Task(Long.valueOf(time), callback, randomAndKey)); redisService.set(randomAndKey, callback); String log = "key:" + time + " value:" + callback + " 加入隊列與redis key:" + randomAndKey; logger.info(log); baseResult.setData(log); return baseResult; } /** * @Author : Yanqiang * @Date : 2019/1/24 * @Param : [strings] * @return : void * @Description : 啓動入口 */ @Override public void run(String... strings) { //this.init(); } }