聊聊flink的ScheduledExecutor

本文主要研究一下flink的ScheduledExecutorhtml

Executor

java.base/java/util/concurrent/Executor.javajava

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
  • jdk的Executor接口定義了execute方法,接收參數類型爲Runnable

ScheduledExecutor

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.javaapache

public interface ScheduledExecutor extends Executor {

	/**
	 * Executes the given command after the given delay.
	 *
	 * @param command the task to execute in the future
	 * @param delay the time from now to delay the execution
	 * @param unit the time unit of the delay parameter
	 * @return a ScheduledFuture representing the completion of the scheduled task
	 */
	ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

	/**
	 * Executes the given callable after the given delay. The result of the callable is returned
	 * as a {@link ScheduledFuture}.
	 *
	 * @param callable the callable to execute
	 * @param delay the time from now to delay the execution
	 * @param unit the time unit of the delay parameter
	 * @param <V> result type of the callable
	 * @return a ScheduledFuture which holds the future value of the given callable
	 */
	<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

	/**
	 * Executes the given command periodically. The first execution is started after the
	 * {@code initialDelay}, the second execution is started after {@code initialDelay + period},
	 * the third after {@code initialDelay + 2*period} and so on.
	 * The task is executed until either an execution fails, or the returned {@link ScheduledFuture}
	 * is cancelled.
	 *
	 * @param command the task to be executed periodically
	 * @param initialDelay the time from now until the first execution is triggered
	 * @param period the time after which the next execution is triggered
	 * @param unit the time unit of the delay and period parameter
	 * @return a ScheduledFuture representing the periodic task. This future never completes
	 * unless an execution of the given task fails or if the future is cancelled
	 */
	ScheduledFuture<?> scheduleAtFixedRate(
		Runnable command,
		long initialDelay,
		long period,
		TimeUnit unit);

	/**
	 * Executed the given command repeatedly with the given delay between the end of an execution
	 * and the start of the next execution.
	 * The task is executed repeatedly until either an exception occurs or if the returned
	 * {@link ScheduledFuture} is cancelled.
	 *
	 * @param command the task to execute repeatedly
	 * @param initialDelay the time from now until the first execution is triggered
	 * @param delay the time between the end of the current and the start of the next execution
	 * @param unit the time unit of the initial delay and the delay parameter
	 * @return a ScheduledFuture representing the repeatedly executed task. This future never
	 * completes unless the execution of the given task fails or if the future is cancelled
	 */
	ScheduledFuture<?> scheduleWithFixedDelay(
		Runnable command,
		long initialDelay,
		long delay,
		TimeUnit unit);
}
  • ScheduledExecutor接口繼承了Executor,它定義了schedule、scheduleAtFixedRate、scheduleWithFixedDelay方法,其中schedule方法能夠接收Runnable或者Callable,這些方法返回的都是ScheduledFuture;該接口有兩個實現類,分別是ScheduledExecutorServiceAdapter及ActorSystemScheduledExecutorAdapter

ScheduledExecutorServiceAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.javaapi

public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {

	private final ScheduledExecutorService scheduledExecutorService;

	public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) {
		this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
	}

	@Override
	public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
		return scheduledExecutorService.schedule(command, delay, unit);
	}

	@Override
	public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
		return scheduledExecutorService.schedule(callable, delay, unit);
	}

	@Override
	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
		return scheduledExecutorService.scheduleAtFixedRate(command, initialDelay, period, unit);
	}

	@Override
	public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
		return scheduledExecutorService.scheduleWithFixedDelay(command, initialDelay, delay, unit);
	}

	@Override
	public void execute(Runnable command) {
		scheduledExecutorService.execute(command);
	}
}
  • ScheduledExecutorServiceAdapter實現了ScheduledExecutor接口,它使用的是jdk的ScheduledExecutorService來實現,使用了scheduledExecutorService的schedule、scheduleAtFixedRate、 scheduleWithFixedDelay、execute方法

ActorSystemScheduledExecutorAdapter

flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.javaless

