經過源碼理解Spring中@Scheduled的實現原理而且實現調度任務動態裝載

前提

最近的新項目和數據同步相關,有定時調度的需求。以前一直有使用過QuartzXXL-JobEasy Scheduler等調度框架,後來愈加以爲這些框架過重量級了,因而想到了Spring內置的Scheduling模塊。而原生的Scheduling模塊只是內存態的調度模塊,不支持任務的持久化或者配置(配置任務經過@Scheduled註解進行硬編碼,不能抽離到類以外),所以考慮理解Scheduling模塊的底層原理,而且基於此造一個簡單的輪子,使之支持調度任務配置:經過配置文件或者JDBC數據源。java

Scheduling模塊

Scheduling模塊是spring-context依賴下的一個包org.springframework.schedulingmysql

這個模塊的類並很少,有四個子包:web

  • 頂層包的定義了一些通用接口和異常。
  • org.springframework.scheduling.annotation:定義了調度、異步任務相關的註解和解析類,經常使用的註解如@Async@EnableAsync@EnableScheduling@Scheduled
  • org.springframework.scheduling.concurrent:定義了調度任務執行器和相對應的FactoryBean
  • org.springframework.scheduling.config:定義了配置解析、任務具體實現類、調度任務XML配置文件解析相關的解析類。
  • org.springframework.scheduling.support:定義了反射支持類、Cron表達式解析器等工具類。

若是想單獨使用Scheduling,只須要引入spring-context這個依賴。可是如今流行使用SpringBoot,引入spring-boot-starter-web已經集成了spring-context,能夠直接使用Scheduling模塊,筆者編寫本文的時候(2020-03-14SpringBoot的最新版本爲2.2.5.RELEASE,能夠選用此版本進行源碼分析或者生產應用:redis

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spring.boot.version>2.2.5.RELEASE</spring.boot.version>
</properties>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring.boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

開啓Scheduling模塊支持只須要在某一個配置類中添加@EnableScheduling註解便可,通常爲了明確模塊的引入,建議在啓動類中使用此註解,如:spring

@EnableScheduling
@SpringBootApplication
public class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

Scheduling模塊的工做流程

這個圖描述了Scheduling模塊的工做流程,這裏分析一下非XML配置下的流程(右邊的分支):sql

  • 經過註解@EnableScheduling中的@Import引入了SchedulingConfiguration,而SchedulingConfiguration中配置了一個類型爲ScheduledAnnotationBeanPostProcessor名稱爲org.springframework.context.annotation.internalScheduledAnnotationProcessorBean,這裏有個常見的技巧,Spring內部加載的Bean通常會定義名稱爲internalXXXBeanrole會定義爲ROLE_INFRASTRUCTURE = 2
  • Bean後置處理器ScheduledAnnotationBeanPostProcessor會解析和處理每個符合特定類型的Bean中的@Scheduled註解(注意@Scheduled只能使用在方法或者註解上),而且把解析完成的方法封裝爲不一樣類型的Task實例,緩存在ScheduledTaskRegistrar中的。
  • ScheduledAnnotationBeanPostProcessor中的鉤子接口方法afterSingletonsInstantiated()在全部單例初始化完成以後回調觸發,在此方法中設置了ScheduledTaskRegistrar中的任務調度器(TaskScheduler或者ScheduledExecutorService類型)實例,而且調用ScheduledTaskRegistrar#afterPropertiesSet()方法添加全部緩存的Task實例到任務調度器中執行。

任務調度器

Scheduling模塊支持TaskScheduler或者ScheduledExecutorService類型的任務調度器,而ScheduledExecutorService實際上是JDK併發包java.util.concurrent的接口,通常實現類就是調度線程池ScheduledThreadPoolExecutor。實際上,ScheduledExecutorService類型的實例最終會經過適配器模式轉變爲ConcurrentTaskScheduler,因此這裏只須要分析TaskScheduler類型的執行器。shell

  • ThreadPoolTaskScheduler:基於線程池實現的任務執行器,這個是最經常使用的實現,底層依賴於ScheduledThreadPoolExecutor實現。
  • ConcurrentTaskSchedulerTaskScheduler接口和ScheduledExecutorService接口的適配器,若是自定義一個ScheduledThreadPoolExecutor類型的Bean,那麼任務執行器就會適配爲ConcurrentTaskScheduler
  • DefaultManagedTaskSchedulerJDK7引入的JSR-236的支持,能夠經過JNDI配置此調度執行器,通常不多用到,底層也是依賴於ScheduledThreadPoolExecutor實現。

也就是說,內置的三個調度器類型底層都依賴於JUC調度線程池ScheduledThreadPoolExecutor。這裏分析一下頂層接口org.springframework.scheduling.TaskScheduler提供的功能(筆者已經把功能一致的default方法暫時移除):express

// 省略一些功能一致的default方法
public interface TaskScheduler {
     
     // 調度一個任務,經過觸發器實例指定觸發時間週期
     ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
     
     // 指定起始時間調度一個任務 - 單次執行
     ScheduledFuture<?> schedule(Runnable task, Date startTime);

     // 指定固定頻率調度一個任務,period的單位是毫秒
     ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
     
     // 指定起始時間和固定頻率調度一個任務,period的單位是毫秒
     ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);

     // 指定固定延遲間隔調度一個任務,delay的單位是毫秒
     ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);

    // 指定起始時間和固定延遲間隔調度一個任務,delay的單位是毫秒
     ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
}

Task的分類

Scheduling模塊中支持不一樣類型的任務,主要包括下面的3種(解析的優先順序也是以下):編程

  1. Cron表達式任務,支持經過Cron表達式配置執行的週期,對應的任務類型爲org.springframework.scheduling.config.CronTask
  2. 固定延遲間隔任務,也就是上一輪執行完畢後間隔固定週期再執行本輪,依次類推,對應的的任務類型爲org.springframework.scheduling.config.FixedDelayTask
  3. 固定頻率任務,基於固定的間隔時間執行,不會理會上一輪是否執行完畢本輪會照樣執行,對應的的任務類型爲org.springframework.scheduling.config.FixedRateTask

關於這幾類Task,舉幾個簡單的例子。CronTask是經過cron表達式指定執行週期的,而且不支持延遲執行,可使用特殊字符-禁用任務執行:json

// 註解聲明式使用 - 每五秒執行一次,不支持initialDelay
@Scheduled(cron = "*/5 * * * * ?")
public void processTask(){

}

// 註解聲明式使用 - 禁止任務執行
@Scheduled(cron = "-")
public void processTask(){

}

// 編程式使用
public class Tasks {

