SpringBoot中併發定時任務的實現、動態定時任務的實現(看這一篇就夠了)

原創不易,如需轉載,請註明出處http://www.javashuo.com/article/p-zqnnuvrr-cx.html,不然將追究法律責任!!! html

1、在JAVA開發領域,目前能夠經過如下幾種方式進行定時任務

一、單機部署模式

  • Timer:jdk中自帶的一個定時調度類,能夠簡單的實現按某一頻度進行任務執行。提供的功能比較單一,沒法實現複雜的調度任務。
  • ScheduledExecutorService:也是jdk自帶的一個基於線程池設計的定時任務類。其每一個調度任務都會分配到線程池中的一個線程執行,因此其任務是併發執行的,互不影響。
  • Spring Task:Spring提供的一個任務調度工具,支持註解和配置文件形式,支持Cron表達式,使用簡單但功能強大。
  • Quartz:一款功能強大的任務調度器,能夠實現較爲複雜的調度功能,如每個月一號執行、天天凌晨執行、每週五執行等等,還支持分佈式調度,就是配置稍顯複雜。

二、分佈式集羣模式(很少介紹,簡單提一下)

問題:

I、如何解決定時任務的屢次執行?
II、如何解決任務的單點問題,實現任務的故障轉移?

問題I的簡單思考:

一、固定執行定時任務的機器(能夠有效避免屢次執行的狀況 ,缺點就是單點故障問題)。
二、藉助Redis的過時機制和分佈式鎖。
三、藉助mysql的鎖機制等。

成熟的解決方案:

