SpringBoot源碼解析-Scheduled定時器的原理

定時器的使用

能夠參考下肥朝大佬的文章 原理暫且不談,定時器你當真會用? 寫的很形象。spring

定時器原理剖析

定時器的基礎是jdk中的工具類ScheduledThreadPoolExecutor,想要了解springboot定時器的原理,先得了解ScheduledThreadPoolExecutor的原理。springboot

該類中有三個主要的方法:bash

  1. schedule(...)
  2. scheduleWithFixedDelay(...)
  3. scheduleAtFixedRate(...)

咱們先簡單回顧下這三個方法。app

schedule方法

schedule方法的做用是提供一個延時執行的任務,該任務只會執行一次。該方法的三個參數以下工具

schedule(Runnable command,   long delay,   TimeUnit unit)
複製代碼

command爲須要執行的任務,delay和unit組合起來使用,表示延時的時間。post

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        //校驗參數
        if (command == null || unit == null)
            throw new NullPointerException();
        //任務轉換
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        //添加任務到延時隊列
        delayedExecute(t);
        return t;
    }
複製代碼

首先看一下任務轉換的邏輯:ui

//將延時的時間加上如今的時間,轉化成真正執行任務的時間
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

    //將任務轉化爲ScheduledFutureTask對象
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        //period爲0表示只執行一次
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
複製代碼

接下來就是添加進任務隊列:this

private void delayedExecute(RunnableScheduledFuture<?> task) {
        //檢查任務狀態
        if (isShutdown())
            reject(task);
        else {
            //添加進隊列
            super.getQueue().add(task);
            //在執行以前,再次檢查任務狀態
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                //檢查是否有線程在處理任務,若是工做線程數少於核心線程數,會新建worker。
                ensurePrestart();
        }
    }
複製代碼

添加的邏輯看完了,如今看一下加入隊列後是如何執行的:spa

//worker線程會調用剛剛封裝好的ScheduledFutureTask對象的run方法
public void run() {
    //判斷period是不是0
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        //在schedule方法中period是0,進入父類的run方法,run方法中
        //會調用咱們傳入的任務
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}
複製代碼

schedule方法的執行邏輯大體如上,schedule方法只執行一次。線程

scheduleWithFixedDelay方法

該方法的做用是在任務執行完成後,通過固定延時時間再次運行。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        //校驗參數
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        //將任務轉化爲ScheduledFutureTask對象,注意這個地方period不是0了!
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        //將outerTask設置爲本身
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //添加進延時隊列
        delayedExecute(t);
        return t;
    }
複製代碼

和schedule方法稍有不一樣,一個是period不在是0,而是小於0,還有就是將outerTask設置爲本身。

添加進任務隊列的邏輯都是同樣的,因此直接看執行邏輯:

public void run() {
    //這個地方periodic是true了
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    //因此會進入下面這個邏輯
    else if (ScheduledFutureTask.super.runAndReset()) {
        //設置下一次任務時間
        setNextRunTime();
        //將本身再次添加進隊列
        reExecutePeriodic(outerTask);
    }
}
//period是小於0的,注意這個地方大於0和小於0邏輯上的區別
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        //大於0的話,是用上次執行的時間,加上延時時間算出下次執行的時間
        time += p;
    else
        //小於0的話,是用當前時間,加上延時時間,算出下次執行時間
        time = triggerTime(-p);
}
複製代碼

scheduleAtFixedRate方法

這個方法和上一個方法幾乎同樣,惟一的區別就是他的period是大於0的,因此延時時間按照大於0來計算。


springboot中定時器的原理

瞭解完ScheduledThreadPoolExecutor的基礎原理後,咱們來看一下springboot中定時任務的調度。springboot定時任務調度的基礎是ScheduledAnnotationBeanPostProcessor類,查看繼承體系發現該類實現了BeanPostProcessor接口,因此進入該類的postProcessAfterInitialization方法。