    static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        CronTask cronTask = new CronTask(() -> {
            System.out.println(String.format("[%s] - CronTask觸發...", F.format(LocalDateTime.now())));
        }, "*/5 * * * * ?");
        taskScheduler.schedule(cronTask.getRunnable(),cronTask.getTrigger());
        Thread.sleep(Integer.MAX_VALUE);
    }
}
// 某次執行輸出結果
[2020-03-16 01:07:00] - CronTask觸發...
[2020-03-16 01:07:05] - CronTask觸發...
......

FixedDelayTask須要配置延遲間隔值(fixedDelay或者fixedDelayString)和可選的起始延遲執行時間(initialDelay或者initialDelayString),這裏注意一點是fixedDelayStringinitialDelayString都支持從EmbeddedValueResolver(簡單理解爲配置文件的屬性處理器)讀取和Duration(例如P2D就是parses as 2 days,表示86400秒)支持格式的解析:

// 註解聲明式使用 - 延遲一秒開始執行,延遲間隔爲5秒
@Scheduled(fixedDelay = 5000, initialDelay = 1000)
public void process(){
        
}

// 註解聲明式使用 - spring-boot配置文件中process.task.fixedDelay=5000  process.task.initialDelay=1000
@Scheduled(fixedDelayString = "${process.task.fixedDelay}", initialDelayString = "${process.task.initialDelay}")
public void process(){
        
}

// 編程式使用
public class Tasks {

    static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        FixedDelayTask fixedDelayTask = new FixedDelayTask(() -> {
            System.out.println(String.format("[%s] - FixedDelayTask觸發...", F.format(LocalDateTime.now())));
        }, 5000, 1000);
        Date startTime = new Date(System.currentTimeMillis() + fixedDelayTask.getInitialDelay());
        taskScheduler.scheduleWithFixedDelay(fixedDelayTask.getRunnable(), startTime, fixedDelayTask.getInterval());
        Thread.sleep(Integer.MAX_VALUE);
    }
}
// 某次執行輸出結果
[2020-03-16 01:06:12] - FixedDelayTask觸發...
[2020-03-16 01:06:17] - FixedDelayTask觸發...
......

FixedRateTask須要配置固定間隔值(fixedRate或者fixedRateString)和可選的起始延遲執行時間(initialDelay或者initialDelayString),這裏注意一點是fixedRateStringinitialDelayString都支持從EmbeddedValueResolver(簡單理解爲配置文件的屬性處理器)讀取和Duration(例如P2D就是parses as 2 days,表示86400秒)支持格式的解析:

// 註解聲明式使用 - 延遲一秒開始執行,每隔5秒執行一次
@Scheduled(fixedRate = 5000, initialDelay = 1000)
public void processTask(){

}

// 註解聲明式使用 - spring-boot配置文件中process.task.fixedRate=5000  process.task.initialDelay=1000
@Scheduled(fixedRateString = "${process.task.fixedRate}", initialDelayString = "${process.task.initialDelay}")
public void process(){
        
}

// 編程式使用
public class Tasks {

    static DateTimeFormatter F = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws Exception {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(10);
        taskScheduler.initialize();
        FixedRateTask fixedRateTask = new FixedRateTask(() -> {
            System.out.println(String.format("[%s] - FixedRateTask觸發...", F.format(LocalDateTime.now())));
        }, 5000, 1000);
        Date startTime = new Date(System.currentTimeMillis() + fixedRateTask.getInitialDelay());
        taskScheduler.scheduleAtFixedRate(fixedRateTask.getRunnable(), startTime, fixedRateTask.getInterval());
        Thread.sleep(Integer.MAX_VALUE);
    }
}
// 某次執行輸出結果
[2020-03-16 23:58:25] - FixedRateTask觸發...
[2020-03-16 23:58:30] - FixedRateTask觸發...
......

簡單分析核心流程的源代碼

SpringBoot註解體系下,Scheduling模塊的全部邏輯基本在ScheduledAnnotationBeanPostProcessorScheduledTaskRegistrar中。通常來講,一個類實現的接口表明了它能提供的功能,先看ScheduledAnnotationBeanPostProcessor實現的接口:

  • ScheduledTaskHolder接口:返回Set<ScheduledTask>,表示持有的全部任務實例。
  • MergedBeanDefinitionPostProcessor接口:Bean定義合併時回調,預留空實現,暫時不作任何處理。
  • BeanPostProcessor接口:也就是MergedBeanDefinitionPostProcessor的父接口,Bean實例初始化先後分別回調,其中,後回調的postProcessAfterInitialization()方法就是用於解析@Scheduled和裝載ScheduledTask,須要重點關注此方法的邏輯。
  • DestructionAwareBeanPostProcessor接口:具體的Bean實例銷燬的時候回調,用於Bean實例銷燬的時候移除和取消對應的任務實例。
  • Ordered接口:用於Bean加載時候的排序,主要是改變ScheduledAnnotationBeanPostProcessorBeanPostProcessor執行鏈中的順序。
  • EmbeddedValueResolverAware接口:回調StringValueResolver實例,用於解析帶佔位符的環境變量屬性值。
  • BeanNameAware接口:回調BeanName
  • BeanFactoryAware接口:回調BeanFactory實例,具體是DefaultListableBeanFactory,也就是熟知的IOC容器。
  • ApplicationContextAware接口:回調ApplicationContext實例,也就是熟知的Spring上下文,它是IOC容器的門面,同時是事件廣播器、資源加載器的實現等等。
  • SmartInitializingSingleton接口:全部單例實例化完畢以後回調,做用是在持有的applicationContextNULL的時候開始調度全部加載完成的任務,這個鉤子接口十分有用,筆者經常使用它作一些資源初始化工做。
  • ApplicationListener接口:監聽Spring應用的事件,具體是ApplicationListener<ContextRefreshedEvent>,監聽上下文刷新的事件,若是事件中攜帶的ApplicationContext實例和ApplicationContextAware回調的ApplicationContext實例一致,那麼在此監聽回調方法中開始調度全部加載完成的任務,也就是在ScheduledAnnotationBeanPostProcessor這個類中,SmartInitializingSingleton接口的實現和ApplicationListener接口的實現邏輯是互斥的。
  • DisposableBean接口:當前Bean實例銷燬時候回調,也就是ScheduledAnnotationBeanPostProcessor自身被銷燬的時候回調,用於取消和清理全部的ScheduledTask

上面分析的鉤子接口在SpringBoot體系中能夠按需使用,瞭解回調不一樣鉤子接口的回調時機,能夠在特定時機完成達到理想的效果。

@Scheduled註解的解析集中在postProcessAfterInitialization()方法:

