在定時任務中爲了加快處理速度,通常都會使用多線程處理業務。須要注意一下事項:java
1. 定時任務是否容許上一個定時任務未結束,下一個定時任務能夠啓動,經過Scheduled中的配置在決定。服務器
2. 主線程已經關閉,線程池中的線程還在運行問題。線程池的關閉方法問題多線程
3. 定時任務有大量數據,致使服務沒法中止問題。線程
4. 如何獲取線程的處理結果code
以下代碼是示例,stop狀態的使用和線程池shutdown的處理邏輯須要依據本身的業務來處理。orm
@PreDestroy public void destory(){ stop = true; } //線程中止狀態, 經過註解檢測到服務器中止時修改stop狀態 boolean stop = false; //服務器啓動後延遲1分鐘執行,定時任務結束後延遲1分鐘執行下一次 @Scheduled(initialDelay = 60*1000L, fixedDelay = 60*1000L) public void scheduling(){ List<String> dataList = new ArrayList<>(); for (int i = 0; i < 1000; i++) { dataList.add("data_"+i); } int threadSize = 10; ExecutorService esPool = Executors.newFixedThreadPool(threadSize); //接收線程的完成時間 或者其餘返回結果 CompletionService<String> ecs = new ExecutorCompletionService<String>(esPool); ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(dataList); logger.info("===============start {}==================", System.currentTimeMillis()); //啓動線程時修改退出線程狀態 stop = false; for (int i = 0; i < threadSize; i++) { ecs.submit(()->{ long count = 0; //線程處理加try catch 防止拋出異常中斷線程,可能會致使線程池中全部的線程都中斷,無可用線程 try{ // !queue.isEmpty()比queue.size()>0效率高不少 .size() 是要遍歷一遍集合的 while (!stop && !queue.isEmpty()){ String data = queue.poll(); //500 能夠在60秒內完成處理,正常退出 //改爲 1000 若是不使用下面的收集結果代碼,60秒內沒法處理完,會強制shutdown 拋出異常 Thread.sleep(1000L); logger.info("data {} ok.",data); count++; } }catch (Exception e){ logger.error("",e); } //這裏範圍線程處理的結果 return System.currentTimeMillis()+"_"+count; }); } //獲取線程的返回結果 會阻塞主線程,直到線程池中全部的線程返回結果 /*try { for (int i = 0; i < threadSize; i++) { String threadTime = ecs.take().get(); logger.info("thread run ok time:{}"+threadTime); } } catch (InterruptedException e) { e.printStackTrace(); }catch (ExecutionException e) { e.printStackTrace(); }*/ //關閉線程池 try { esPool.shutdown(); logger.info("esPoll shutdown:{}", DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT)); //線程池阻塞,到指定的時間退出,若是全部線程執行完成返回true 不然返回false boolean await = esPool.awaitTermination(60*1000L,TimeUnit.MILLISECONDS); logger.info("esPool.awaitTermination 1:{}, {}",await,DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT)); if(!await) { stop = true; await = esPool.awaitTermination(10*1000L,TimeUnit.MILLISECONDS); logger.info("esPool.awaitTermination 2:{}, {}",await,DateUtil.format(new Date(),DateUtil.PATTERN_DEFAULT)); } if(!await){ logger.info("wait 60s not stop, shutdownNow"); // 超時的時候向線程池中全部的線程發出中斷(interrupted)。 // 讓線程池中的全部線程當即中斷。 會拋出異常 esPool.shutdownNow(); } } catch (InterruptedException e) { //awaitTermination方法被中斷的時候也停止線程池中所有的線程的執行。 esPool.shutdownNow(); logger.error("awaitTermination",e); } logger.info("===============end {}==================", System.currentTimeMillis()); }