一、Quartz:能夠去看看這篇文章[Quartz分佈式]( https://www.cnblogs.com/jiafuwei/p/6145280.html)。
二、elastic-job:(https://github.com/elasticjob/elastic-job-lite)噹噹開發的彈性分佈式任務調度系統,採用zookeeper實現分佈式協調,實現任務高可用以及分片。
三、xxl-job:(https://github.com/xuxueli/xxl-job)是大衆點評員發佈的分佈式任務調度平臺,是一個輕量級分佈式任務調度框架。
四、saturn:(https://github.com/vipshop/Saturn) 是惟品會提供一個分佈式、容錯和高可用的做業調度服務框架。

2、SpringTask實現定時任務(這裏是基於springboot)

一、簡單的定時任務實現

使用方式:

使用@EnableScheduling註解開啓對定時任務的支持。
使用@Scheduled 註解便可,基於corn、fixedRate、fixedDelay等一些定時策略來實現定時任務。

使用缺點:

一、多個定時任務使用的是同一個調度線程,因此任務是阻塞執行的,執行效率不高。
二、其次若是出現任務阻塞,致使一些場景的定時計算沒有實際意義,好比天天12點的一個計算任務被阻塞到1點去執行,會致使結果並不是咱們想要的。

使用優勢:

一、配置簡單
二、適用於單個後臺線程執行週期任務,而且保證順序一致執行的場景

源碼分析:

//默認使用的調度器
if(this.taskScheduler == null) {  
    this.localExecutor = Executors.newSingleThreadScheduledExecutor();
    this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
//能夠看到SingleThreadScheduledExecutor指定的核心線程爲1,說白了就是單線程執行
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}
//利用了DelayedWorkQueue延時隊列做爲任務的存放隊列,這樣即可以實現任務延遲執行或者定時執行
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

  

二、實現併發的定時任務

使用方式:

  • 方式一:由1中咱們知道之因此定時任務是阻塞執行,是配置的線程池決定的,那就好辦了,換一個不就好了!直接上代碼:java

    @Configuration
      public class ScheduledConfig implements SchedulingConfigurer {
    
          @Autowired
          private TaskScheduler myThreadPoolTaskScheduler;
    
          @Override
          public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
              //簡單粗暴的方式直接指定
              //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
              //也能夠自定義的線程池,方便線程的使用與維護,這裏很少說了
              scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
          }
      }
    
      @Bean(name = "myThreadPoolTaskScheduler")
      public TaskScheduler getMyThreadPoolTaskScheduler() {
          ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
          taskScheduler.setPoolSize(10);
          taskScheduler.setThreadNamePrefix("Haina-Scheduled-");
          taskScheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
          //調度器shutdown被調用時等待當前被調度的任務完成
          taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
          //等待時長
          taskScheduler.setAwaitTerminationSeconds(60);
          return taskScheduler;
      }
  • 方式二:方式一的本質改變了任務調度器默認使用的線程池,接下來這種是不改變調度器的默認線程池,而是把當前任務交給一個異步線程池去執行mysql

    • 首先使用@EnableAsync 啓用異步任務
    • 而後在定時任務的方法加上@Async便可,默認使用的線程池爲SimpleAsyncTaskExecutor(該線程池默認來一個任務建立一個線程,就會不斷建立大量線程,極有可能壓爆服務器內存。固然它有本身的限流機制,這裏就很少說了,有興趣的本身翻翻源碼~)
    • 項目中爲了更好的控制線程的使用,咱們能夠自定義咱們本身的線程池,使用方式@Async("myThreadPool")

    廢話太多,直接上代碼:git

    @Scheduled(fixedRate = 1000*10,initialDelay = 1000*20)
      @Async("myThreadPoolTaskExecutor")
      //@Async
      public void scheduledTest02(){
          System.out.println(Thread.currentThread().getName()+"--->xxxxx--->"+Thread.currentThread().getId());
      }
    
      //自定義線程池
      @Bean(name = "myThreadPoolTaskExecutor")
      public TaskExecutor  getMyThreadPoolTaskExecutor() {
          ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
          taskExecutor.setCorePoolSize(20);
          taskExecutor.setMaxPoolSize(200);
          taskExecutor.setQueueCapacity(25);
          taskExecutor.setKeepAliveSeconds(200);
          taskExecutor.setThreadNamePrefix("Haina-ThreadPool-");
          // 線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認爲後者
          taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
          //調度器shutdown被調用時等待當前被調度的任務完成
          taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
          //等待時長
          taskExecutor.setAwaitTerminationSeconds(60);
          taskExecutor.initialize();
          return taskExecutor;
      }
  • 線程池的使用心得(後續有專門文章來探討)github

    • java中提供了ThreadPoolExecutor和ScheduledThreadPoolExecutor,對應與spring中的ThreadPoolTaskExecutor和ThreadPoolTaskScheduler,可是在原有的基礎上增長了新的特性,在spring環境下更容易使用和控制。
    • 使用自定義的線程池可以避免一些默認線程池形成的內存溢出、阻塞等等問題,更貼合本身的服務特性
    • 使用自定義的線程池便於對項目中線程的管理、維護以及監控。
    • 即使在非spring環境下也不要使用java默認提供的那幾種線程池,坑不少,阿里代碼規約不說了嗎,得相信大廠!!!

3、動態定時任務的實現

問題:

  • 使用@Scheduled註解來完成設置定時任務,可是有時候咱們每每須要對週期性的時間的設置會作一些改變,或者要動態的啓停一個定時任務,那麼這個時候使用此註解就不太方便了,緣由在於這個註解中配置的cron表達式必須是常量,那麼當咱們修改定時參數的時候,就須要中止服務,從新部署。spring

    解決辦法:

  • 方式一:實現SchedulingConfigurer接口,重寫configureTasks方法,從新制定Trigger,核心方法就是addTriggerTask(Runnable task, Trigger trigger) ,不過須要注意的是,此種方式修改了配置值後,須要在下一次調度結束後,纔會更新調度器,並不會在修改配置值時實時更新,實時更新須要在修改配置值時額外增長相關邏輯處理。sql

    @Configuration
      public class ScheduledConfig implements SchedulingConfigurer {
    
      @Autowired
      private TaskScheduler myThreadPoolTaskScheduler;
    
      @Override
      public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
          //scheduledTaskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));
          scheduledTaskRegistrar.setTaskScheduler(myThreadPoolTaskScheduler);
          //能夠實現動態調整定時任務的執行頻率
          scheduledTaskRegistrar.addTriggerTask(
                  //1.添加任務內容(Runnable)
                  () -> System.out.println("cccccccccccccccc--->" + Thread.currentThread().getId()),
                  //2.設置執行週期(Trigger)
                  triggerContext -> {
                      //2.1 從數據庫動態獲取執行週期
                      String cron = "0/2 * * * * ? ";
                      //2.2 合法性校驗.
      //                    if (StringUtils.isEmpty(cron)) {
      //                        // Omitted Code ..
      //                    }
                          //2.3 返回執行週期(Date)
                          return new CronTrigger(cron).nextExecutionTime(triggerContext);
                      }
              );
      }
      }
  • 方式二:使用threadPoolTaskScheduler類可實現動態添加刪除功能,固然也可實現執行頻率的調整數據庫

    首先,咱們要認識下這個調度類,它實際上是對java中ScheduledThreadPoolExecutor的一個封裝改進後的產物,主要改進有如下幾點:
          一、提供默認配置,由於是ScheduledThreadPoolExecutor,因此只有poolSize這一個默認參數。
          二、支持自定義任務,經過傳入Trigger參數。
          三、對任務出錯處理進行優化,若是是重複性的任務,不拋出異常,經過日誌記錄下來,不影響下次運行,若是是隻執行一次的任務,將異常往上拋。
      順便說下ThreadPoolTaskExecutor相對於ThreadPoolExecutor的改進點:
          一、提供默認配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其餘沒有默認配置
          二、實現AsyncListenableTaskExecutor接口,支持對FutureTask添加success和fail的回調,任務成功或失敗的時候回執行對應回調方法。
          三、由於是spring的工具類,因此拋出的RejectedExecutionException也會被轉換爲spring框架的TaskRejectedException異常(這個無所謂)
          四、提供默認ThreadFactory實現,直接經過參數重載配置

    扯了這麼多,仍是直接上代碼:segmentfault

    @Component
      public class DynamicTimedTask {
    
          private static final Logger logger = LoggerFactory.getLogger(DynamicTimedTask.class);
    
          //利用建立好的調度類統一管理
          //@Autowired
          //@Qualifier("myThreadPoolTaskScheduler")
          //private ThreadPoolTaskScheduler myThreadPoolTaskScheduler;
    
    
          //接受任務的返回結果
          private ScheduledFuture<?> future;
    
          @Autowired
          private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    
          //實例化一個線程池任務調度類,可使用自定義的ThreadPoolTaskScheduler
          @Bean
          public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
              ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
              return new ThreadPoolTaskScheduler();
          }
    
    
          /**
           * 啓動定時任務
           * @return
           */
          public boolean startCron() {
              boolean flag = false;
              //從數據庫動態獲取執行週期
              String cron = "0/2 * * * * ? ";
              future = threadPoolTaskScheduler.schedule(new CheckModelFile(),cron);
              if (future!=null){
                  flag = true;
                  logger.info("定時check訓練模型文件,任務啓動成功!!!");
              }else {
                  logger.info("定時check訓練模型文件,任務啓動失敗!!!");
              }
              return flag;
          }
    
          /**
           * 中止定時任務
           * @return
           */
          public boolean stopCron() {
              boolean flag = false;
              if (future != null) {
                  boolean cancel = future.cancel(true);
                  if (cancel){
                      flag = true;
                      logger.info("定時check訓練模型文件,任務中止成功!!!");
                  }else {
                      logger.info("定時check訓練模型文件,任務中止失敗!!!");
                  }
              }else {
                  flag = true;
                  logger.info("定時check訓練模型文件,任務已經中止!!!");
              }
              return flag;
          }
    
    
          class CheckModelFile implements Runnable{
    
              @Override
              public void run() {
                  //編寫你本身的業務邏輯  
                  System.out.print("模型文件檢查完畢!!!")
              }
          }
    
      }

4、總結

  • 到此基於springtask下的定時任務的簡單使用算是差很少了,其中難免有些錯誤的地方,或者理解有偏頗的地方歡迎你們提出來!
  • 基於分佈式集羣下的定時任務使用,後續有時間再繼續!!!

我的博客地址:springboot

csdn:https://blog.csdn.net/tiantuo6513
cnblogs:https://www.cnblogs.com/baixianlong
segmentfault:https://segmentfault.com/u/baixianlong github:https://github.com/xianlongbai

相關文章
相關標籤/搜索