public Object postProcessAfterInitialization(Object bean, String beanName) {
    // 忽略AopInfrastructureBean、TaskScheduler和ScheduledExecutorService三種類型的Bean
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }
    // 獲取Bean的用戶態類型,例如Bean有可能被CGLIB加強,這個時候要取其父類
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    // nonAnnotatedClasses存放着不存在@Scheduled註解的類型,緩存起來避免重複判斷它是否攜帶@Scheduled註解的方法
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
            AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
        // 由於JDK8以後支持重複註解,所以獲取具體類型中Method -> @Scheduled的集合,也就是有可能一個方法使用多個@Scheduled註解,最終會封裝爲多個Task
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        // 解析到類型中不存在@Scheduled註解的方法添加到nonAnnotatedClasses緩存
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Method -> @Scheduled的集合遍歷processScheduled()方法進行登記
            annotatedMethods.forEach((method, scheduledMethods) ->
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}

processScheduled(Scheduled scheduled, Method method, Object bean)就是具體的註解解析和Task封裝的方法:

// Runnable適配器 - 用於反射調用具體的方法,觸發任務方法執行
public class ScheduledMethodRunnable implements Runnable {

	private final Object target;

	private final Method method;

	public ScheduledMethodRunnable(Object target, Method method) {
		this.target = target;
		this.method = method;
	}
        ....// 省略無關代碼

        // 這個就是最終的任務方法執行的核心方法,抑制修飾符,而後反射調用
	@Override
	public void run() {
		try {
			ReflectionUtils.makeAccessible(this.method);
			this.method.invoke(this.target);
		}
		catch (InvocationTargetException ex) {
			ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
		}
		catch (IllegalAccessException ex) {
			throw new UndeclaredThrowableException(ex);
		}
	}    
}

// 經過方法所在Bean實例和方法封裝Runnable適配器ScheduledMethodRunnable實例
protected Runnable createRunnable(Object target, Method method) {
	Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
	Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
	return new ScheduledMethodRunnable(target, invocableMethod);
}


// 這個方法十分長,不過邏輯並不複雜,它只作了四件事
// 0. 解析@Scheduled中的initialDelay、initialDelayString屬性,適用於FixedDelayTask或者FixedRateTask的延遲執行
// 1. 優先解析@Scheduled中的cron屬性,封裝爲CronTask,經過ScheduledTaskRegistrar進行緩存
// 2. 解析@Scheduled中的fixedDelay、fixedDelayString屬性,封裝爲FixedDelayTask,經過ScheduledTaskRegistrar進行緩存
// 3. 解析@Scheduled中的fixedRate、fixedRateString屬性,封裝爲FixedRateTask,經過ScheduledTaskRegistrar進行緩存
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        // 經過方法宿主Bean和目標方法封裝Runnable適配器ScheduledMethodRunnable實例
        Runnable runnable = createRunnable(bean, method);
        boolean processedSchedule = false;
        String errorMessage =
                "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
        
        // 緩存已經裝載的任務
        Set<ScheduledTask> tasks = new LinkedHashSet<>(4);

        // Determine initial delay
        // 解析初始化延遲執行時間,initialDelayString支持佔位符配置,若是initialDelayString配置了,會覆蓋initialDelay的值
        long initialDelay = scheduled.initialDelay();
        String initialDelayString = scheduled.initialDelayString();
        if (StringUtils.hasText(initialDelayString)) {
            Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
            if (this.embeddedValueResolver != null) {
                initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
            }
            if (StringUtils.hasLength(initialDelayString)) {
                try {
                    initialDelay = parseDelayAsLong(initialDelayString);
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                }
            }
        }

        // Check cron expression
        // 解析時區zone的值,支持支持佔位符配置,判斷cron是否存在,存在則裝載爲CronTask
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }
            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                if (!Scheduled.CRON_DISABLED.equals(cron)) {
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }
                    // 此方法雖然表面上是調度CronTask,實際上因爲ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的緩存中
                    // 返回的任務實例添加到宿主Bean的緩存中,而後最後會放入宿主Bean -> List<ScheduledTask>映射中
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }
        }

        // At this point we don't need to differentiate between initial delay set or not anymore
        // 修正小於0的初始化延遲執行時間值爲0
        if (initialDelay < 0) {
            initialDelay = 0;
        }

        // 解析fixedDelay和fixedDelayString,若是同時配置,fixedDelayString最終解析出來的整數值會覆蓋fixedDelay,封裝爲FixedDelayTask
        long fixedDelay = scheduled.fixedDelay();
        if (fixedDelay >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
        }
        String fixedDelayString = scheduled.fixedDelayString();
        if (StringUtils.hasText(fixedDelayString)) {
            if (this.embeddedValueResolver != null) {
                fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
            }
            if (StringUtils.hasLength(fixedDelayString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    fixedDelay = parseDelayAsLong(fixedDelayString);
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                }
                // 此方法雖然表面上是調度FixedDelayTask,實際上因爲ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的緩存中
                // 返回的任務實例添加到宿主Bean的緩存中,而後最後會放入宿主Bean -> List<ScheduledTask>映射中
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
        }

        // 解析fixedRate和fixedRateString,若是同時配置,fixedRateString最終解析出來的整數值會覆蓋fixedRate,封裝爲FixedRateTask
        long fixedRate = scheduled.fixedRate();
        if (fixedRate >= 0) {
            Assert.isTrue(!processedSchedule, errorMessage);
            processedSchedule = true;
            tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
        }
        String fixedRateString = scheduled.fixedRateString();
        if (StringUtils.hasText(fixedRateString)) {
            if (this.embeddedValueResolver != null) {
                fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
            }
            if (StringUtils.hasLength(fixedRateString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                try {
                    fixedRate = parseDelayAsLong(fixedRateString);
                }
                catch (RuntimeException ex) {
                    throw new IllegalArgumentException(
                            "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                }
                 // 此方法雖然表面上是調度FixedRateTask,實際上因爲ScheduledTaskRegistrar不持有TaskScheduler,只是把任務添加到它的緩存中
                // 返回的任務實例添加到宿主Bean的緩存中,而後最後會放入宿主Bean -> List<ScheduledTask>映射中
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
        }

        // Check whether we had any attribute set
        Assert.isTrue(processedSchedule, errorMessage);

        // Finally register the scheduled tasks
        synchronized (this.scheduledTasks) {
            // 註冊全部任務實例,這個映射Key爲宿主Bean實例,Value爲List<ScheduledTask>,後面用於調度全部註冊完成的任務
            Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
            regTasks.addAll(tasks);
        }
    }
    catch (IllegalArgumentException ex) {
        throw new IllegalStateException(
                "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
    }
}

總的來講,這個方法作了四件事:

  • 解析@Scheduled中的initialDelayinitialDelayString屬性,適用於FixedDelayTask或者FixedRateTask的延遲執行。
  • 優先解析@Scheduled中的cron屬性,封裝爲CronTask,經過ScheduledTaskRegistrar進行緩存。
  • 解析@Scheduled中的fixedDelayfixedDelayString屬性,封裝爲FixedDelayTask,經過ScheduledTaskRegistrar進行緩存。
  • 解析@Scheduled中的fixedRatefixedRateString屬性,封裝爲FixedRateTask,經過ScheduledTaskRegistrar進行緩存。

@Scheduled修飾的某個方法若是同時配置了cronfixedDelay|fixedDelayStringfixedRate|fixedRateString屬性,意味着此方法同時封裝爲三種任務CronTaskFixedDelayTaskFixedRateTask。解析xxString值的使用,用到了EmbeddedValueResolver解析字符串的值,支持佔位符,這樣能夠直接獲取環境配置中的佔位符屬性(基於SPEL的特性,甚至能夠支持嵌套佔位符)。解析成功的全部任務實例存放在ScheduledAnnotationBeanPostProcessor的一個映射scheduledTasks中:

// 宿主Bean實例 -> 解析完成的任務實例Set
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);

解析和緩存工做完成以後,接着分析最終激活全部調度任務的邏輯,見互斥方法afterSingletonsInstantiated()onApplicationEvent(),二者中必定只有一個方法可以調用finishRegistration()

// 全部單例實例化完畢以後回調
public void afterSingletonsInstantiated() {
    // Remove resolved singleton classes from cache
    this.nonAnnotatedClasses.clear();

    if (this.applicationContext == null) {
        // Not running in an ApplicationContext -> register tasks early...
        finishRegistration();
    }
}

// 上下文刷新完成以後回調
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (event.getApplicationContext() == this.applicationContext) {
        // Running in an ApplicationContext -> register tasks this late...
        // giving other ContextRefreshedEvent listeners a chance to perform
        // their work at the same time (e.g. Spring Batch's job registration).
        finishRegistration();
    }
}

// 
private void finishRegistration() {
    // 若是持有的scheduler對象不爲null則設置ScheduledTaskRegistrar中的任務調度器
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }
    // 這個判斷通常會成立,獲得的BeanFactory就是DefaultListableBeanFactory
    if (this.beanFactory instanceof ListableBeanFactory) {
        // 獲取全部的調度配置器SchedulingConfigurer實例,而且都回調configureTasks()方法,這個很重要,它是用戶動態裝載調取任務的擴展鉤子接口
        Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        // SchedulingConfigurer實例列表排序
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }
    // 下面這一大段邏輯都是爲了從BeanFactory取出任務調度器實例,主要判斷TaskScheduler或者ScheduledExecutorService類型的Bean,包括嘗試經過類型或者名字獲取
    // 獲取成功後設置到ScheduledTaskRegistrar中
    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
        try {
            // Search for TaskScheduler bean...
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.trace("Could not find unique TaskScheduler bean", ex);
            try {
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskScheduler bean exists within the context, and " +
                            "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                            "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                            "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                            ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.trace("Could not find default TaskScheduler bean", ex);
            // Search for ScheduledExecutorService bean next...
            try {
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex2) {
                logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                }
                catch (NoSuchBeanDefinitionException ex3) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex2.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.trace("Could not find default ScheduledExecutorService bean", ex2);
                // Giving up -> falling back to default scheduler within the registrar...
                logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
            }
        }
    }
    // 調用ScheduledTaskRegistrar的afterPropertiesSet()方法,裝載全部的調度任務
    this.registrar.afterPropertiesSet();
}

