最近的新項目和數據同步相關,有定時調度的需求。以前一直有使用過Quartz
、XXL-Job
、Easy Scheduler
等調度框架,後來愈加以爲這些框架過重量級了,因而想到了Spring
內置的Scheduling
模塊。而原生的Scheduling
模塊只是內存態的調度模塊,不支持任務的持久化或者配置(配置任務經過@Scheduled
註解進行硬編碼,不能抽離到類以外),所以考慮理解Scheduling
模塊的底層原理,而且基於此造一個簡單的輪子,使之支持調度任務配置:經過配置文件或者JDBC
數據源。java
Scheduling
模塊是spring-context
依賴下的一個包org.springframework.scheduling
:mysql
這個模塊的類並很少,有四個子包:web
org.springframework.scheduling.annotation
:定義了調度、異步任務相關的註解和解析類,經常使用的註解如@Async
、@EnableAsync
、@EnableScheduling
和@Scheduled
。org.springframework.scheduling.concurrent
:定義了調度任務執行器和相對應的FactoryBean
。org.springframework.scheduling.config
:定義了配置解析、任務具體實現類、調度任務XML
配置文件解析相關的解析類。org.springframework.scheduling.support
:定義了反射支持類、Cron
表達式解析器等工具類。若是想單獨使用Scheduling
,只須要引入spring-context
這個依賴。可是如今流行使用SpringBoot
,引入spring-boot-starter-web
已經集成了spring-context
,能夠直接使用Scheduling
模塊,筆者編寫本文的時候(2020-03-14
)SpringBoot
的最新版本爲2.2.5.RELEASE
,能夠選用此版本進行源碼分析或者生產應用:redis
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <spring.boot.version>2.2.5.RELEASE</spring.boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>
開啓Scheduling
模塊支持只須要在某一個配置類中添加@EnableScheduling
註解便可,通常爲了明確模塊的引入,建議在啓動類中使用此註解,如:spring
@EnableScheduling @SpringBootApplication public class App { public static void main(String[] args) { SpringApplication.run(App.class, args); } }
這個圖描述了Scheduling
模塊的工做流程,這裏分析一下非XML
配置下的流程(右邊的分支):sql
@EnableScheduling
中的@Import
引入了SchedulingConfiguration
,而SchedulingConfiguration
中配置了一個類型爲ScheduledAnnotationBeanPostProcessor
名稱爲org.springframework.context.annotation.internalScheduledAnnotationProcessor
的Bean
,這裏有個常見的技巧,Spring
內部加載的Bean
通常會定義名稱爲internalXXX
,Bean
的role
會定義爲ROLE_INFRASTRUCTURE = 2
。Bean
後置處理器ScheduledAnnotationBeanPostProcessor
會解析和處理每個符合特定類型的Bean
中的@Scheduled
註解(注意@Scheduled
只能使用在方法或者註解上),而且把解析完成的方法封裝爲不一樣類型的Task
實例,緩存在ScheduledTaskRegistrar
中的。ScheduledAnnotationBeanPostProcessor
中的鉤子接口方法afterSingletonsInstantiated()
在全部單例初始化完成以後回調觸發,在此方法中設置了ScheduledTaskRegistrar
中的任務調度器(TaskScheduler
或者ScheduledExecutorService
類型)實例,而且調用ScheduledTaskRegistrar#afterPropertiesSet()
方法添加全部緩存的Task
實例到任務調度器中執行。Scheduling
模塊支持TaskScheduler
或者ScheduledExecutorService
類型的任務調度器,而ScheduledExecutorService
實際上是JDK
併發包java.util.concurrent
的接口,通常實現類就是調度線程池ScheduledThreadPoolExecutor
。實際上,ScheduledExecutorService
類型的實例最終會經過適配器模式轉變爲ConcurrentTaskScheduler
,因此這裏只須要分析TaskScheduler
類型的執行器。shell
ThreadPoolTaskScheduler
:基於線程池實現的任務執行器,這個是最經常使用的實現,底層依賴於ScheduledThreadPoolExecutor
實現。ConcurrentTaskScheduler
:TaskScheduler
接口和ScheduledExecutorService
接口的適配器,若是自定義一個ScheduledThreadPoolExecutor
類型的Bean
,那麼任務執行器就會適配爲ConcurrentTaskScheduler
。DefaultManagedTaskScheduler
:JDK7
引入的JSR-236
的支持,能夠經過JNDI
配置此調度執行器,通常不多用到,底層也是依賴於ScheduledThreadPoolExecutor
實現。也就是說,內置的三個調度器類型底層都依賴於JUC
調度線程池ScheduledThreadPoolExecutor
。這裏分析一下頂層接口org.springframework.scheduling.TaskScheduler
提供的功能(筆者已經把功能一致的default
方法暫時移除):express
// 省略一些功能一致的default方法 public interface TaskScheduler { // 調度一個任務,經過觸發器實例指定觸發時間週期 ScheduledFuture<?> schedule(Runnable task, Trigger trigger); // 指定起始時間調度一個任務 - 單次執行 ScheduledFuture<?> schedule(Runnable task, Date startTime); // 指定固定頻率調度一個任務,period的單位是毫秒 ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period); // 指定起始時間和固定頻率調度一個任務,period的單位是毫秒 ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period); // 指定固定延遲間隔調度一個任務,delay的單位是毫秒 ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay); // 指定起始時間和固定延遲間隔調度一個任務,delay的單位是毫秒 ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay); }
Scheduling
模塊中支持不一樣類型的任務,主要包括下面的3種(解析的優先順序也是以下):編程
Cron
表達式任務,支持經過Cron
表達式配置執行的週期,對應的任務類型爲org.springframework.scheduling.config.CronTask
。org.springframework.scheduling.config.FixedDelayTask
。org.springframework.scheduling.config.FixedRateTask
。關於這幾類Task
,舉幾個簡單的例子。CronTask
是經過cron
表達式指定執行週期的,而且不支持延遲執行,可使用特殊字符-
禁用任務執行:json
// 註解聲明式使用 - 每五秒執行一次,不支持initialDelay @Scheduled(cron = "*/5 * * * * ?") public void processTask(){ } // 註解聲明式使用 - 禁止任務執行 @Scheduled(cron = "-") public void processTask(){ } // 編程式使用 public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); CronTask cronTask = new CronTask(() -> { System.out.println(String.format("[%s] - CronTask觸發...", F.format(LocalDateTime.now()))); }, "*/5 * * * * ?"); taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger()); Thread.sleep(Integer.MAX_VALUE); } } // 某次執行輸出結果 [2020-03-16 01:07:00] - CronTask觸發... [2020-03-16 01:07:05] - CronTask觸發... ......
FixedDelayTask
須要配置延遲間隔值(fixedDelay
或者fixedDelayString
)和可選的起始延遲執行時間(initialDelay
或者initialDelayString
),這裏注意一點是fixedDelayString
和initialDelayString
都支持從EmbeddedValueResolver
(簡單理解爲配置文件的屬性處理器)讀取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支持格式的解析:
// 註解聲明式使用 - 延遲一秒開始執行,延遲間隔爲5秒 @Scheduled(fixedDelay = 5000, initialDelay = 1000) public void process(){ } // 註解聲明式使用 - spring-boot配置文件中process.task.fixedDelay=5000 process.task.initialDelay=1000 @Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}") public void process(){ } // 編程式使用 public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> { System.out.println(String.format("[%s] - FixedDelayTask觸發...", F.format(LocalDateTime.now()))); }, 5000, 1000); Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay()); taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval()); Thread.sleep(Integer.MAX_VALUE); } } // 某次執行輸出結果 [2020-03-16 01:06:12] - FixedDelayTask觸發... [2020-03-16 01:06:17] - FixedDelayTask觸發... ......
FixedRateTask
須要配置固定間隔值(fixedRate
或者fixedRateString
)和可選的起始延遲執行時間(initialDelay
或者initialDelayString
),這裏注意一點是fixedRateString
和initialDelayString
都支持從EmbeddedValueResolver
(簡單理解爲配置文件的屬性處理器)讀取和Duration
(例如P2D
就是parses as 2 days
,表示86400秒)支持格式的解析:
// 註解聲明式使用 - 延遲一秒開始執行,每隔5秒執行一次 @Scheduled(fixedRate = 5000, initialDelay = 1000) public void processTask(){ } // 註解聲明式使用 - spring-boot配置文件中process.task.fixedRate=5000 process.task.initialDelay=1000 @Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}") public void process(){ } // 編程式使用 public class Tasks { static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) throws Exception { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setPoolSize(10); taskScheduler.initialize(); FixedRateTask fixedRateTask = new FixedRateTask(() -> { System.out.println(String.format("[%s] - FixedRateTask觸發...", F.format(LocalDateTime.now()))); }, 5000, 1000); Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay()); taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval()); Thread.sleep(Integer.MAX_VALUE); } } // 某次執行輸出結果 [2020-03-16 23:58:25] - FixedRateTask觸發... [2020-03-16 23:58:30] - FixedRateTask觸發... ......
在SpringBoot
註解體系下,Scheduling
模塊的全部邏輯基本在ScheduledAnnotationBeanPostProcessor
和ScheduledTaskRegistrar
中。通常來講,一個類實現的接口表明了它能提供的功能,先看ScheduledAnnotationBeanPostProcessor
實現的接口:
ScheduledTaskHolder
接口:返回Set<ScheduledTask>
,表示持有的全部任務實例。MergedBeanDefinitionPostProcessor
接口:Bean
定義合併時回調,預留空實現,暫時不作任何處理。BeanPostProcessor
接口:也就是MergedBeanDefinitionPostProcessor
的父接口,Bean
實例初始化先後分別回調,其中,後回調的postProcessAfterInitialization()
方法就是用於解析@Scheduled
和裝載ScheduledTask
,須要重點關注此方法的邏輯。DestructionAwareBeanPostProcessor
接口:具體的Bean
實例銷燬的時候回調,用於Bean
實例銷燬的時候移除和取消對應的任務實例。Ordered
接口:用於Bean
加載時候的排序,主要是改變ScheduledAnnotationBeanPostProcessor
在BeanPostProcessor
執行鏈中的順序。EmbeddedValueResolverAware
接口:回調StringValueResolver
實例,用於解析帶佔位符的環境變量屬性值。BeanNameAware
接口:回調BeanName
。BeanFactoryAware
接口:回調BeanFactory
實例,具體是DefaultListableBeanFactory
,也就是熟知的IOC
容器。ApplicationContextAware
接口:回調ApplicationContext
實例,也就是熟知的Spring
上下文,它是IOC
容器的門面,同時是事件廣播器、資源加載器的實現等等。SmartInitializingSingleton
接口:全部單例實例化完畢以後回調,做用是在持有的applicationContext
爲NULL
的時候開始調度全部加載完成的任務,這個鉤子接口十分有用,筆者經常使用它作一些資源初始化工做。ApplicationListener
接口:監聽Spring
應用的事件,具體是ApplicationListener<ContextRefreshedEvent>
,監聽上下文刷新的事件,若是事件中攜帶的ApplicationContext
實例和ApplicationContextAware
回調的ApplicationContext
實例一致,那麼在此監聽回調方法中開始調度全部加載完成的任務,也就是在ScheduledAnnotationBeanPostProcessor
這個類中,SmartInitializingSingleton
接口的實現和ApplicationListener
接口的實現邏輯是互斥的。DisposableBean
接口:當前Bean
實例銷燬時候回調,也就是ScheduledAnnotationBeanPostProcessor
自身被銷燬的時候回調,用於取消和清理全部的ScheduledTask
。上面分析的鉤子接口在SpringBoot體系中能夠按需使用,瞭解回調不一樣鉤子接口的回調時機,能夠在特定時機完成達到理想的效果。
@Scheduled
註解的解析集中在postProcessAfterInitialization()
方法:
public Object postProcessAfterInitialization(Object bean, String beanName) { // 忽略AopInfrastructureBean、TaskScheduler和ScheduledExecutorService三種類型的Bean if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } // 獲取Bean的用戶態類型,例如Bean有可能被CGLIB加強,這個時候要取其父類 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); // nonAnnotatedClasses存放着不存在@Scheduled註解的類型,緩存起來避免重複判斷它是否攜帶@Scheduled註解的方法 if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { // 由於JDK8以後支持重複註解,所以獲取具體類型中Method -> @Scheduled的集合,也就是有可能一個方法使用多個@Scheduled註解,最終會封裝爲多個Task Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); // 解析到類型中不存在@Scheduled註解的方法添加到nonAnnotatedClasses緩存 if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Method -> @Scheduled的集合遍歷processScheduled()方法進行登記 annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
processScheduled(Scheduled scheduled, Method method, Object bean)
就是具體的註解解析和Task
封裝的方法:
// Runnable適配器 - 用於反射調用具體的方法,觸發任務方法執行 public class ScheduledMethodRunnable implements Runnable { private final Object target; private final Method method; public ScheduledMethodRunnable(Object target, Method method) { this.target = target; this.method = method; } ....// 省略無關代碼 // 這個就是最終的任務方法執行的核心方法,抑制修飾符,而後反射調用 @Override public void run() { try { ReflectionUtils.makeAccessible(this.method); this.method.invoke(this.target); } catch (InvocationTargetException ex) { ReflectionUtils.rethrowRuntimeException(ex.getTargetException()); } catch (IllegalAccessException ex) { throw new UndeclaredThrowableException(ex); } } } // 經過方法所在Bean實例和方法封裝Runnable適配器ScheduledMethodRunnable實例 protected Runnable createRunnable(Object target, Method method) { Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } // 這個方法十分長,不過邏輯並不複雜,它只作了四件事 // 0. 解析@Scheduled中的initialDelay、initialDelayString屬性,適用於FixedDelayTask或者FixedRateTask的延遲執行 // 1. 優先解析@Scheduled中的cron屬性,封裝爲CronTask,經過ScheduledTaskRegistrar進行緩存 // 2. 解析@Scheduled中的fixedDelay、fixedDelayString屬性,封裝爲FixedDelayTask,經過ScheduledTaskRegistrar進行緩存 // 3. 解析@Scheduled中的fixedRate、fixedRateString屬性,封裝爲FixedRateTask,經過ScheduledTaskRegistrar進行緩存 protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { // 經過方法宿主Bean和目標方法封裝Runnable適配器ScheduledMethodRunnable實例 Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; // 緩存已經裝載的任務 Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // Determine initial delay // 解析初始化延遲執行時間,initialDelayString支持佔位符配置,若是initialDelayString配置了,會覆蓋initialDelay的值 long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } // Check cron expression // 解析時區zone的值,支持支持佔位符配置,判斷cron是否存在,存在則裝載爲CronTask String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } // 此方法雖然表面上是調度CronTask,實際上因爲ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的緩存中 // 返回的任務實例添加到宿主Bean的緩存中,而後最後會放入宿主Bean -> List<ScheduledTask>映射中 tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we don't need to differentiate between initial delay set or not anymore // 修正小於0的初始化延遲執行時間值爲0 if (initialDelay < 0) { initialDelay = 0; } // 解析fixedDelay和fixedDelayString,若是同時配置,fixedDelayString最終解析出來的整數值會覆蓋fixedDelay,封裝爲FixedDelayTask long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long"); } // 此方法雖然表面上是調度FixedDelayTask,實際上因爲ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的緩存中 // 返回的任務實例添加到宿主Bean的緩存中,而後最後會放入宿主Bean -> List<ScheduledTask>映射中 tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } // 解析fixedRate和fixedRateString,若是同時配置,fixedRateString最終解析出來的整數值會覆蓋fixedRate,封裝爲FixedRateTask long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long"); } // 此方法雖然表面上是調度FixedRateTask,實際上因爲ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的緩存中 // 返回的任務實例添加到宿主Bean的緩存中,而後最後會放入宿主Bean -> List<ScheduledTask>映射中 tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks synchronized (this.scheduledTasks) { // 註冊全部任務實例,這個映射Key爲宿主Bean實例,Value爲List<ScheduledTask>,後面用於調度全部註冊完成的任務 Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }
總的來講,這個方法作了四件事:
@Scheduled
中的initialDelay
、initialDelayString
屬性,適用於FixedDelayTask
或者FixedRateTask
的延遲執行。@Scheduled
中的cron
屬性,封裝爲CronTask
,經過ScheduledTaskRegistrar
進行緩存。@Scheduled
中的fixedDelay
、fixedDelayString
屬性,封裝爲FixedDelayTask
,經過ScheduledTaskRegistrar
進行緩存。@Scheduled
中的fixedRate
、fixedRateString
屬性,封裝爲FixedRateTask
,經過ScheduledTaskRegistrar
進行緩存。@Scheduled
修飾的某個方法若是同時配置了cron
、fixedDelay|fixedDelayString
和fixedRate|fixedRateString
屬性,意味着此方法同時封裝爲三種任務CronTask
、FixedDelayTask
和FixedRateTask
。解析xxString
值的使用,用到了EmbeddedValueResolver
解析字符串的值,支持佔位符,這樣能夠直接獲取環境配置中的佔位符屬性(基於SPEL
的特性,甚至能夠支持嵌套佔位符)。解析成功的全部任務實例存放在ScheduledAnnotationBeanPostProcessor
的一個映射scheduledTasks
中:
// 宿主Bean實例 -> 解析完成的任務實例Set private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
解析和緩存工做完成以後,接着分析最終激活全部調度任務的邏輯,見互斥方法afterSingletonsInstantiated()
和onApplicationEvent()
,二者中必定只有一個方法可以調用finishRegistration()
:
// 全部單例實例化完畢以後回調 public void afterSingletonsInstantiated() { // Remove resolved singleton classes from cache this.nonAnnotatedClasses.clear(); if (this.applicationContext == null) { // Not running in an ApplicationContext -> register tasks early... finishRegistration(); } } // 上下文刷新完成以後回調 @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext() == this.applicationContext) { // Running in an ApplicationContext -> register tasks this late... // giving other ContextRefreshedEvent listeners a chance to perform // their work at the same time (e.g. Spring Batch's job registration). finishRegistration(); } } // private void finishRegistration() { // 若是持有的scheduler對象不爲null則設置ScheduledTaskRegistrar中的任務調度器 if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } // 這個判斷通常會成立,獲得的BeanFactory就是DefaultListableBeanFactory if (this.beanFactory instanceof ListableBeanFactory) { // 獲取全部的調度配置器SchedulingConfigurer實例,而且都回調configureTasks()方法,這個很重要,它是用戶動態裝載調取任務的擴展鉤子接口 Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); // SchedulingConfigurer實例列表排序 AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this.registrar); } } // 下面這一大段邏輯都是爲了從BeanFactory取出任務調度器實例,主要判斷TaskScheduler或者ScheduledExecutorService類型的Bean,包括嘗試經過類型或者名字獲取 // 獲取成功後設置到ScheduledTaskRegistrar中 if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type"); try { // Search for TaskScheduler bean... this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)); } catch (NoUniqueBeanDefinitionException ex) { logger.trace("Could not find unique TaskScheduler bean", ex); try { this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true)); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskScheduler bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.trace("Could not find default TaskScheduler bean", ex); // Search for ScheduledExecutorService bean next... try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false)); } catch (NoUniqueBeanDefinitionException ex2) { logger.trace("Could not find unique ScheduledExecutorService bean", ex2); try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true)); } catch (NoSuchBeanDefinitionException ex3) { if (logger.isInfoEnabled()) { logger.info("More than one ScheduledExecutorService bean exists within the context, and " + "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex2.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex2) { logger.trace("Could not find default ScheduledExecutorService bean", ex2); // Giving up -> falling back to default scheduler within the registrar... logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing"); } } } // 調用ScheduledTaskRegistrar的afterPropertiesSet()方法,裝載全部的調度任務 this.registrar.afterPropertiesSet(); } public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean { // 省略其餘代碼......... @Override public void afterPropertiesSet() { scheduleTasks(); } // 裝載全部調度任務 @SuppressWarnings("deprecation") protected void scheduleTasks() { // 這裏注意一點,若是找不到任務調度器實例,那麼會用單個線程調度全部任務 if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } // 調度全部裝載完畢的自定義觸發器的任務實例 if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } // 調度全部裝載完畢的CronTask if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } // 調度全部裝載完畢的FixedRateTask if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } // 調度全部裝載完畢的FixedDelayTask if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } } // 省略其餘代碼......... }
注意兩個個問題:
TaskScheduler
或者ScheduledExecutorService
類型的Bean
,那麼調度模塊只會建立一個線程去調度全部裝載完畢的任務,若是任務比較多,執行密度比較大,頗有可能會形成大量任務飢餓,表現爲存在部分任務不會觸發調度的場景(這個是調度模塊生產中常常遇到的故障,須要重點排查是否沒有設置TaskScheduler
或者ScheduledExecutorService
)。SchedulingConfigurer
是調度模塊提供給使用的進行擴展的鉤子接口,用於在激活全部調度任務以前回調ScheduledTaskRegistrar
實例,只要拿到ScheduledTaskRegistrar
實例,咱們就可使用它註冊和裝載新的Task
。Scheduling
模塊自己已經支持基於NamespaceHandler
支持經過XML
文件配置調度任務,可是筆者一直認爲XML
給人的感受太"重",使用起來顯得太笨重,這裏打算擴展出JSON
文件配置和基於JDBC
數據源配置(也就是持久化任務,這裏選用MySQL
)。根據前文的源碼分析,須要用到SchedulingConfigurer
接口的實現,用於在全部調度任務觸發以前從外部添加自定義的調度任務。先定義調度任務的一些配置屬性類:
// 調度任務類型枚舉 @Getter @RequiredArgsConstructor public enum ScheduleTaskType { CRON("CRON"), FIXED_DELAY("FIXED_DELAY"), FIXED_RATE("FIXED_RATE"), ; private final String type; } // 調度任務配置,enable屬性爲全局開關 @Data public class ScheduleTaskProperties { private Long version; private Boolean enable; private List<ScheduleTasks> tasks; } // 調度任務集合,筆者設計的時候採用一個宿主類中每一個獨立方法都是一個任務實例的模式 @Data public class ScheduleTasks { // 這裏故意叫Klass表明Class,避免關鍵字衝突 private String taskHostKlass; private Boolean enable; private List<ScheduleTaskMethod> taskMethods; } // 調度任務方法 - enable爲任務開關,沒有配置會被ScheduleTaskProperties或者ScheduleTasks中的enable覆蓋 @Data public class ScheduleTaskMethod { private Boolean enable; private String taskDescription; private String taskMethod; // 時區,cron的計算須要用到 private String timeZone; private String cronExpression; private String intervalMilliseconds; private String initialDelayMilliseconds; }
設計的時候,考慮到多個任務執行方法能夠放在同一個宿主類,這樣能夠方便同一種類的任務進行統一管理,如:
public class TaskHostClass { public void task1() { } public void task2() { } ...... public void taskN() { } }
細節方面,intervalMilliseconds
和initialDelayMilliseconds
的單位設計爲毫秒,使用字符串形式,方即可以基於StringValueResolver
解析配置文件中的屬性配置。添加一個抽象的SchedulingConfigurer
:
@Slf4j public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer, InitializingBean, BeanFactoryAware, EmbeddedValueResolverAware { @Getter private StringValueResolver embeddedValueResolver; private ConfigurableBeanFactory configurableBeanFactory; private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList(); private final Set<String> tasksLoaded = Sets.newHashSet(); @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { configurableBeanFactory = (ConfigurableBeanFactory) beanFactory; } @Override public void afterPropertiesSet() throws Exception { internalTasks.clear(); internalTasks.addAll(loadTaskProperties()); } @Override public void setEmbeddedValueResolver(StringValueResolver resolver) { embeddedValueResolver = resolver; } @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { for (InternalTaskProperties task : internalTasks) { try { synchronized (tasksLoaded) { String key = task.taskHostKlass() + "#" + task.taskMethod(); // 避免重複加載 if (!tasksLoaded.contains(key)) { if (task instanceof CronTaskProperties) { loadCronTask((CronTaskProperties) task, taskRegistrar); } if (task instanceof FixedDelayTaskProperties) { loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar); } if (task instanceof FixedRateTaskProperties) { loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar); } tasksLoaded.add(key); } else { log.info("調度任務已經裝載,任務宿主類:{},任務執行方法:{}", task.taskHostKlass(), task.taskMethod()); } } } catch (Exception e) { throw new IllegalStateException(String.format("加載調度任務異常,任務宿主類:%s,任務執行方法:%s", task.taskHostKlass(), task.taskMethod()), e); } } } private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass, String taskMethod) throws Exception { Class<?> klass = ClassUtils.forName(taskHostKlass, null); Object target = configurableBeanFactory.getBean(klass); Method method = ReflectionUtils.findMethod(klass, taskMethod); if (null == method) { throw new IllegalArgumentException(String.format("找不到目標方法,任務宿主類:%s,任務執行方法:%s", taskHostKlass, taskMethod)); } Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new ScheduledMethodRunnable(target, invocableMethod); } private void loadCronTask(CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression()); if (null != cronExpression) { String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone()); TimeZone timeZone; if (null != timeZoneString) { timeZone = TimeZone.getTimeZone(timeZoneString); } else { timeZone = TimeZone.getDefault(); } CronTask cronTask = new CronTask(runnable, new CronTrigger(cronExpression, timeZone)); taskRegistrar.addCronTask(cronTask); log.info("裝載CronTask[{}#{}()]成功,cron表達式:{},任務描述:{}", cronExpression, pops.taskMethod(), pops.cronExpression(), pops.taskDescription()); } } private void loadFixedDelayTask(FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedDelayTask(fixedDelayTask); log.info("裝載FixedDelayTask[{}#{}()]成功,固定延遲間隔:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(), pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private void loadFixedRateTask(FixedRateTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception { ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod()); long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds())); long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds())); FixedRateTask fixedRateTask = new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds); taskRegistrar.addFixedRateTask(fixedRateTask); log.info("裝載FixedRateTask[{}#{}()]成功,固定執行頻率:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(), pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription()); } private long parseDelayAsLong(String value) { if (null == value) { return 0L; } if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) { return Duration.parse(value).toMillis(); } return Long.parseLong(value); } private boolean isP(char ch) { return (ch == 'P' || ch == 'p'); } /** * 加載任務配置,預留給子類實現 */ protected abstract List<InternalTaskProperties> loadTaskProperties() throws Exception; interface InternalTaskProperties { String taskHostKlass(); String taskMethod(); String taskDescription(); } @Builder protected static class CronTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String cronExpression; private String taskDescription; private String timeZone; @Override public String taskDescription() { return taskDescription; } public String cronExpression() { return cronExpression; } public String timeZone() { return timeZone; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } @Builder protected static class FixedDelayTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription() { return taskDescription; } public String initialDelayMilliseconds() { return initialDelayMilliseconds; } public String intervalMilliseconds() { return intervalMilliseconds; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } @Builder protected static class FixedRateTaskProperties implements InternalTaskProperties { private String taskHostKlass; private String taskMethod; private String intervalMilliseconds; private String initialDelayMilliseconds; private String taskDescription; @Override public String taskDescription() { return taskDescription; } public String initialDelayMilliseconds() { return initialDelayMilliseconds; } public String intervalMilliseconds() { return intervalMilliseconds; } @Override public String taskHostKlass() { return taskHostKlass; } @Override public String taskMethod() { return taskMethod; } } }
loadTaskProperties()
方法用於加載任務配置,留給子類實現。
JSON
配置文件的格式以下(類路徑下的scheduling/tasks.json
文件):
{ "version": 1, "tasks": [ { "taskKlass": "club.throwable.schedule.Tasks", "taskMethods": [ { "taskType": "FIXED_DELAY", "taskDescription": "processTask1任務", "taskMethod": "processTask1", "intervalMilliseconds": "5000" } ] } ] }
每一個層級都有一個enable
屬性,默認爲true
,只有強制指定爲false
的時候纔不會裝載對應的任務調度方法。這裏就是簡單繼承AbstractSchedulingConfigurer
,實現從類路徑加載配置的邏輯,定義JsonSchedulingConfigurer
:
public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer { // 這裏把默認的任務配置JSON文件放在CLASSPATH下的scheduling/tasks.json,能夠經過配置項scheduling.json.config.location進行覆蓋 @Value("${scheduling.json.config.location:scheduling/tasks.json}") private String location; @Autowired private ObjectMapper objectMapper; @Override protected List<InternalTaskProperties> loadTaskProperties() throws Exception { ClassPathResource resource = new ClassPathResource(location); String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); ScheduleTaskProperties properties = objectMapper.readValue(content, ScheduleTaskProperties.class); if (Boolean.FALSE.equals(properties.getEnable()) || null == properties.getTasks()) { return Lists.newArrayList(); } List<InternalTaskProperties> target = Lists.newArrayList(); for (ScheduleTasks tasks : properties.getTasks()) { if (null != tasks) { List<ScheduleTaskMethod> taskMethods = tasks.getTaskMethods(); if (null != taskMethods) { for (ScheduleTaskMethod taskMethod : taskMethods) { if (!Boolean.FALSE.equals(taskMethod.getEnable())) { if (ScheduleTaskType.CRON == taskMethod.getTaskType()) { target.add(CronTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .cronExpression(taskMethod.getCronExpression()) .timeZone(taskMethod.getTimeZone()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } if (ScheduleTaskType.FIXED_DELAY == taskMethod.getTaskType()) { target.add(FixedDelayTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .intervalMilliseconds(taskMethod.getIntervalMilliseconds()) .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } if (ScheduleTaskType.FIXED_RATE == taskMethod.getTaskType()) { target.add(FixedRateTaskProperties.builder() .taskMethod(taskMethod.getTaskMethod()) .intervalMilliseconds(taskMethod.getIntervalMilliseconds()) .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds()) .taskDescription(taskMethod.getTaskDescription()) .taskHostKlass(tasks.getTaskKlass()) .build()); } } } } } } return target; } }
添加一個配置類和任務類:
@Configuration public class SchedulingAutoConfiguration { @Bean public JsonSchedulingConfigurer jsonSchedulingConfigurer(){ return new JsonSchedulingConfigurer(); } } // club.throwable.schedule.Tasks @Slf4j @Component public class Tasks { public void processTask1() { log.info("processTask1觸發.........."); } }
啓動SpringBoot
應用,某次執行的部分日誌以下:
2020-03-22 16:24:17.248 INFO 22836 --- [ main] c.t.s.AbstractSchedulingConfigurer : 裝載FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延遲間隔:5000 ms,初始延遲執行時間:0 ms,任務描述:processTask1任務 2020-03-22 16:24:22.275 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... 2020-03-22 16:24:27.277 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... 2020-03-22 16:24:32.279 INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... ......
這裏有些細節沒有完善,例如配置文件參數的一些非空判斷、配置值是否合法等等校驗邏輯沒有作,若是要設計成一個工業級的類庫,這些方面必需要考慮。
JDBC
數據源這裏用MySQL
舉例說明,先建一個調度任務配置表scheduling_task
:
CREATE TABLE `schedule_task` ( id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵', edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間', editor VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '修改者', creator VARCHAR(32) NOT NULL DEFAULT 'admin' COMMENT '建立者', deleted BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '軟刪除標識', task_host_class VARCHAR(256) NOT NULL COMMENT '任務宿主類全類名', task_method VARCHAR(128) NOT NULL COMMENT '任務執行方法名', task_type VARCHAR(16) NOT NULL COMMENT '任務類型', task_description VARCHAR(64) NOT NULL COMMENT '任務描述', cron_expression VARCHAR(128) COMMENT 'cron表達式', time_zone VARCHAR(32) COMMENT '時區', interval_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '執行間隔時間', initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '初始延遲執行時間', UNIQUE uniq_class_method (task_host_class, task_method) ) COMMENT '調度任務配置表';
其實具體的作法和JSON
配置差很少,先引入spring-boot-starter-jdbc
,接着編寫MysqlSchedulingConfigurer
:
// DAO @RequiredArgsConstructor public class MysqlScheduleTaskDao { private final JdbcTemplate jdbcTemplate; private static final ResultSetExtractor<List<ScheduleTask>> MULTI = r -> { List<ScheduleTask> tasks = Lists.newArrayList(); while (r.next()) { ScheduleTask task = new ScheduleTask(); tasks.add(task); task.setId(r.getLong("id")); task.setCronExpression(r.getString("cron_expression")); task.setInitialDelayMilliseconds(r.getLong("initial_delay_milliseconds")); task.setIntervalMilliseconds(r.getLong("interval_milliseconds")); task.setTimeZone(r.getString("time_zone")); task.setTaskDescription(r.getString("task_description")); task.setTaskHostClass(r.getString("task_host_class")); task.setTaskMethod(r.getString("task_method")); task.setTaskType(r.getString("task_type")); } return tasks; }; public List<ScheduleTask> selectAllTasks() { return jdbcTemplate.query("SELECT * FROM schedule_task WHERE deleted = 0", MULTI); } } // MysqlSchedulingConfigurer @RequiredArgsConstructor public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer { private final MysqlScheduleTaskDao mysqlScheduleTaskDao; @Override protected List<InternalTaskProperties> loadTaskProperties() throws Exception { List<InternalTaskProperties> target = Lists.newArrayList(); List<ScheduleTask> tasks = mysqlScheduleTaskDao.selectAllTasks(); if (!tasks.isEmpty()) { for (ScheduleTask task : tasks) { ScheduleTaskType scheduleTaskType = ScheduleTaskType.fromType(task.getTaskType()); if (ScheduleTaskType.CRON == scheduleTaskType) { target.add(CronTaskProperties.builder() .taskMethod(task.getTaskMethod()) .cronExpression(task.getCronExpression()) .timeZone(task.getTimeZone()) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } if (ScheduleTaskType.FIXED_DELAY == scheduleTaskType) { target.add(FixedDelayTaskProperties.builder() .taskMethod(task.getTaskMethod()) .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds())) .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds())) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } if (ScheduleTaskType.FIXED_RATE == scheduleTaskType) { target.add(FixedRateTaskProperties.builder() .taskMethod(task.getTaskMethod()) .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds())) .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds())) .taskDescription(task.getTaskDescription()) .taskHostKlass(task.getTaskHostClass()) .build()); } } } return target; } }
記得引入spring-boot-starter-jdbc
和mysql-connector-java
而且激活MysqlSchedulingConfigurer
配置。插入一條記錄:
INSERT INTO `schedule_task`(`id`, `edit_time`, `create_time`, `editor`, `creator`, `deleted`, `task_host_class`, `task_method`, `task_type`, `task_description`, `cron_expression`, `time_zone`, `interval_milliseconds`, `initial_delay_milliseconds`) VALUES (1, '2020-03-30 23:46:10', '2020-03-30 23:46:10', 'admin', 'admin', 0, 'club.throwable.schedule.Tasks', 'processTask1', 'FIXED_DELAY', '測試任務', NULL, NULL, 10000, 5000);
而後啓動服務,某次執行的輸出:
2020-03-30 23:47:27.376 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... 2020-03-30 23:47:37.378 INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks : processTask1觸發.......... ....
有些時候咱們但願能夠JSON
配置和JDBC
數據源配置進行混合配置,或者動態二選一以便靈活應對多環境的場景(例如要在開發環境使用JSON
配置而測試和生產環境使用JDBC
數據源配置,甚至能夠將JDBC
數據源配置覆蓋JSON
配置,這樣能保證老是傾向於使用JDBC
數據源配置),這樣須要對前面兩小節的實現加多一層抽象。這裏的設計能夠參考SpringMVC
中的控制器參數解析器的設計,具體是HandlerMethodArgumentResolverComposite
,其實道理是相同的。
在生產實踐中,暫時不考慮生成任務執行日誌和細粒度的監控,着重作了兩件事:
通常狀況下,咱們須要禁止任務併發執行,考慮引入Redisson
提供的分佈式鎖:
// 引入依賴 <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>最新版本</version> </dependency> // 配置類 @Configuration @AutoConfigureAfter(RedisAutoConfiguration.class) public class RedissonAutoConfiguration { @Autowired private RedisProperties redisProperties; @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { Config config = new Config(); SingleServerConfig singleServerConfig = config.useSingleServer(); singleServerConfig.setAddress(String.format("redis://%s:%d", redisProperties.getHost(), redisProperties.getPort())); if (redisProperties.getDatabase() > 0) { singleServerConfig.setDatabase(redisProperties.getDatabase()); } if (null != redisProperties.getPassword()) { singleServerConfig.setPassword(redisProperties.getPassword()); } return Redisson.create(config); } } // 分佈式鎖工廠 @Component public class DistributedLockFactory { private static final String DISTRIBUTED_LOCK_PATH_PREFIX = "dl:"; @Autowired private RedissonClient redissonClient; public DistributedLock provideDistributedLock(String lockKey) { String lockPath = DISTRIBUTED_LOCK_PATH_PREFIX + lockKey; return new RedissonDistributedLock(redissonClient, lockPath); } }
這裏考慮到項目依賴了spring-boot-starter-redis
,直接複用了它的配置屬性類(RedissonDistributedLock
是RLock
的輕量級封裝,見附錄)。使用方式以下:
@Autowired private DistributedLockFactory distributedLockFactory; public void task1() { DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey); // 等待時間爲20秒,持有鎖的最大時間爲60秒 boolean tryLock = lock.tryLock(20L, 60, TimeUnit.SECONDS); if (tryLock) { try { // 業務邏輯 }finally { lock.unlock(); } } }
MDC
實際上是Mapped Diagnostic Context
的縮寫,也就是映射診斷上下文,通常用於日誌框架裏面同一個線程執行過程的跟蹤(例如一個線程跑過了多個方法,各個方法裏面都打印了日誌,那麼經過MDC
能夠對整個調用鏈經過一個惟一標識關聯起來),例如這裏選用slf4j
提供的org.slf4j.MDC
:
@Component public class MappedDiagnosticContextAssistant { /** * 在MDC中執行 * * @param runnable runnable */ public void processInMappedDiagnosticContext(Runnable runnable) { String uuid = UUID.randomUUID().toString(); MDC.put("TRACE_ID", uuid); try { runnable.run(); } finally { MDC.remove("TRACE_ID"); } } }
任務執行的時候須要包裹成一個Runnale
實例:
public void task1() { mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> { StopWatch watch = new StopWatch(); watch.start(); log.info("開始執行......"); // 業務邏輯 watch.stop(); log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis()); }); }
結合前面一節提到的併發控制,那麼最終執行的任務方法以下:
public void task1() { mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> { StopWatch watch = new StopWatch(); watch.start(); log.info("開始執行......"); scheduleTaskAssistant.executeInDistributedLock("任務分佈式鎖KEY", () -> { // 真實的業務邏輯 }); watch.stop(); log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis()); }); }
這裏的方法看起來比較彆扭,其實能夠直接在任務裝載的時候基於分佈式鎖和MDC
進行封裝,方式相似於ScheduledMethodRunnable
,這裏不作展開,由於要詳細展開篇幅可能比較大(ScheduleTaskAssistant
見附錄)。
其實spring-context
整個調度模塊徹底依賴於TaskScheduler
實現,更底層的是JUC
調度線程池ScheduledThreadPoolExecutor
。若是想要從底層原理理解整個調度模塊的運行原理,那麼就必定要分析ScheduledThreadPoolExecutor
的實現。整篇文章大體介紹了spring-context
調度模塊的加載調度任務的流程,而且基於擴展接口SchedulingConfigurer
擴展出多種自定義配置調度任務的方式,可是考慮到須要在生產環境中運行,那麼免不了須要考慮監控、併發控制、日誌跟蹤等等的功能,可是這樣就會使得整個調度模塊變重,慢慢地就會發現,這個輪子越造越大,越有主流調度框架Quartz
或者Easy Scheduler
的影子。筆者認爲,軟件工程,有些時候要權衡取捨,該拋棄的就應該果斷拋棄,不然老是負重而行,還能走多遠?
參考資料:
SpringBoot
源碼ScheduleTaskAssistant
:
@RequiredArgsConstructor @Component public class ScheduleTaskAssistant { /** * 5秒 */ public static final long DEFAULT_WAIT_TIME = 5L; /** * 30秒 */ public static final long DEFAULT_LEAVE_TIME = 30L; private final DistributedLockFactory distributedLockFactory; /** * 在分佈式鎖中執行 * * @param waitTime 鎖等着時間 * @param leaveTime 鎖持有時間 * @param timeUnit 時間單位 * @param lockKey 鎖的key * @param task 任務對象 */ public void executeInDistributedLock(long waitTime, long leaveTime, TimeUnit timeUnit, String lockKey, Runnable task) { DistributedLock lock = distributedLockFactory.dl(lockKey); boolean tryLock = lock.tryLock(waitTime, leaveTime, timeUnit); if (tryLock) { try { long waitTimeMillis = timeUnit.toMillis(waitTime); long start = System.currentTimeMillis(); task.run(); long end = System.currentTimeMillis(); long cost = end - start; // 預防鎖過早釋放 if (cost < waitTimeMillis) { Sleeper.X.sleep(waitTimeMillis - cost); } } finally { lock.unlock(); } } } /** * 在分佈式鎖中執行 - 使用默認時間 * * @param lockKey 鎖的key * @param task 任務對象 */ public void executeInDistributedLock(String lockKey, Runnable task) { executeInDistributedLock(DEFAULT_WAIT_TIME, DEFAULT_LEAVE_TIME, TimeUnit.SECONDS, lockKey, task); } }
RedissonDistributedLock
:
@Slf4j public class RedissonDistributedLock implements DistributedLock { private final RedissonClient redissonClient; private final String lockPath; private final RLock internalLock; RedissonDistributedLock(RedissonClient redissonClient, String lockPath) { this.redissonClient = redissonClient; this.lockPath = lockPath; this.internalLock = initInternalLock(); } private RLock initInternalLock() { return redissonClient.getLock(lockPath); } @Override public boolean isLock() { return internalLock.isLocked(); } @Override public boolean isHeldByCurrentThread() { return internalLock.isHeldByCurrentThread(); } @Override public void lock(long leaseTime, TimeUnit unit) { internalLock.lock(leaseTime, unit); } @Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) { try { return internalLock.tryLock(waitTime, leaseTime, unit); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(String.format("Acquire lock fail by thread interrupted,path:%s", lockPath), e); } } @Override public void unlock() { try { internalLock.unlock(); } catch (IllegalMonitorStateException ex) { log.warn("Unlock path:{} error for thread status change in concurrency", lockPath, ex); } } }
(本文完 c-7-d e-a-20200324 真是有點滑稽,筆者發現任務持久化最好仍是用現成的工業級調度器,因而基於Quartz作了輕量級封裝,寫了個後臺管理界面,且聽下回分解)