A look at Flink's SourceFunction

This article takes a look at Flink's SourceFunction.

Example

// set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());

        dataStreamSource.map(new UpperCaseMapFunc()).print();

        env.execute("sourceFunctionDemo");
  • Here the custom SourceFunction is registered through the addSource method.

SourceFunction

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
 * Base interface for all stream data sources in Flink. The contract of a stream source
 * is the following: When the source should start emitting elements, the {@link #run} method
 * is called with a {@link SourceContext} that can be used for emitting elements.
 * The run method can run for as long as necessary. The source must, however, react to an
 * invocation of {@link #cancel()} by breaking out of its main loop.
 *
 * <h3>CheckpointedFunction Sources</h3>
 *
 * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 * interface must ensure that state checkpointing, updating of internal state and emission of
 * elements are not done concurrently. This is achieved by using the provided checkpointing lock
 * object to protect update of state and emission of elements in a synchronized block.
 *
 * <p>This is the basic pattern one should follow when implementing a checkpointed source:
 *
 * <pre>{@code
 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 *      private long count = 0L;
 *      private volatile boolean isRunning = true;
 *
 *      private transient ListState<Long> checkpointedCount;
 *
 *      public void run(SourceContext<T> ctx) {
 *          while (isRunning && count < 1000) {
 *              // this synchronized block ensures that state checkpointing,
 *              // internal state updates and emission of elements are an atomic operation
 *              synchronized (ctx.getCheckpointLock()) {
 *                  ctx.collect(count);
 *                  count++;
 *              }
 *          }
 *      }
 *
 *      public void cancel() {
 *          isRunning = false;
 *      }
 *
 *      public void initializeState(FunctionInitializationContext context) {
 *          this.checkpointedCount = context
 *              .getOperatorStateStore()
 *              .getListState(new ListStateDescriptor<>("count", Long.class));
 *
 *          if (context.isRestored()) {
 *              for (Long count : this.checkpointedCount.get()) {
 *                  this.count = count;
 *              }
 *          }
 *      }
 *
 *      public void snapshotState(FunctionSnapshotContext context) {
 *          this.checkpointedCount.clear();
 *          this.checkpointedCount.add(count);
 *      }
 * }
 * }</pre>
 *
 *
 * <h3>Timestamps and watermarks:</h3>
 * Sources may assign timestamps to elements and may manually emit watermarks.
 * However, these are only interpreted if the streaming program runs on
 * {@link TimeCharacteristic#EventTime}. On other time characteristics
 * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
 * the watermarks from the source function are ignored.
 *
 * <h3>Gracefully Stopping Functions</h3>
 * Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
 * interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
 * state and the emitted elements in a consistent state.
 *
 * <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
 * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
 * of state updates and element emission.
 *
 * @param <T> The type of the elements produced by this source.
 *
 * @see org.apache.flink.api.common.functions.StoppableFunction
 * @see org.apache.flink.streaming.api.TimeCharacteristic
 */
@Public
public interface SourceFunction<T> extends Function, Serializable {

    /**
     * Starts the source. Implementations can use the {@link SourceContext} emit
     * elements.
     *
     * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
     * must lock on the checkpoint lock (using a synchronized block) before updating internal
     * state and emitting elements, to make both an atomic operation:
     *
     * <pre>{@code
     *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
     *      private long count = 0L;
     *      private volatile boolean isRunning = true;
     *
     *      private transient ListState<Long> checkpointedCount;
     *
     *      public void run(SourceContext<T> ctx) {
     *          while (isRunning && count < 1000) {
     *              // this synchronized block ensures that state checkpointing,
     *              // internal state updates and emission of elements are an atomic operation
     *              synchronized (ctx.getCheckpointLock()) {
     *                  ctx.collect(count);
     *                  count++;
     *              }
     *          }
     *      }
     *
     *      public void cancel() {
     *          isRunning = false;
     *      }
     *
     *      public void initializeState(FunctionInitializationContext context) {
     *          this.checkpointedCount = context
     *              .getOperatorStateStore()
     *              .getListState(new ListStateDescriptor<>("count", Long.class));
     *
     *          if (context.isRestored()) {
     *              for (Long count : this.checkpointedCount.get()) {
     *                  this.count = count;
     *              }
     *          }
     *      }
     *
     *      public void snapshotState(FunctionSnapshotContext context) {
     *          this.checkpointedCount.clear();
     *          this.checkpointedCount.add(count);
     *      }
     * }
     * }</pre>
     *
     * @param ctx The context to emit elements to and for accessing locks.
     */
    void run(SourceContext<T> ctx) throws Exception;

    /**
     * Cancels the source. Most sources will have a while loop inside the
     * {@link #run(SourceContext)} method. The implementation needs to ensure that the
     * source will break out of that loop after this method is called.
     *
     * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
     * {@code false} in this method. That flag is checked in the loop condition.
     *
     * <p>When a source is canceled, the executing thread will also be interrupted
     * (via {@link Thread#interrupt()}). The interruption happens strictly after this
     * method has been called, so any interruption handler can rely on the fact that
     * this method has completed. It is good practice to make any flags altered by
     * this method "volatile", in order to guarantee the visibility of the effects of
     * this method to any interruption handler.
     */
    void cancel();

    // ------------------------------------------------------------------------
    //  source context
    // ------------------------------------------------------------------------

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        //......
    }
}
  • SourceFunction is the base interface for all stream data sources in Flink. It declares the run and cancel methods, and also the nested SourceContext interface.
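The run/cancel contract described in the Javadoc can be tried out without Flink. The following is a minimal sketch, assuming a hypothetical CancellableCounter class and a plain java.util.function.Consumer in place of SourceContext; none of these names come from Flink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

// Hypothetical stand-in, not a Flink class: it only mirrors the run/cancel shape.
final class CancellableCounter {
    // volatile, so the write in cancel() is visible to the thread looping in run()
    private volatile boolean isRunning = true;
    private long count = 0L;

    // Emits 0, 1, 2, ... until cancel() flips the flag.
    void run(Consumer<Long> out) throws InterruptedException {
        while (isRunning) {
            out.accept(count++);
            Thread.sleep(1);
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs the source on a worker thread, cancels it after the first element,
    // and returns everything that was emitted.
    static List<Long> runBrieflyThenCancel() {
        CancellableCounter source = new CancellableCounter();
        List<Long> emitted = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch firstElement = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                source.run(value -> {
                    emitted.add(value);
                    firstElement.countDown();
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            firstElement.await();
            source.cancel();
            worker.join(); // returns only because run() left its loop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }
}
```

If the flag were not volatile, the loop in run() would not be guaranteed to observe the write made by cancel() on another thread, which is why the Javadoc recommends it.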

SourceContext

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java

/**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {

        /**
         * Emits one element from the source, without attaching a timestamp. In most cases,
         * this is the default way of emitting elements.
         *
         * <p>The timestamp that the element will get assigned depends on the time characteristic of
         * the streaming program:
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
         *         current time as the timestamp.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
         *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
         *         operation (like time windows).</li>
         * </ul>
         *
         * @param element The element to emit
         */
        void collect(T element);

        /**
         * Emits one element from the source, and attaches the given timestamp. This method
         * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
         * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
         * on the stream.
         *
         * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
         * This allows programs to switch between the different time characteristics and behaviors
         * without changing the code of the source functions.
         * <ul>
         *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
         *         because processing time never works with element timestamps.</li>
         *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
         *         system's current time, to realize proper ingestion time semantics.</li>
         *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
         * </ul>
         *
         * @param element The element to emit
         * @param timestamp The timestamp in milliseconds since the Epoch
         */
        @PublicEvolving
        void collectWithTimestamp(T element, long timestamp);

        /**
         * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
         * elements with a timestamp {@code t' <= t} will occur any more. If further such
         * elements will be emitted, those elements are considered <i>late</i>.
         *
         * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
         * On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
         * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
         * automatic ingestion time watermarks.
         *
         * @param mark The Watermark to emit
         */
        @PublicEvolving
        void emitWatermark(Watermark mark);

        /**
         * Marks the source to be temporarily idle. This tells the system that this source will
         * temporarily stop emitting records and watermarks for an indefinite amount of time. This
         * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
         * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
         * watermarks without the need to wait for watermarks from this source while it is idle.
         *
         * <p>Source functions should make a best effort to call this method as soon as they
         * acknowledge themselves to be idle. The system will consider the source to resume activity
         * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
         * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
         */
        @PublicEvolving
        void markAsTemporarilyIdle();

        /**
         * Returns the checkpoint lock. Please refer to the class-level comment in
         * {@link SourceFunction} for details about how to write a consistent checkpointed
         * source.
         *
         * @return The object to use as the lock
         */
        Object getCheckpointLock();

        /**
         * This method is called by the system to shut down the context.
         */
        void close();
    }
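  • SourceContext mainly defines how a source emits data. The basic method is collect, which emits an element without a timestamp: under TimeCharacteristic.EventTime the element must be given a timestamp by a TimestampAssigner before any time-dependent operation such as a time window; under TimeCharacteristic.IngestionTime nothing needs to be assigned, because the system stamps the element with its current time. Besides collect there is collectWithTimestamp, which emits an element together with an explicit timestamp and is meant for TimeCharacteristic.EventTime (the timestamp is ignored under ProcessingTime and overwritten under IngestionTime).
  • emitWatermark emits a Watermark, which is how out-of-order data is bounded: a watermark of value t declares that no more elements with a timestamp less than or equal to t are expected, and any that still arrive are considered late. It only takes effect under TimeCharacteristic.EventTime; under TimeCharacteristic.ProcessingTime the watermark is ignored, and under TimeCharacteristic.IngestionTime it is replaced by the automatically generated ingestion time watermarks.
  • markAsTemporarilyIdle tells the system that the source will stop emitting records and watermarks for an indefinite amount of time, so that downstream tasks can advance their watermarks without waiting for this source. It is only relevant under TimeCharacteristic.IngestionTime or TimeCharacteristic.EventTime. As soon as SourceContext.collect(T), SourceContext.collectWithTimestamp(T, long) or SourceContext.emitWatermark(Watermark) is called, the system considers the source active again.
  • getCheckpointLock returns the checkpoint lock. A checkpointed source synchronizes on it so that state updates and element emission are atomic with respect to checkpointing.
  • close is called by the system to shut down the context and release its resources.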
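The watermark rule quoted in the emitWatermark Javadoc (no more elements with timestamp t' <= t after a watermark t, otherwise the element is late) can be written down as a tiny check. This is a sketch under assumptions: LatenessCheck is a hypothetical helper, not part of Flink, and keeping the maximum watermark seen so far is a simplification chosen here.

```java
// Hypothetical helper, not a Flink class: tracks the highest watermark seen
// and classifies elements against it.
final class LatenessCheck {
    private long currentWatermark = Long.MIN_VALUE;

    // Assumption of this sketch: watermarks never move backwards, so keep the maximum.
    void onWatermark(long t) {
        currentWatermark = Math.max(currentWatermark, t);
    }

    // An element is late when its timestamp is <= the current watermark.
    boolean isLate(long elementTimestamp) {
        return elementTimestamp <= currentWatermark;
    }
}
```

For example, after onWatermark(100) an element stamped 100 counts as late, while one stamped 101 does not.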

Task.run (upstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • Task's run method calls invokable.invoke(); here the invokable is a StreamTask.

StreamTask.invoke

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

        //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
  • StreamTask's invoke method calls the run method implemented by the subclass; here the subclass is SourceStreamTask.
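The relationship between invoke() and the subclass run() is a template method: the base class fixes the order of the steps and the subclass supplies only the work in the middle. A much reduced sketch follows, assuming hypothetical MiniTask and MiniSourceTask classes and a string trace in place of the real setup and cleanup work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the invoke()/run() split; not Flink code.
abstract class MiniTask {
    final List<String> trace = new ArrayList<>();
    private volatile boolean isRunning;

    // Fixed skeleton: subclasses cannot override it, they only supply run().
    final void invoke() throws Exception {
        try {
            trace.add("init");
            isRunning = true;
            run();
            trace.add("finished");
        } finally {
            // cleanup happens whether run() returned normally or threw
            isRunning = false;
            trace.add("cleanup");
        }
    }

    protected abstract void run() throws Exception;

    boolean isRunning() {
        return isRunning;
    }
}

final class MiniSourceTask extends MiniTask {
    @Override
    protected void run() {
        trace.add("run");
    }

    // Invokes one task and returns the order in which the steps happened.
    static List<String> traceOfOneInvocation() {
        MiniSourceTask task = new MiniSourceTask();
        try {
            task.invoke();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return task.trace;
    }
}
```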

SourceStreamTask.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

@Override
    protected void run() throws Exception {
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    }
  • SourceStreamTask's run method simply calls run on headOperator; here the headOperator is a StreamSource.

StreamSource.run

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(lockingObject, streamStatusMaintainer, output);
    }

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector) throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
            ? getExecutionConfig().getLatencyTrackingInterval()
            : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                latencyTrackingInterval,
                this.getOperatorID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }
  • StreamSource's run method first builds a SourceFunction.SourceContext through StreamSourceContexts.getSourceContext and then calls run on userFunction. Here userFunction is RandomWordSource, the user-defined SourceFunction. Note that before userFunction.run(ctx) is called, a LatencyMarksEmitter is also created if latencyTrackingInterval is greater than 0.

RandomWordSource.run

public class RandomWordSource implements SourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

    private volatile boolean isRunning = true;

    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(300);
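            // pick an index uniformly; Math.random() * 10 % words.length would favour the first three words
            int rnd = java.util.concurrent.ThreadLocalRandom.current().nextInt(words.length);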
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
  • RandomWordSource's run method keeps emitting data in a loop until cancel() sets isRunning to false.

StreamSource.LatencyMarksEmitter

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

private static class LatencyMarksEmitter<OUT> {
        private final ScheduledFuture<?> latencyMarkTimer;

        public LatencyMarksEmitter(
                final ProcessingTimeService processingTimeService,
                final Output<StreamRecord<OUT>> output,
                long latencyTrackingInterval,
                final OperatorID operatorId,
                final int subtaskIndex) {

            latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
                new ProcessingTimeCallback() {
                    @Override
                    public void onProcessingTime(long timestamp) throws Exception {
                        try {
                            // ProcessingTimeService callbacks are executed under the checkpointing lock
                            output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                        } catch (Throwable t) {
                            // we catch the Throwables here so that we don't trigger the processing
                            // timer services async exception handler
                            LOG.warn("Error while emitting latency marker.", t);
                        }
                    }
                },
                0L,
                latencyTrackingInterval);
        }

        public void close() {
            latencyMarkTimer.cancel(true);
        }
    }
  • LatencyMarksEmitter is created inside StreamSource's run method, before userFunction's run method is called, and only if latencyTrackingInterval > 0. To obtain latencyTrackingInterval, getExecutionConfig().isLatencyTrackingConfigured() is checked first: if the executionConfig has the value configured, getExecutionConfig().getLatencyTrackingInterval() is used; otherwise configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, whose default is 2000L. In this example the latter applies, so the interval is 2000.
  • The LatencyMarksEmitter constructor calls processingTimeService.scheduleAtFixedRate to register a fixed-rate timer task with an initial delay of 0 and a period of latencyTrackingInterval.
  • The work of the timer task is in the onProcessingTime method of ProcessingTimeCallback, which calls output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)) to send a LatencyMarker. Here processingTimeService is a SystemProcessingTimeService and output is an AbstractStreamOperator.CountingOutput.
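The way the interval is chosen (job-level setting first, cluster-level setting as the fallback, emitter only when the result is positive) can be condensed into a few lines. This is only a sketch: LatencyIntervalResolver is a hypothetical class, and representing "not configured at job level" as null is an assumption of the sketch, not how ExecutionConfig stores it.

```java
// Hypothetical model of the interval selection; not Flink code.
final class LatencyIntervalResolver {

    // jobLevelInterval == null stands for "not configured on the job" (assumption of this sketch).
    static long resolve(Long jobLevelInterval, long clusterLevelInterval) {
        return jobLevelInterval != null ? jobLevelInterval : clusterLevelInterval;
    }

    // Mirrors the "latencyTrackingInterval > 0" guard before the emitter is created.
    static boolean emitterEnabled(long interval) {
        return interval > 0;
    }
}
```

With these rules, an explicit job-level value of 0 turns the emitter off even when the cluster-level value is 2000.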

SystemProcessingTimeService.scheduleAtFixedRate

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

@Override
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
        long nextTimestamp = getCurrentProcessingTime() + initialDelay;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.scheduleAtFixedRate(
                new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
                initialDelay,
                period,
                TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(initialDelay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }
  • SystemProcessingTimeService's scheduleAtFixedRate method actually delegates to timerService.scheduleAtFixedRate. Here timerService is a ScheduledThreadPoolExecutor with a corePoolSize of 1, and the task it schedules is a RepeatedTriggerTask.
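The underlying JDK call can be exercised on its own. Below is a small sketch using only java.util.concurrent: a ScheduledThreadPoolExecutor with one core thread runs a task at a fixed rate until it is cancelled. FixedRateDemo and firesAtLeast are names made up for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only JDK APIs are used.
final class FixedRateDemo {

    // Schedules a task with initial delay 0 and the given period on a single-threaded
    // timer, waits until it has fired `ticks` times (at most 5 seconds), then cancels it.
    static boolean firesAtLeast(int ticks, long periodMillis) {
        ScheduledThreadPoolExecutor timer = new ScheduledThreadPoolExecutor(1);
        CountDownLatch latch = new CountDownLatch(ticks);
        ScheduledFuture<?> future =
            timer.scheduleAtFixedRate(latch::countDown, 0L, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            future.cancel(true);  // same call LatencyMarksEmitter.close() makes on its future
            timer.shutdownNow();
        }
    }
}
```

Because the pool has a single thread, all timer callbacks registered on it run one after another rather than in parallel.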

RepeatedTriggerTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

/**
     * Internal task which is repeatedly called by the processing time service.
     */
    private static final class RepeatedTriggerTask implements Runnable {

        private final AtomicInteger serviceStatus;
        private final Object lock;
        private final ProcessingTimeCallback target;
        private final long period;
        private final AsyncExceptionHandler exceptionHandler;

        private long nextTimestamp;

        private RepeatedTriggerTask(
                final AtomicInteger serviceStatus,
                final AsyncExceptionHandler exceptionHandler,
                final Object lock,
                final ProcessingTimeCallback target,
                final long nextTimestamp,
                final long period) {

            this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
            this.lock = Preconditions.checkNotNull(lock);
            this.target = Preconditions.checkNotNull(target);
            this.period = period;
            this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

            this.nextTimestamp = nextTimestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    if (serviceStatus.get() == STATUS_ALIVE) {
                        target.onProcessingTime(nextTimestamp);
                    }

                    nextTimestamp += period;
                } catch (Throwable t) {
                    TimerException asyncException = new TimerException(t);
                    exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
                }
            }
        }
    }
  • RepeatedTriggerTask calls ProcessingTimeCallback's onProcessingTime while serviceStatus is STATUS_ALIVE. The nextTimestamp initially passed in is computed as getCurrentProcessingTime() + initialDelay, and after every run period is added to it.
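One consequence of this design is that the timestamp handed to the callback is the nominal schedule time (start, start + period, start + 2 * period, ...), not the wall clock at the moment of firing. A sketch with a hypothetical RepeatedTrigger class, driven by hand instead of by a timer so that the result is deterministic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical reduction of the repeated trigger; status checks and error handling are left out.
final class RepeatedTrigger implements Runnable {
    private final Object lock;
    private final LongConsumer target;
    private final long period;
    private long nextTimestamp;

    RepeatedTrigger(Object lock, LongConsumer target, long firstTimestamp, long period) {
        this.lock = lock;
        this.target = target;
        this.nextTimestamp = firstTimestamp;
        this.period = period;
    }

    @Override
    public void run() {
        // callback and timestamp update happen under the shared lock
        synchronized (lock) {
            target.accept(nextTimestamp);
            nextTimestamp += period;
        }
    }

    // Fires the trigger `firings` times and returns the timestamps the callback saw.
    static List<Long> timestampsOf(int firings, long start, long period) {
        List<Long> seen = new ArrayList<>();
        RepeatedTrigger trigger = new RepeatedTrigger(new Object(), ts -> seen.add(ts), start, period);
        for (int i = 0; i < firings; i++) {
            trigger.run();
        }
        return seen;
    }
}
```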

AbstractStreamOperator.CountingOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

/**
     * Wrapping {@link Output} that updates metrics on the number of emitted elements.
     */
    public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
        private final Output<StreamRecord<OUT>> output;
        private final Counter numRecordsOut;

        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            output.emitWatermark(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            output.emitLatencyMarker(latencyMarker);
        }

        @Override
        public void collect(StreamRecord<OUT> record) {
            numRecordsOut.inc();
            output.collect(record);
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            numRecordsOut.inc();
            output.collect(outputTag, record);
        }

        @Override
        public void close() {
            output.close();
        }
    }
  • The Output it actually wraps is a RecordWriterOutput.
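CountingOutput is a decorator: every call is forwarded to the wrapped output, and only the collect calls increment the counter, so latency markers do not count as records. A reduced sketch, assuming a hypothetical two-method MiniOutput interface that is far smaller than Flink's Output:

```java
// Hypothetical two-method output; not Flink's Output interface.
interface MiniOutput {
    void collect(String record);

    void emitLatencyMarker(long markedTime);
}

// Decorator that counts records on their way to the wrapped output.
final class CountingMiniOutput implements MiniOutput {
    private final MiniOutput delegate;
    private long numRecordsOut = 0L;

    CountingMiniOutput(MiniOutput delegate) {
        this.delegate = delegate;
    }

    @Override
    public void collect(String record) {
        numRecordsOut++;
        delegate.collect(record);
    }

    @Override
    public void emitLatencyMarker(long markedTime) {
        // forwarded without touching the counter
        delegate.emitLatencyMarker(markedTime);
    }

    long getNumRecordsOut() {
        return numRecordsOut;
    }

    // Sends two records and one marker through the decorator and returns the count.
    static long countAfterTwoRecordsAndOneMarker() {
        MiniOutput sink = new MiniOutput() {
            @Override
            public void collect(String record) { }

            @Override
            public void emitLatencyMarker(long markedTime) { }
        };
        CountingMiniOutput counting = new CountingMiniOutput(sink);
        counting.collect("a");
        counting.emitLatencyMarker(1L);
        counting.collect("b");
        return counting.getNumRecordsOut();
    }
}
```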

RecordWriterOutput.emitLatencyMarker

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java

/**
 * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
 */
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    private SerializationDelegate<StreamElement> serializationDelegate;

    //......

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        serializationDelegate.setInstance(latencyMarker);

        try {
            recordWriter.randomEmit(serializationDelegate);
        }
        catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
  • Here emitLatencyMarker calls StreamRecordWriter's randomEmit to send the LatencyMarker; the actual sending is done by the parent class RecordWriter.

RecordWriter

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java

/**
     * This is used to send LatencyMarks to a random target channel.
     */
    public void randomEmit(T record) throws IOException, InterruptedException {
        sendToTarget(record, rng.nextInt(numChannels));
    }

    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        RecordSerializer<T> serializer = serializers[targetChannel];

        SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
                // If this was a full record, we are done. Not breaking
                // out of the loop at this point will lead to another
                // buffer request before breaking out (that would not be
                // a problem per se, but it can lead to stalls in the
                // pipeline).
                if (result.isFullRecord()) {
                    break;
                }
            }
            BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

            result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }
  • RecordWriter's randomEmit simply picks a random targetChannel (rng.nextInt(numChannels)) and sends the record to it via sendToTarget
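
The random channel selection can be modelled in a few lines of plain Java. This is only a sketch: RandomEmitSketch, its in-memory lists and the seed parameter are hypothetical names introduced for illustration, and it leaves out everything Flink's RecordWriter does around serialization and buffers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Simplified stand-in for a record writer; this is NOT Flink's RecordWriter. */
class RandomEmitSketch<T> {
    // One in-memory list per target channel (hypothetical replacement for network buffers).
    private final List<List<T>> channels = new ArrayList<>();
    private final Random rng;

    RandomEmitSketch(int numChannels, long seed) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("numChannels must be positive");
        }
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        this.rng = new Random(seed);
    }

    /** Picks one channel uniformly at random, stores the record there, returns the index. */
    int randomEmit(T record) {
        int target = rng.nextInt(channels.size());
        channels.get(target).add(record);
        return target;
    }

    /** Number of records stored in a single channel. */
    int sizeOf(int channel) {
        return channels.get(channel).size();
    }

    /** Number of records stored across all channels. */
    int totalEmitted() {
        int total = 0;
        for (List<T> channel : channels) {
            total += channel.size();
        }
        return total;
    }
}
```

Each record goes to exactly one channel, so a marker sent this way reaches only one of the downstream subtasks.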

Task.run (downstream)

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
            //......
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            notifyObservers(ExecutionState.RUNNING, null);
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
    }
}
  • The run method of the downstream Task calls invokable.invoke(); here the invokable is a OneInputStreamTask

OneInputStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java

@Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
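  • Task's run method calls StreamTask's invoke method, and invoke in turn calls OneInputStreamTask's run method. That method simply loops, calling inputProcessor.processInput() until it returns false or running becomes false; here the inputProcessor is a StreamInputProcessor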

StreamInputProcessor

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java

public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
            if (bufferOrEvent != null) {
                if (bufferOrEvent.isBuffer()) {
                    currentChannel = bufferOrEvent.getChannelIndex();
                    currentRecordDeserializer = recordDeserializers[currentChannel];
                    currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
                }
                else {
                    // Event received
                    final AbstractEvent event = bufferOrEvent.getEvent();
                    if (event.getClass() != EndOfPartitionEvent.class) {
                        throw new IOException("Unexpected event: " + event);
                    }
                }
            }
            else {
                isFinished = true;
                if (!barrierHandler.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return false;
            }
        }
    }
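  • processInput first calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to fetch the next record, and only processes it when result.isFullRecord() is true
  • Processing depends on the type of the StreamElement; the cases are watermark, stream status, LatencyMarker and ordinary record. The first three are handled and the loop continues, while an ordinary record makes processInput return true
  • For an ordinary record it calls streamOperator.processElement(record); here the streamOperator is a StreamMap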
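
The dispatch described above can be sketched in plain Java. DispatchSketch, its Kind enum and the log list are hypothetical names introduced for illustration. The sketch keeps only the control flow (control elements are handled and the loop continues, a record ends the call with true, exhausted input returns false) and leaves out deserialization, locking and checkpoint barriers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified model of the per-type dispatch; this is NOT Flink's StreamInputProcessor. */
class DispatchSketch {
    enum Kind { WATERMARK, STREAM_STATUS, LATENCY_MARKER, RECORD }

    static final class Element {
        final Kind kind;
        final String payload;

        Element(Kind kind, String payload) {
            this.kind = kind;
            this.payload = payload;
        }
    }

    private final Deque<Element> input = new ArrayDeque<>();
    // Records what was handled, in order, so the behaviour can be inspected.
    final List<String> log = new ArrayList<>();

    void offer(Kind kind, String payload) {
        input.add(new Element(kind, payload));
    }

    /** Returns true after processing one record, false once the input is exhausted. */
    boolean processInput() {
        while (true) {
            Element element = input.poll();
            if (element == null) {
                return false;
            }
            switch (element.kind) {
                case WATERMARK:
                    log.add("watermark:" + element.payload);
                    continue; // control element: keep reading
                case STREAM_STATUS:
                    log.add("status:" + element.payload);
                    continue;
                case LATENCY_MARKER:
                    log.add("latency:" + element.payload);
                    continue;
                case RECORD:
                    log.add("record:" + element.payload);
                    return true; // one user record per call
                default:
                    throw new IllegalStateException("unknown kind " + element.kind);
            }
        }
    }
}
```

A caller would drive it the same way OneInputStreamTask.run does, with an empty-bodied loop such as while (sketch.processInput()) { }.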

StreamMap.processElement

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java

/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
  • This calls userFunction.map(element.getValue()) to perform the map operation and passes the result on through output.collect; here the userFunction is UpperCaseMapFunc
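
As a rough illustration of this operator pattern, the sketch below wraps a user function and forwards each result to an output. MapOperatorSketch is a hypothetical name, java.util.function.Function stands in for MapFunction and Consumer stands in for Output; the StreamRecord reuse done by element.replace is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;

/** Simplified model of a map operator; this is NOT Flink's StreamMap. */
class MapOperatorSketch<IN, OUT> {
    private final Function<IN, OUT> userFunction; // stand-in for MapFunction
    private final Consumer<OUT> output;           // stand-in for Output

    MapOperatorSketch(Function<IN, OUT> userFunction, Consumer<OUT> output) {
        this.userFunction = userFunction;
        this.output = output;
    }

    /** Applies the user function to one value and forwards the result downstream. */
    void processElement(IN value) {
        output.accept(userFunction.apply(value));
    }

    public static void main(String[] args) {
        List<String> collected = new ArrayList<>();
        // An upper-casing function, similar in spirit to the article's UpperCaseMapFunc.
        MapOperatorSketch<String, String> operator =
                new MapOperatorSketch<>(s -> s.toUpperCase(Locale.ROOT), collected::add);
        operator.processElement("flink");
        operator.processElement("source");
        System.out.println(collected);
    }
}
```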

Summary

  • SourceFunction is the base interface for Flink stream data sources. It defines the run and cancel methods as well as the SourceContext interface; SourceContext mainly defines collect and collectWithTimestamp for emitting data, and also provides emitWatermark for emitting a Watermark
  • For data emission, the call order is Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run --> userFunction.run(ctx) (RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0; if it is, it creates a LatencyMarksEmitter, which registers a timer task that periodically calls back ProcessingTimeCallback's onProcessingTime method, which in turn triggers output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex))
  • As a result, the downstream receives both the user data sent by userFunction.run and the LatencyMarker sent by the timer task. The downstream call order is Task.run --> StreamTask.invoke --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> statusWatermarkValve.inputWatermark, statusWatermarkValve.inputStreamStatus, streamOperator.processLatencyMarker or streamOperator.processElement. StreamInputProcessor.processInput handles each element according to its type; for user data it calls streamOperator.processElement, that is, StreamMap.processElement --> userFunction.map (UpperCaseMapFunc.map)
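
The timer-driven marker emission can be approximated with java.util.concurrent. This is a sketch under stated assumptions: LatencyEmitterSketch and awaitMarkers are hypothetical names, a ScheduledExecutorService stands in for Flink's processing time service, and the marker is reduced to a timestamp passed to a LongConsumer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

/** Simplified model of a periodic marker emitter; this is NOT Flink's LatencyMarksEmitter. */
class LatencyEmitterSketch implements AutoCloseable {
    private final ScheduledExecutorService timer;

    LatencyEmitterSketch(long intervalMillis, LongConsumer output) {
        if (intervalMillis <= 0) {
            // Mirrors the idea that markers are emitted only for a positive interval.
            throw new IllegalArgumentException("interval must be positive");
        }
        timer = Executors.newSingleThreadScheduledExecutor();
        // Hand the current timestamp to the output at a fixed rate.
        timer.scheduleAtFixedRate(
                () -> output.accept(System.currentTimeMillis()),
                0L, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }

    /** Starts an emitter and waits until the expected number of markers arrived or the timeout hit. */
    static boolean awaitMarkers(int expected, long intervalMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        try (LatencyEmitterSketch emitter =
                     new LatencyEmitterSketch(intervalMillis, timestamp -> latch.countDown())) {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The markers are produced on the timer thread while the source's own run loop keeps emitting user data, which is why the downstream sees the two kinds of element interleaved.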