public class ScheduledTaskRegistrar implements ScheduledTaskHolder, InitializingBean, DisposableBean {

    // 省略其餘代碼.........

    @Override
    public void afterPropertiesSet() {
        scheduleTasks();
    }

    // 裝載全部調度任務
    @SuppressWarnings("deprecation")
    protected void scheduleTasks() {
        // 這裏注意一點,若是找不到任務調度器實例,那麼會用單個線程調度全部任務
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        // 調度全部裝載完畢的自定義觸發器的任務實例
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        // 調度全部裝載完畢的CronTask
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        // 調度全部裝載完畢的FixedRateTask
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        // 調度全部裝載完畢的FixedDelayTask
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }   

    // 省略其餘代碼......... 
}

注意兩個個問題:

  • 若是沒有配置TaskScheduler或者ScheduledExecutorService類型的Bean,那麼調度模塊只會建立一個線程去調度全部裝載完畢的任務,若是任務比較多,執行密度比較大,頗有可能會形成大量任務飢餓,表現爲存在部分任務不會觸發調度的場景(這個是調度模塊生產中常常遇到的故障,須要重點排查是否沒有設置TaskScheduler或者ScheduledExecutorService)。
  • SchedulingConfigurer是調度模塊提供給使用的進行擴展的鉤子接口,用於在激活全部調度任務以前回調ScheduledTaskRegistrar實例,只要拿到ScheduledTaskRegistrar實例,咱們就可使用它註冊和裝載新的Task

調度任務動態裝載

Scheduling模塊自己已經支持基於NamespaceHandler支持經過XML文件配置調度任務,可是筆者一直認爲XML給人的感受太"重",使用起來顯得太笨重,這裏打算擴展出JSON文件配置和基於JDBC數據源配置(也就是持久化任務,這裏選用MySQL)。根據前文的源碼分析,須要用到SchedulingConfigurer接口的實現,用於在全部調度任務觸發以前從外部添加自定義的調度任務。先定義調度任務的一些配置屬性類:

// 調度任務類型枚舉
@Getter
@RequiredArgsConstructor
public enum  ScheduleTaskType {

    CRON("CRON"),

    FIXED_DELAY("FIXED_DELAY"),

    FIXED_RATE("FIXED_RATE"),

    ;

    private final String type;
}

// 調度任務配置,enable屬性爲全局開關
@Data
public class ScheduleTaskProperties {

    private Long version;
    private Boolean enable;
    private List<ScheduleTasks> tasks;
}

// 調度任務集合,筆者設計的時候採用一個宿主類中每一個獨立方法都是一個任務實例的模式
@Data
public class ScheduleTasks {
    
    // 這裏故意叫Klass表明Class,避免關鍵字衝突
    private String taskHostKlass;
    private Boolean enable;
    private List<ScheduleTaskMethod> taskMethods;
}

// 調度任務方法 - enable爲任務開關,沒有配置會被ScheduleTaskProperties或者ScheduleTasks中的enable覆蓋
@Data
public class ScheduleTaskMethod {

    private Boolean enable;
    private String taskDescription;
    private String taskMethod;
    // 時區,cron的計算須要用到
    private String timeZone;
    private String cronExpression;
    private String intervalMilliseconds;
    private String initialDelayMilliseconds;
}