public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
				bean instanceof ScheduledExecutorService) {
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		}

		Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
		if (!this.nonAnnotatedClasses.contains(targetClass)) {
			//查找被Scheduled註解標註的類
			Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
						Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
								method, Scheduled.class, Schedules.class);
						return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
					});
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(targetClass);
				if (logger.isTraceEnabled()) {
					logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
				}
			}
			else {
				// Non-empty set of methods
				//若是被Scheduled註解標註,就執行processScheduled方法。
				annotatedMethods.forEach((method, scheduledMethods) ->
						scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
				if (logger.isTraceEnabled()) {
					logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
							"': " + annotatedMethods);
				}
			}
		}
		return bean;
	}
	
	//以cron模式來解析一下processScheduled方法
	protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
		try {
			Runnable runnable = createRunnable(bean, method);
			boolean processedSchedule = false;
			...
			// 解析註解裏的屬性
			String cron = scheduled.cron();
			if (StringUtils.hasText(cron)) {
				String zone = scheduled.zone();
				if (this.embeddedValueResolver != null) {
					cron = this.embeddedValueResolver.resolveStringValue(cron);
					zone = this.embeddedValueResolver.resolveStringValue(zone);
				}
				if (StringUtils.hasLength(cron)) {
					Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
					processedSchedule = true;
					if (!Scheduled.CRON_DISABLED.equals(cron)) {
						TimeZone timeZone;
						if (StringUtils.hasText(zone)) {
							timeZone = StringUtils.parseTimeZoneString(zone);
						}
						else {
							timeZone = TimeZone.getDefault();
						}
						//將封裝好的任務存儲起來
						tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
					}
				}
			}
			...
			// Finally register the scheduled tasks
			synchronized (this.scheduledTasks) {
				Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
				//根據bean分類,將每一個bean的定時任務存進scheduledTasks
				regTasks.addAll(tasks);
			}
		}
		...
	}

	public ScheduledTask scheduleCronTask(CronTask task) {
		ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
		boolean newTask = false;
		if (scheduledTask == null) {
			//根據task,新建一個ScheduledTask
			scheduledTask = new ScheduledTask(task);
			newTask = true;
		}
		if (this.taskScheduler != null) {
			scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
		}
		else {
			//根據定時任務種類的區別存儲task
			addCronTask(task);
			this.unresolvedTasks.put(task, scheduledTask);
		}
		return (newTask ? scheduledTask : null);
	}
複製代碼

在postProcessAfterInitialization方法中,spring主要就是解析註解,並將根據註解生成相應的延時任務。那麼如今解析好了,也存儲好了,執行的地方在哪裏呢?在一次查看該類的繼承體系,發現該類還實現了ApplicationListener接口,因此進入onApplicationEvent方法。

public void onApplicationEvent(ContextRefreshedEvent event) {
		if (event.getApplicationContext() == this.applicationContext) {
			finishRegistration();
		}
	}

	private void finishRegistration() {
		...
		//上面一大段都是尋找taskScheduler類的,若是沒有設置的話這邊是找不到的
		this.registrar.afterPropertiesSet();
	}

	public void afterPropertiesSet() {
		scheduleTasks();
	}

	protected void scheduleTasks() {
		//沒有自定義配置就使用默認配置
		if (this.taskScheduler == null) {
			//默認的執行器只有一個線程使用的時候要注意一下
			this.localExecutor = Executors.newSingleThreadScheduledExecutor();
			this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
		}
		if (this.triggerTasks != null) {
			for (TriggerTask task : this.triggerTasks) {
				addScheduledTask(scheduleTriggerTask(task));
			}
		}
		if (this.cronTasks != null) {
			for (CronTask task : this.cronTasks) {
				addScheduledTask(scheduleCronTask(task));
			}
		}
		if (this.fixedRateTasks != null) {
			for (IntervalTask task : this.fixedRateTasks) {
				addScheduledTask(scheduleFixedRateTask(task));
			}
		}
		if (this.fixedDelayTasks != null) {
			for (IntervalTask task : this.fixedDelayTasks) {
				addScheduledTask(scheduleFixedDelayTask(task));
			}
		}
	}
