本文主要研究一下flink的ScheduledExecutor
java.base/java/util/concurrent/Executor.java
/**
 * Contract for anything that can be handed a {@link Runnable} to run.
 */
public interface Executor {

    /**
     * Runs the supplied command at some point in the future. Whether it runs on a
     * freshly created thread, on a pooled thread, or directly on the caller's thread
     * is left to the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if the task cannot be accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutor.java
/**
 * An {@link Executor} that can additionally run tasks after a delay or on a
 * recurring schedule.
 */
public interface ScheduledExecutor extends Executor {

    /**
     * Runs the command once, after the given delay has elapsed.
     *
     * @param command the task to execute in the future
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @return a ScheduledFuture representing the completion of the scheduled task
     */
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    /**
     * Runs the callable once, after the given delay has elapsed. Its result is made
     * available through the returned {@link ScheduledFuture}.
     *
     * @param callable the callable to execute
     * @param delay the time from now to delay the execution
     * @param unit the time unit of the delay parameter
     * @param <V> result type of the callable
     * @return a ScheduledFuture which holds the future value of the given callable
     */
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    /**
     * Runs the command periodically at a fixed rate. Executions start after
     * {@code initialDelay}, then {@code initialDelay + period}, then
     * {@code initialDelay + 2*period}, and so on.
     *
     * <p>The task keeps running until an execution fails or the returned
     * {@link ScheduledFuture} is cancelled.
     *
     * @param command the task to be executed periodically
     * @param initialDelay the time from now until the first execution is triggered
     * @param period the time after which the next execution is triggered
     * @param unit the time unit of the delay and period parameter
     * @return a ScheduledFuture representing the periodic task. This future never completes
     *         unless an execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleAtFixedRate(
            Runnable command,
            long initialDelay,
            long period,
            TimeUnit unit);

    /**
     * Runs the command repeatedly, waiting the given delay between the end of one
     * execution and the start of the next.
     *
     * <p>The task keeps running until an exception occurs or the returned
     * {@link ScheduledFuture} is cancelled.
     *
     * @param command the task to execute repeatedly
     * @param initialDelay the time from now until the first execution is triggered
     * @param delay the time between the end of the current and the start of the next execution
     * @param unit the time unit of the initial delay and the delay parameter
     * @return a ScheduledFuture representing the repeatedly executed task. This future never
     *         completes unless the execution of the given task fails or if the future is cancelled
     */
    ScheduledFuture<?> scheduleWithFixedDelay(
            Runnable command,
            long initialDelay,
            long delay,
            TimeUnit unit);
}
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ScheduledExecutorServiceAdapter.java
/**
 * {@link ScheduledExecutor} implementation that forwards every call unchanged to a
 * wrapped {@link ScheduledExecutorService}.
 */
public class ScheduledExecutorServiceAdapter implements ScheduledExecutor {

    /** The service every call is forwarded to; never null. */
    private final ScheduledExecutorService delegate;

    /**
     * Wraps the given service.
     *
     * @param scheduledExecutorService the service to forward to; checked with
     *        {@code Preconditions.checkNotNull}
     */
    public ScheduledExecutorServiceAdapter(ScheduledExecutorService scheduledExecutorService) {
        this.delegate = Preconditions.checkNotNull(scheduledExecutorService);
    }

    /** Forwards to {@link ScheduledExecutorService#execute(Runnable)}. */
    @Override
    public void execute(Runnable command) {
        delegate.execute(command);
    }

    /** Forwards to {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)}. */
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return delegate.schedule(command, delay, unit);
    }

    /** Forwards to {@link ScheduledExecutorService#schedule(Callable, long, TimeUnit)}. */
    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return delegate.schedule(callable, delay, unit);
    }

    /** Forwards to {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)}. */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(
            Runnable command,
            long initialDelay,
            long period,
            TimeUnit unit) {
        return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    /** Forwards to {@link ScheduledExecutorService#scheduleWithFixedDelay(Runnable, long, long, TimeUnit)}. */
    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(
            Runnable command,
            long initialDelay,
            long delay,
            TimeUnit unit) {
        return delegate.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
}
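ScheduledExecutorServiceAdapter实现了ScheduledExecutor接口，其schedule、scheduleAtFixedRate、scheduleWithFixedDelay、execute方法均直接委托给内部的java.util.concurrent.ScheduledExecutorService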
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.java
public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor { private final ActorSystem actorSystem; public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem) { this.actorSystem = Preconditions.checkNotNull(actorSystem, "rpcService"); } @Override @Nonnull public ScheduledFuture<?> schedule(@Nonnull Runnable command, long delay, @Nonnull TimeUnit unit) { ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>(command, unit.toNanos(delay), 0L); Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); scheduledFutureTask.setCancellable(cancellable); return scheduledFutureTask; } @Override @Nonnull public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long delay, @Nonnull TimeUnit unit) { ScheduledFutureTask<V> scheduledFutureTask = new ScheduledFutureTask<>(callable, unit.toNanos(delay), 0L); Cancellable cancellable = internalSchedule(scheduledFutureTask, delay, unit); scheduledFutureTask.setCancellable(cancellable); return scheduledFutureTask; } @Override @Nonnull public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable command, long initialDelay, long period, @Nonnull TimeUnit unit) { ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>( command, triggerTime(unit.toNanos(initialDelay)), unit.toNanos(period)); Cancellable cancellable = actorSystem.scheduler().schedule( new FiniteDuration(initialDelay, unit), new FiniteDuration(period, unit), scheduledFutureTask, actorSystem.dispatcher()); scheduledFutureTask.setCancellable(cancellable); return scheduledFutureTask; } @Override @Nonnull public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable command, long initialDelay, long delay, @Nonnull TimeUnit unit) { ScheduledFutureTask<Void> scheduledFutureTask = new ScheduledFutureTask<>( command, triggerTime(unit.toNanos(initialDelay)), unit.toNanos(-delay)); Cancellable cancellable = internalSchedule(scheduledFutureTask, initialDelay, unit); 
scheduledFutureTask.setCancellable(cancellable); return scheduledFutureTask; } @Override public void execute(@Nonnull Runnable command) { actorSystem.dispatcher().execute(command); } private Cancellable internalSchedule(Runnable runnable, long delay, TimeUnit unit) { return actorSystem.scheduler().scheduleOnce( new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher()); } private long now() { return System.nanoTime(); } private long triggerTime(long delay) { return now() + delay; } private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { private long time; private final long period; private volatile Cancellable cancellable; ScheduledFutureTask(Callable<V> callable, long time, long period) { super(callable); this.time = time; this.period = period; } ScheduledFutureTask(Runnable runnable, long time, long period) { super(runnable, null); this.time = time; this.period = period; } public void setCancellable(Cancellable newCancellable) { this.cancellable = newCancellable; } @Override public void run() { if (!isPeriodic()) { super.run(); } else if (runAndReset()){ if (period > 0L) { time += period; } else { cancellable = internalSchedule(this, -period, TimeUnit.NANOSECONDS); // check whether we have been cancelled concurrently if (isCancelled()) { cancellable.cancel(); } else { time = triggerTime(-period); } } } } @Override public boolean cancel(boolean mayInterruptIfRunning) { boolean result = super.cancel(mayInterruptIfRunning); return result && cancellable.cancel(); } @Override public long getDelay(@Nonnull TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); } @Override public int compareTo(@Nonnull Delayed o) { if (o == this) { return 0; } long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return (diff < 0L) ? -1 : (diff > 0L) ? 1 : 0; } @Override public boolean isPeriodic() { return period != 0L; } } }