設計的時候,考慮到多個任務執行方法能夠放在同一個宿主類,這樣能夠方便同一種類的任務進行統一管理,如:

public class TaskHostClass {

    public void task1() {

    }

    public void task2() {
        
    }

    ......

    public void taskN() {
        
    }
}

細節方面,intervalMillisecondsinitialDelayMilliseconds的單位設計爲毫秒,使用字符串形式,方即可以基於StringValueResolver解析配置文件中的屬性配置。添加一個抽象的SchedulingConfigurer

@Slf4j
public abstract class AbstractSchedulingConfigurer implements SchedulingConfigurer, InitializingBean, BeanFactoryAware,
        EmbeddedValueResolverAware {

    @Getter
    private StringValueResolver embeddedValueResolver;

    private ConfigurableBeanFactory configurableBeanFactory;

    private final List<InternalTaskProperties> internalTasks = Lists.newLinkedList();

    private final Set<String> tasksLoaded = Sets.newHashSet();

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        configurableBeanFactory = (ConfigurableBeanFactory) beanFactory;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        internalTasks.clear();
        internalTasks.addAll(loadTaskProperties());
    }

    @Override
    public void setEmbeddedValueResolver(StringValueResolver resolver) {
        embeddedValueResolver = resolver;
    }

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        for (InternalTaskProperties task : internalTasks) {
            try {
                synchronized (tasksLoaded) {
                    String key = task.taskHostKlass() + "#" + task.taskMethod();
                    // 避免重複加載
                    if (!tasksLoaded.contains(key)) {
                        if (task instanceof CronTaskProperties) {
                            loadCronTask((CronTaskProperties) task, taskRegistrar);
                        }
                        if (task instanceof FixedDelayTaskProperties) {
                            loadFixedDelayTask((FixedDelayTaskProperties) task, taskRegistrar);
                        }
                        if (task instanceof FixedRateTaskProperties) {
                            loadFixedRateTask((FixedRateTaskProperties) task, taskRegistrar);
                        }
                        tasksLoaded.add(key);
                    } else {
                        log.info("調度任務已經裝載,任務宿主類:{},任務執行方法:{}", task.taskHostKlass(), task.taskMethod());
                    }
                }
            } catch (Exception e) {
                throw new IllegalStateException(String.format("加載調度任務異常,任務宿主類:%s,任務執行方法:%s",
                        task.taskHostKlass(), task.taskMethod()), e);
            }
        }
    }

    private ScheduledMethodRunnable loadScheduledMethodRunnable(String taskHostKlass, String taskMethod) throws Exception {
        Class<?> klass = ClassUtils.forName(taskHostKlass, null);
        Object target = configurableBeanFactory.getBean(klass);
        Method method = ReflectionUtils.findMethod(klass, taskMethod);
        if (null == method) {
            throw new IllegalArgumentException(String.format("找不到目標方法,任務宿主類:%s,任務執行方法:%s", taskHostKlass, taskMethod));
        }
        Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
        return new ScheduledMethodRunnable(target, invocableMethod);
    }

    private void loadCronTask(CronTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {
        ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());
        String cronExpression = embeddedValueResolver.resolveStringValue(pops.cronExpression());
        if (null != cronExpression) {
            String timeZoneString = embeddedValueResolver.resolveStringValue(pops.timeZone());
            TimeZone timeZone;
            if (null != timeZoneString) {
                timeZone = TimeZone.getTimeZone(timeZoneString);
            } else {
                timeZone = TimeZone.getDefault();
            }
            CronTask cronTask = new CronTask(runnable, new CronTrigger(cronExpression, timeZone));
            taskRegistrar.addCronTask(cronTask);
            log.info("裝載CronTask[{}#{}()]成功,cron表達式:{},任務描述:{}", cronExpression, pops.taskMethod(),
                    pops.cronExpression(), pops.taskDescription());
        }
    }

    private void loadFixedDelayTask(FixedDelayTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {
        ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());
        long fixedDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));
        long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));
        FixedDelayTask fixedDelayTask = new FixedDelayTask(runnable, fixedDelayMilliseconds, initialDelayMilliseconds);
        taskRegistrar.addFixedDelayTask(fixedDelayTask);
        log.info("裝載FixedDelayTask[{}#{}()]成功,固定延遲間隔:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(),
                pops.taskMethod(), fixedDelayMilliseconds, initialDelayMilliseconds, pops.taskDescription());
    }

    private void loadFixedRateTask(FixedRateTaskProperties pops, ScheduledTaskRegistrar taskRegistrar) throws Exception {
        ScheduledMethodRunnable runnable = loadScheduledMethodRunnable(pops.taskHostKlass(), pops.taskMethod());
        long fixedRateMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.intervalMilliseconds()));
        long initialDelayMilliseconds = parseDelayAsLong(embeddedValueResolver.resolveStringValue(pops.initialDelayMilliseconds()));
        FixedRateTask fixedRateTask = new FixedRateTask(runnable, fixedRateMilliseconds, initialDelayMilliseconds);
        taskRegistrar.addFixedRateTask(fixedRateTask);
        log.info("裝載FixedRateTask[{}#{}()]成功,固定執行頻率:{} ms,初始延遲執行時間:{} ms,任務描述:{}", pops.taskHostKlass(),
                pops.taskMethod(), fixedRateMilliseconds, initialDelayMilliseconds, pops.taskDescription());
    }

    private long parseDelayAsLong(String value) {
        if (null == value) {
            return 0L;
        }
        if (value.length() > 1 && (isP(value.charAt(0)) || isP(value.charAt(1)))) {
            return Duration.parse(value).toMillis();
        }
        return Long.parseLong(value);
    }

    private boolean isP(char ch) {
        return (ch == 'P' || ch == 'p');
    }

    /**
     * 加載任務配置,預留給子類實現
     */
    protected abstract List<InternalTaskProperties> loadTaskProperties() throws Exception;

    interface InternalTaskProperties {

        String taskHostKlass();

        String taskMethod();

        String taskDescription();
    }

    @Builder
    protected static class CronTaskProperties implements InternalTaskProperties {

        private String taskHostKlass;
        private String taskMethod;
        private String cronExpression;
        private String taskDescription;
        private String timeZone;

        @Override
        public String taskDescription() {
            return taskDescription;
        }

        public String cronExpression() {
            return cronExpression;
        }

        public String timeZone() {
            return timeZone;
        }

        @Override
        public String taskHostKlass() {
            return taskHostKlass;
        }

        @Override
        public String taskMethod() {
            return taskMethod;
        }
    }

    @Builder
    protected static class FixedDelayTaskProperties implements InternalTaskProperties {

        private String taskHostKlass;
        private String taskMethod;
        private String intervalMilliseconds;
        private String initialDelayMilliseconds;
        private String taskDescription;

        @Override
        public String taskDescription() {
            return taskDescription;
        }

        public String initialDelayMilliseconds() {
            return initialDelayMilliseconds;
        }

        public String intervalMilliseconds() {
            return intervalMilliseconds;
        }

        @Override
        public String taskHostKlass() {
            return taskHostKlass;
        }

        @Override
        public String taskMethod() {
            return taskMethod;
        }
    }

    @Builder
    protected static class FixedRateTaskProperties implements InternalTaskProperties {

        private String taskHostKlass;
        private String taskMethod;
        private String intervalMilliseconds;
        private String initialDelayMilliseconds;
        private String taskDescription;

        @Override
        public String taskDescription() {
            return taskDescription;
        }

        public String initialDelayMilliseconds() {
            return initialDelayMilliseconds;
        }

        public String intervalMilliseconds() {
            return intervalMilliseconds;
        }

        @Override
        public String taskHostKlass() {
            return taskHostKlass;
        }

        @Override
        public String taskMethod() {
            return taskMethod;
        }
    }
}