複製代碼

在該方法中,清晰的看到了定時任務調用的過程triggerTasks好像不是經過註解進來的,這個先無論。咱們能夠看一下另外三個的執行。

cron執行邏輯

public ScheduledTask scheduleCronTask(CronTask task) {
		ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
		...
		//這個地方taskScheduler已經有默認值了
		if (this.taskScheduler != null) {
			scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
		}
		...
		return (newTask ? scheduledTask : null);
	}

	public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
		try {
			...
			else {
				...
				//新建了一個ReschedulingRunnable對象,調用schedule方法。
				return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
			}
		}
		...
	}

	//新建一個ReschedulingRunnable對象,這個對象也實現了runnable接口
	public ReschedulingRunnable(
			Runnable delegate, Trigger trigger, ScheduledExecutorService executor, ErrorHandler errorHandler) {

		super(delegate, errorHandler);
		this.trigger = trigger;
		this.executor = executor;
	}

	public ScheduledFuture<?> schedule() {
		synchronized (this.triggerContextMonitor) {
			this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
			if (this.scheduledExecutionTime == null) {
				return null;
			}
			//計算下次執行時間
			long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
			//將本身傳入執行器,也就是調用本身的run方法
			this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
			return this;
		}
	}

	public void run() {
		Date actualExecutionTime = new Date();
		//執行咱們定義的定時任務
		super.run();
		Date completionTime = new Date();
		synchronized (this.triggerContextMonitor) {
			Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
			//更新時間
			this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
			if (!obtainCurrentFuture().isCancelled()) {
				//在次調用schedule方法
				schedule();
			}
		}
	}

複製代碼

在上面咱們分析執行器邏輯的時候,知道執行器的schedule方法只會執行一次,因此springboot在這個地方使用互相調用的方法,來達到定時循環的目的。因此這個方法中,關鍵的就是時間的更新。

public Date nextExecutionTime(TriggerContext triggerContext) {
		//獲取上一次任務完成時間
		Date date = triggerContext.lastCompletionTime();
		if (date != null) {
			//獲取上一次任務執行的時間
			Date scheduled = triggerContext.lastScheduledExecutionTime();
			//比較兩次時間,使用後者生成新的執行時間
			if (scheduled != null && date.before(scheduled)) {
				date = scheduled;
			}
		}
		else {
			//初始化的時候直接使用當前時間
			date = new Date();
		}
		return this.sequenceGenerator.next(date);
	}
複製代碼

cron模式每次根據上次執行時間和上次完成時間更後面的生成新的時間,結合肥朝的文章應該能夠理解這種模型。不過這個地方我也不太明白什麼狀況下完成時間會在執行時間的前面。反正就是根據最新的時間生成新的時間就是。

剩下的兩個執行邏輯

public ScheduledTask scheduleFixedRateTask(IntervalTask task) {
		FixedRateTask taskToUse = (task instanceof FixedRateTask ? (FixedRateTask) task :
				new FixedRateTask(task.getRunnable(), task.getInterval(), task.getInitialDelay()));
		return scheduleFixedRateTask(taskToUse);
	}

	public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
		...
			scheduledTask.future =
					this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
		...
		return (newTask ? scheduledTask : null);
	}

	public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period) {
		try {
			return this.scheduledExecutor.scheduleAtFixedRate(decorateTask(task, true), 0, period, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
		}
	}
複製代碼

另外兩個模式就是執行ScheduledThreadPoolExecutor對應的方法了,關鍵仍是時間的邏輯,時間的生成邏輯上面已經給出來了,就是根據period大於0仍是小於0來生成的。


返回目錄

相關文章
相關標籤/搜索