public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {

	private final ActorSystem actorSystem;

	public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) {
		this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService");
	}

	@Override
	@Nonnull
	public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) {
		ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L);

		Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

		scheduledFutureTask.setCancellable(cancellable);

		return scheduledFutureTask;
	}

	@Override
	@Nonnull
	public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) {
		ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L);

		Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit);

		scheduledFutureTask.setCancellable(cancellable);

		return scheduledFutureTask;
	}

	@Override
	@Nonnull
	public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) {
		ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
			command,
			triggerTime(unit.toNanos(initialDelay)),
			unit.toNanos(period));

		Cancellable cancellable = actorSystem.scheduler().schedule(
			new FiniteDuration(initialDelay, unit),
			new FiniteDuration(period, unit),
			scheduledFutureTask,
			actorSystem.dispatcher());

		scheduledFutureTask.setCancellable(cancellable);

		return scheduledFutureTask;
	}

	@Override
	@Nonnull
	public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) {
		ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(
			command,
			triggerTime(unit.toNanos(initialDelay)),
			unit.toNanos(-delay));

		Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit);

		scheduledFutureTask.setCancellable(cancellable);

		return scheduledFutureTask;
	}

	@Override
	public void execute(@Nonnull Runnable command) {
		actorSystem.dispatcher().execute(command);
	}

	private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) {
		return actorSystem.scheduler().scheduleOnce(
			new FiniteDuration(delay, unit),
			runnable,
			actorSystem.dispatcher());
	}

	private long now() {
		return System.nanoTime();
	}

	private long triggerTime(long delay) {
		return now() + delay;
	}

	private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

		private long time;

		private final long period;

		private volatile Cancellable cancellable;

		ScheduledFutureTask(Callable<V> callable, long time, long period) {
			super(callable);
			this.time = time;
			this.period = period;
		}

		ScheduledFutureTask(Runnable runnable, long time, long period) {
			super(runnable, null);
			this.time = time;
			this.period = period;
		}

		public void setCancellable(Cancellable newCancellable) {
			this.cancellable = newCancellable;
		}

		@Override
		public void run() {
			if (!isPeriodic()) {
				super.run();
			} else if (runAndReset()){
				if (period > 0L) {
					time += period;
				} else {
					cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS);

					// check whether we have been cancelled concurrently
					if (isCancelled()) {
						cancellable.cancel();
					} else {
						time = triggerTime(-period);
					}
				}
			}
		}

		@Override
		public boolean cancel(boolean mayInterruptIfRunning) {
			boolean result = super.cancel(mayInterruptIfRunning);

			return result && cancellable.cancel();
		}

		@Override
		public long getDelay(@Nonnull  TimeUnit unit) {
			return unit.convert(time - now(), TimeUnit.NANOSECONDS);
		}

		@Override
		public int compareTo(@Nonnull Delayed o) {
			if (o == this) {
				return 0;
			}

			long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
			return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0;
		}

		@Override
		public boolean isPeriodic() {
			return period != 0L;
		}
	}
}
  • ActorSystemScheduledExecutorAdapter實現了ScheduledExecutor接口,它使用的是actorSystem來實現;其中execute方法使用的是actorSystem.dispatcher().execute方法
  • schedule及scheduleWithFixedDelay方法調用的是internalSchedule方法,它使用的是actorSystem.scheduler().scheduleOnce方法,只是它們的ScheduledFutureTask不一樣,其中schedule方法的ScheduledFutureTask的period爲0,而scheduleWithFixedDelay方法的ScheduledFutureTask的period爲unit.toNanos(-delay);ScheduledFutureTask的run方法會對period進行判斷,小於等於0的,會再次調用internalSchedule方法,來實現以FixedDelay進行調度的效果
  • scheduleAtFixedRate方法,它使用的是actorSystem.scheduler().schedule方法,其ScheduledFutureTask的period即爲方法參數的period,沒有像scheduleWithFixedDelay方法那樣用unit.toNanos(-delay)做爲period

小結

  • ScheduledExecutor接口繼承了Executor,它定義了schedule、scheduleAtFixedRate、scheduleWithFixedDelay方法,其中schedule方法能夠接收Runnable或者Callable,這些方法返回的都是ScheduledFuture;該接口有兩個實現類,分別是ScheduledExecutorServiceAdapter及ActorSystemScheduledExecutorAdapter
  • ScheduledExecutorServiceAdapter實現了ScheduledExecutor接口,它使用的是jdk的ScheduledExecutorService來實現,使用了scheduledExecutorService的schedule、scheduleAtFixedRate、scheduleWithFixedDelay、execute方法
  • ActorSystemScheduledExecutorAdapter實現了ScheduledExecutor接口,它使用的是actorSystem來實現;其中execute方法使用的是actorSystem.dispatcher().execute方法;schedule及scheduleWithFixedDelay方法調用的是internalSchedule方法,它使用的是actorSystem.scheduler().scheduleOnce方法,只是它們的ScheduledFutureTask不一樣,其中schedule方法的ScheduledFutureTask的period爲0,而scheduleWithFixedDelay方法的ScheduledFutureTask的period爲unit.toNanos(-delay);ScheduledFutureTask的run方法會對period進行判斷,小於等於0的,會再次調用internalSchedule方法,來實現以FixedDelay進行調度的效果;scheduleAtFixedRate方法,它使用的是actorSystem.scheduler().schedule方法,其ScheduledFutureTask的period即爲方法參數的period,沒有像scheduleWithFixedDelay方法那樣用unit.toNanos(-delay)做爲period

doc

相關文章
相關標籤/搜索