loadTaskProperties()方法用於加載任務配置,留給子類實現。

JSON配置

JSON配置文件的格式以下(類路徑下的scheduling/tasks.json文件):

{
  "version": 1,
  "tasks": [
    {
      "taskKlass": "club.throwable.schedule.Tasks",
      "taskMethods": [
        {
          "taskType": "FIXED_DELAY",
          "taskDescription": "processTask1任務",
          "taskMethod": "processTask1",
          "intervalMilliseconds": "5000"
        }
      ]
    }
  ]
}

每一個層級都有一個enable屬性,默認爲true,只有強制指定爲false的時候纔不會裝載對應的任務調度方法。這裏就是簡單繼承AbstractSchedulingConfigurer,實現從類路徑加載配置的邏輯,定義JsonSchedulingConfigurer

public class JsonSchedulingConfigurer extends AbstractSchedulingConfigurer {
    
    // 這裏把默認的任務配置JSON文件放在CLASSPATH下的scheduling/tasks.json,能夠經過配置項scheduling.json.config.location進行覆蓋
    @Value("${scheduling.json.config.location:scheduling/tasks.json}")
    private String location;

    @Autowired
    private ObjectMapper objectMapper;

    @Override
    protected List<InternalTaskProperties> loadTaskProperties() throws Exception {
        ClassPathResource resource = new ClassPathResource(location);
        String content = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
        ScheduleTaskProperties properties = objectMapper.readValue(content, ScheduleTaskProperties.class);
        if (Boolean.FALSE.equals(properties.getEnable()) || null == properties.getTasks()) {
            return Lists.newArrayList();
        }
        List<InternalTaskProperties> target = Lists.newArrayList();
        for (ScheduleTasks tasks : properties.getTasks()) {
            if (null != tasks) {
                List<ScheduleTaskMethod> taskMethods = tasks.getTaskMethods();
                if (null != taskMethods) {
                    for (ScheduleTaskMethod taskMethod : taskMethods) {
                        if (!Boolean.FALSE.equals(taskMethod.getEnable())) {
                            if (ScheduleTaskType.CRON == taskMethod.getTaskType()) {
                                target.add(CronTaskProperties.builder()
                                        .taskMethod(taskMethod.getTaskMethod())
                                        .cronExpression(taskMethod.getCronExpression())
                                        .timeZone(taskMethod.getTimeZone())
                                        .taskDescription(taskMethod.getTaskDescription())
                                        .taskHostKlass(tasks.getTaskKlass())
                                        .build());
                            }
                            if (ScheduleTaskType.FIXED_DELAY == taskMethod.getTaskType()) {
                                target.add(FixedDelayTaskProperties.builder()
                                        .taskMethod(taskMethod.getTaskMethod())
                                        .intervalMilliseconds(taskMethod.getIntervalMilliseconds())
                                        .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())
                                        .taskDescription(taskMethod.getTaskDescription())
                                        .taskHostKlass(tasks.getTaskKlass())
                                        .build());
                            }
                            if (ScheduleTaskType.FIXED_RATE == taskMethod.getTaskType()) {
                                target.add(FixedRateTaskProperties.builder()
                                        .taskMethod(taskMethod.getTaskMethod())
                                        .intervalMilliseconds(taskMethod.getIntervalMilliseconds())
                                        .initialDelayMilliseconds(taskMethod.getInitialDelayMilliseconds())
                                        .taskDescription(taskMethod.getTaskDescription())
                                        .taskHostKlass(tasks.getTaskKlass())
                                        .build());
                            }
                        }
                    }
                }
            }
        }
        return target;
    }
}

添加一個配置類和任務類:

@Configuration
public class SchedulingAutoConfiguration {

    @Bean
    public JsonSchedulingConfigurer jsonSchedulingConfigurer(){
        return new JsonSchedulingConfigurer();
    }
}

// club.throwable.schedule.Tasks
@Slf4j
@Component
public class Tasks {

    public void processTask1() {
        log.info("processTask1觸發..........");
    }
}

啓動SpringBoot應用,某次執行的部分日誌以下:

2020-03-22 16:24:17.248  INFO 22836 --- [           main] c.t.s.AbstractSchedulingConfigurer       : 裝載FixedDelayTask[club.throwable.schedule.Tasks#processTask1()]成功,固定延遲間隔:5000 ms,初始延遲執行時間:0 ms,任務描述:processTask1任務
2020-03-22 16:24:22.275  INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........
2020-03-22 16:24:27.277  INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........
2020-03-22 16:24:32.279  INFO 22836 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........
......

這裏有些細節沒有完善,例如配置文件參數的一些非空判斷、配置值是否合法等等校驗邏輯沒有作,若是要設計成一個工業級的類庫,這些方面必需要考慮。

JDBC數據源配置

JDBC數據源這裏用MySQL舉例說明,先建一個調度任務配置表scheduling_task

CREATE TABLE `schedule_task`
(
    id                         BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
    edit_time                  DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新時間',
    create_time                DATETIME        NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',
    editor                     VARCHAR(32)     NOT NULL DEFAULT 'admin' COMMENT '修改者',
    creator                    VARCHAR(32)     NOT NULL DEFAULT 'admin' COMMENT '建立者',
    deleted                    BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '軟刪除標識',
    task_host_class            VARCHAR(256)    NOT NULL COMMENT '任務宿主類全類名',
    task_method                VARCHAR(128)    NOT NULL COMMENT '任務執行方法名',
    task_type                  VARCHAR(16)     NOT NULL COMMENT '任務類型',
    task_description           VARCHAR(64)     NOT NULL COMMENT '任務描述',
    cron_expression            VARCHAR(128) COMMENT 'cron表達式',
    time_zone                  VARCHAR(32) COMMENT '時區',
    interval_milliseconds      BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '執行間隔時間',
    initial_delay_milliseconds BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '初始延遲執行時間',
    UNIQUE uniq_class_method (task_host_class, task_method)
) COMMENT '調度任務配置表';

其實具體的作法和JSON配置差很少,先引入spring-boot-starter-jdbc,接着編寫MysqlSchedulingConfigurer

// DAO
@RequiredArgsConstructor
public class MysqlScheduleTaskDao {

    private final JdbcTemplate jdbcTemplate;

    private static final ResultSetExtractor<List<ScheduleTask>> MULTI = r -> {
        List<ScheduleTask> tasks = Lists.newArrayList();
        while (r.next()) {
            ScheduleTask task = new ScheduleTask();
            tasks.add(task);
            task.setId(r.getLong("id"));
            task.setCronExpression(r.getString("cron_expression"));
            task.setInitialDelayMilliseconds(r.getLong("initial_delay_milliseconds"));
            task.setIntervalMilliseconds(r.getLong("interval_milliseconds"));
            task.setTimeZone(r.getString("time_zone"));
            task.setTaskDescription(r.getString("task_description"));
            task.setTaskHostClass(r.getString("task_host_class"));
            task.setTaskMethod(r.getString("task_method"));
            task.setTaskType(r.getString("task_type"));
        }
        return tasks;
    };

    public List<ScheduleTask> selectAllTasks() {
        return jdbcTemplate.query("SELECT * FROM schedule_task WHERE deleted = 0", MULTI);
    }
}

// MysqlSchedulingConfigurer
@RequiredArgsConstructor
public class MysqlSchedulingConfigurer extends AbstractSchedulingConfigurer {

    private final MysqlScheduleTaskDao mysqlScheduleTaskDao;

    @Override
    protected List<InternalTaskProperties> loadTaskProperties() throws Exception {
        List<InternalTaskProperties> target = Lists.newArrayList();
        List<ScheduleTask> tasks = mysqlScheduleTaskDao.selectAllTasks();
        if (!tasks.isEmpty()) {
            for (ScheduleTask task : tasks) {
                ScheduleTaskType scheduleTaskType = ScheduleTaskType.fromType(task.getTaskType());
                if (ScheduleTaskType.CRON == scheduleTaskType) {
                    target.add(CronTaskProperties.builder()
                            .taskMethod(task.getTaskMethod())
                            .cronExpression(task.getCronExpression())
                            .timeZone(task.getTimeZone())
                            .taskDescription(task.getTaskDescription())
                            .taskHostKlass(task.getTaskHostClass())
                            .build());
                }
                if (ScheduleTaskType.FIXED_DELAY == scheduleTaskType) {
                    target.add(FixedDelayTaskProperties.builder()
                            .taskMethod(task.getTaskMethod())
                            .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))
                            .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))
                            .taskDescription(task.getTaskDescription())
                            .taskHostKlass(task.getTaskHostClass())
                            .build());
                }
                if (ScheduleTaskType.FIXED_RATE == scheduleTaskType) {
                    target.add(FixedRateTaskProperties.builder()
                            .taskMethod(task.getTaskMethod())
                            .intervalMilliseconds(String.valueOf(task.getIntervalMilliseconds()))
                            .initialDelayMilliseconds(String.valueOf(task.getInitialDelayMilliseconds()))
                            .taskDescription(task.getTaskDescription())
                            .taskHostKlass(task.getTaskHostClass())
                            .build());
                }
            }
        }
        return target;
    }
}

記得引入spring-boot-starter-jdbcmysql-connector-java而且激活MysqlSchedulingConfigurer配置。插入一條記錄:

INSERT INTO `schedule_task`(`id`, `edit_time`, `create_time`, `editor`, `creator`, `deleted`, `task_host_class`, `task_method`, `task_type`, `task_description`, `cron_expression`, `time_zone`, `interval_milliseconds`, `initial_delay_milliseconds`) VALUES (1, '2020-03-30 23:46:10', '2020-03-30 23:46:10', 'admin', 'admin', 0, 'club.throwable.schedule.Tasks', 'processTask1', 'FIXED_DELAY', '測試任務', NULL, NULL, 10000, 5000);

而後啓動服務,某次執行的輸出:

2020-03-30 23:47:27.376  INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........
2020-03-30 23:47:37.378  INFO 53120 --- [pool-1-thread-1] club.throwable.schedule.Tasks            : processTask1觸發..........
....

混合配置

有些時候咱們但願能夠JSON配置和JDBC數據源配置進行混合配置,或者動態二選一以便靈活應對多環境的場景(例如要在開發環境使用JSON配置而測試和生產環境使用JDBC數據源配置,甚至能夠將JDBC數據源配置覆蓋JSON配置,這樣能保證老是傾向於使用JDBC數據源配置),這樣須要對前面兩小節的實現加多一層抽象。這裏的設計能夠參考SpringMVC中的控制器參數解析器的設計,具體是HandlerMethodArgumentResolverComposite,其實道理是相同的。

其餘注意事項

在生產實踐中,暫時不考慮生成任務執行日誌和細粒度的監控,着重作了兩件事:

  • 併發控制,(多服務節點下)禁止任務併發執行。
  • 跟蹤任務的日誌軌跡。

解決併發執行問題

通常狀況下,咱們須要禁止任務併發執行,考慮引入Redisson提供的分佈式鎖:

// 引入依賴
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>最新版本</version>
</dependency>

// 配置類
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedissonAutoConfiguration {

    @Autowired
    private RedisProperties redisProperties;

    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        Config config = new Config();
        SingleServerConfig singleServerConfig = config.useSingleServer();
        singleServerConfig.setAddress(String.format("redis://%s:%d", redisProperties.getHost(), redisProperties.getPort()));
        if (redisProperties.getDatabase() > 0) {
            singleServerConfig.setDatabase(redisProperties.getDatabase());
        }
        if (null != redisProperties.getPassword()) {
            singleServerConfig.setPassword(redisProperties.getPassword());
        }
        return Redisson.create(config);
    }
}

// 分佈式鎖工廠
@Component
public class DistributedLockFactory {

    private static final String DISTRIBUTED_LOCK_PATH_PREFIX = "dl:";

    @Autowired
    private RedissonClient redissonClient;

    public DistributedLock provideDistributedLock(String lockKey) {
        String lockPath = DISTRIBUTED_LOCK_PATH_PREFIX + lockKey;
        return new RedissonDistributedLock(redissonClient, lockPath);
    }
}

這裏考慮到項目依賴了spring-boot-starter-redis,直接複用了它的配置屬性類(RedissonDistributedLockRLock的輕量級封裝,見附錄)。使用方式以下:

@Autowired
private DistributedLockFactory distributedLockFactory;

public void task1() {
    DistributedLock lock = distributedLockFactory.provideDistributedLock(lockKey);
    // 等待時間爲20秒,持有鎖的最大時間爲60秒
    boolean tryLock = lock.tryLock(20L, 60, TimeUnit.SECONDS);
    if (tryLock) {
         try {
            // 業務邏輯
         }finally {
            lock.unlock();
        }
    }
}

引入MDC跟蹤任務的Trace

MDC實際上是Mapped Diagnostic Context的縮寫,也就是映射診斷上下文,通常用於日誌框架裏面同一個線程執行過程的跟蹤(例如一個線程跑過了多個方法,各個方法裏面都打印了日誌,那麼經過MDC能夠對整個調用鏈經過一個惟一標識關聯起來),例如這裏選用slf4j提供的org.slf4j.MDC

@Component
public class MappedDiagnosticContextAssistant {

    /**
     * 在MDC中執行
     *
     * @param runnable runnable
     */
    public void processInMappedDiagnosticContext(Runnable runnable) {
        String uuid = UUID.randomUUID().toString();
        MDC.put("TRACE_ID", uuid);
        try {
            runnable.run();
        } finally {
            MDC.remove("TRACE_ID");
        }
    }
}

任務執行的時候須要包裹成一個Runnale實例:

public void task1() {
    mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("開始執行......");
        // 業務邏輯
        watch.stop();
        log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis());
    });
}

結合前面一節提到的併發控制,那麼最終執行的任務方法以下:

public void task1() {
    mappedDiagnosticContextAssistant.processInMappedDiagnosticContext(() -> {
        StopWatch watch = new StopWatch();
        watch.start();
        log.info("開始執行......");
        scheduleTaskAssistant.executeInDistributedLock("任務分佈式鎖KEY", () -> {
            // 真實的業務邏輯
        });
        watch.stop();
        log.info("執行完畢,耗時:{} ms......", watch.getTotalTimeMillis());
    });
}

這裏的方法看起來比較彆扭,其實能夠直接在任務裝載的時候基於分佈式鎖和MDC進行封裝,方式相似於ScheduledMethodRunnable,這裏不作展開,由於要詳細展開篇幅可能比較大(ScheduleTaskAssistant見附錄)。

小結

其實spring-context整個調度模塊徹底依賴於TaskScheduler實現,更底層的是JUC調度線程池ScheduledThreadPoolExecutor。若是想要從底層原理理解整個調度模塊的運行原理,那麼就必定要分析ScheduledThreadPoolExecutor的實現。整篇文章大體介紹了spring-context調度模塊的加載調度任務的流程,而且基於擴展接口SchedulingConfigurer擴展出多種自定義配置調度任務的方式,可是考慮到須要在生產環境中運行,那麼免不了須要考慮監控、併發控制、日誌跟蹤等等的功能,可是這樣就會使得整個調度模塊變重,慢慢地就會發現,這個輪子越造越大,越有主流調度框架Quartz或者Easy Scheduler的影子。筆者認爲,軟件工程,有些時候要權衡取捨,該拋棄的就應該果斷拋棄,不然老是負重而行,還能走多遠?

參考資料:

  • SpringBoot源碼

附錄

ScheduleTaskAssistant

@RequiredArgsConstructor
@Component
public class ScheduleTaskAssistant {

    /**
     * 5秒
     */
    public static final long DEFAULT_WAIT_TIME = 5L;

    /**
     * 30秒
     */
    public static final long DEFAULT_LEAVE_TIME = 30L;

    private final DistributedLockFactory distributedLockFactory;

    /**
     * 在分佈式鎖中執行
     *
     * @param waitTime  鎖等着時間
     * @param leaveTime 鎖持有時間
     * @param timeUnit  時間單位
     * @param lockKey   鎖的key
     * @param task      任務對象
     */
    public void executeInDistributedLock(long waitTime, long leaveTime, TimeUnit timeUnit, String lockKey, Runnable task) {
        DistributedLock lock = distributedLockFactory.dl(lockKey);
        boolean tryLock = lock.tryLock(waitTime, leaveTime, timeUnit);
        if (tryLock) {
            try {
                long waitTimeMillis = timeUnit.toMillis(waitTime);
                long start = System.currentTimeMillis();
                task.run();
                long end = System.currentTimeMillis();
                long cost = end - start;
                // 預防鎖過早釋放
                if (cost < waitTimeMillis) {
                    Sleeper.X.sleep(waitTimeMillis - cost);
                }
            } finally {
                lock.unlock();
            }
        }
    }

    /**
     * 在分佈式鎖中執行 - 使用默認時間
     *
     * @param lockKey 鎖的key
     * @param task    任務對象
     */
    public void executeInDistributedLock(String lockKey, Runnable task) {
        executeInDistributedLock(DEFAULT_WAIT_TIME, DEFAULT_LEAVE_TIME, TimeUnit.SECONDS, lockKey, task);
    }
}

RedissonDistributedLock

@Slf4j
public class RedissonDistributedLock implements DistributedLock {

    private final RedissonClient redissonClient;
    private final String lockPath;
    private final RLock internalLock;

    RedissonDistributedLock(RedissonClient redissonClient, String lockPath) {
        this.redissonClient = redissonClient;
        this.lockPath = lockPath;
        this.internalLock = initInternalLock();
    }

    private RLock initInternalLock() {
        return redissonClient.getLock(lockPath);
    }

    @Override
    public boolean isLock() {
        return internalLock.isLocked();
    }

    @Override
    public boolean isHeldByCurrentThread() {
        return internalLock.isHeldByCurrentThread();
    }

    @Override
    public void lock(long leaseTime, TimeUnit unit) {
        internalLock.lock(leaseTime, unit);
    }

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) {
        try {
            return internalLock.tryLock(waitTime, leaseTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(String.format("Acquire lock fail by thread interrupted,path:%s", lockPath), e);
        }
    }

    @Override
    public void unlock() {
        try {
            internalLock.unlock();
        } catch (IllegalMonitorStateException ex) {
            log.warn("Unlock path:{} error for thread status change in concurrency", lockPath, ex);
        }
    }
}

(本文完 c-7-d e-a-20200324 真是有點滑稽,筆者發現任務持久化最好仍是用現成的工業級調度器,因而基於Quartz作了輕量級封裝,寫了個後臺管理界面,且聽下回分解)

相關文章
相關標籤/搜索