This article takes a closer look at Flink's SpoutWrapper.
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutWrapper.java
```java
/**
 * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
 * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
 * {@link SpoutCollector} for supported types).<br>
 * <br>
 * Per default, {@link SpoutWrapper} calls the wrapped spout's {@link IRichSpout#nextTuple() nextTuple()} method in
 * an infinite loop.<br>
 * Alternatively, {@link SpoutWrapper} can call {@link IRichSpout#nextTuple() nextTuple()} for a finite number of
 * times and terminate automatically afterwards (for finite input streams). The number of {@code nextTuple()} calls can
 * be specified as a certain number of invocations or can be undefined. In the undefined case, {@link SpoutWrapper}
 * terminates if no record was emitted to the output collector for the first time during a call to
 * {@link IRichSpout#nextTuple() nextTuple()}.<br>
 * If the given spout implements {@link FiniteSpout} interface and {@link #numberOfInvocations} is not provided or
 * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
 * {@link FiniteSpout#reachedEnd()} returns true.
 */
public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {
    //......

    /** The number of {@link IRichSpout#nextTuple()} calls. */
    private Integer numberOfInvocations; // do not use int -> null indicates an infinite loop

    /**
     * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
     * the given {@link IRichSpout spout} a finite number of times. The output type will be one of {@link Tuple0} to
     * {@link Tuple25} depending on the spout's declared number of attributes.
     *
     * @param spout
     *            The {@link IRichSpout spout} to be used.
     * @param numberOfInvocations
     *            The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper}
     *            terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is
     *            disabled.
     * @throws IllegalArgumentException
     *             If the number of declared output attributes is not with range [0;25].
     */
    public SpoutWrapper(final IRichSpout spout, final Integer numberOfInvocations) throws IllegalArgumentException {
        this(spout, (Collection<String>) null, numberOfInvocations);
    }

    /**
     * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
     * the given {@link IRichSpout spout} in an infinite loop. The output type will be one of {@link Tuple0} to
     * {@link Tuple25} depending on the spout's declared number of attributes.
     *
     * @param spout
     *            The {@link IRichSpout spout} to be used.
     * @throws IllegalArgumentException
     *             If the number of declared output attributes is not with range [0;25].
     */
    public SpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
        this(spout, (Collection<String>) null, null);
    }

    @Override
    public final void run(final SourceContext<OUT> ctx) throws Exception {
        final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
                .getGlobalJobParameters();
        StormConfig stormConfig = new StormConfig();

        if (config != null) {
            if (config instanceof StormConfig) {
                stormConfig = (StormConfig) config;
            } else {
                stormConfig.putAll(config.toMap());
            }
        }

        final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
                (StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
                this.stormTopology, stormConfig);

        SpoutCollector<OUT> collector = new SpoutCollector<OUT>(this.numberOfAttributes,
                stormTopologyContext.getThisTaskId(), ctx);

        this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
        this.spout.activate();

        if (numberOfInvocations == null) {
            if (this.spout instanceof FiniteSpout) {
                final FiniteSpout finiteSpout = (FiniteSpout) this.spout;

                while (this.isRunning && !finiteSpout.reachedEnd()) {
                    finiteSpout.nextTuple();
                }
            } else {
                while (this.isRunning) {
                    this.spout.nextTuple();
                }
            }
        } else {
            int counter = this.numberOfInvocations;
            if (counter >= 0) {
                while ((--counter >= 0) && this.isRunning) {
                    this.spout.nextTuple();
                }
            } else {
                do {
                    collector.tupleEmitted = false;
                    this.spout.nextTuple();
                } while (collector.tupleEmitted && this.isRunning);
            }
        }
    }

    /**
     * {@inheritDoc}
     *
     * <p>Sets the {@link #isRunning} flag to {@code false}.
     */
    @Override
    public void cancel() {
        this.isRunning = false;
    }

    /**
     * {@inheritDoc}
     *
     * <p>Sets the {@link #isRunning} flag to {@code false}.
     */
    @Override
    public void stop() {
        this.isRunning = false;
    }

    @Override
    public void close() throws Exception {
        this.spout.close();
    }
}
```
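Before tracing the call chain, here is a minimal sketch of how a Storm spout is typically plugged into a Flink job through SpoutWrapper. The WordSpout class and the job wiring are illustrative assumptions, not code from the Flink sources quoted in this article; they rely only on the two constructors shown above.

```java
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.storm.wrappers.SpoutWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class SpoutWrapperDemo {

    // A hypothetical spout; BaseRichSpout supplies no-op defaults for the
    // remaining IRichSpout methods.
    public static class WordSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            collector.emit(new Values("hello"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word")); // one attribute -> Tuple1 output
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // numberOfInvocations = 10: call nextTuple() exactly ten times, then
        // terminate (finite source); the single-argument constructor would
        // loop forever instead
        DataStream<Tuple1<String>> words = env.addSource(
                new SpoutWrapper<Tuple1<String>>(new WordSpout(), 10),
                TypeExtractor.getForObject(new Tuple1<>("")));

        words.print();
        env.execute("spout-wrapper-demo");
    }
}
```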
SpoutWrapper extends RichParallelSourceFunction and implements the StoppableFunction interface. In its run method it wraps Flink's SourceContext in a SpoutCollector (an ISpoutOutputCollector implementation), hands that to the spout via a SpoutOutputCollector, and the spout uses it to collect the data it emits.

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutCollector.java
```java
/**
 * A {@link SpoutCollector} is used by {@link SpoutWrapper} to provided an Storm
 * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
 * Flink tuples and emits them via the provide {@link SourceContext} object.
 */
class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {
    /** The Flink source context object. */
    private final SourceContext<OUT> flinkContext;

    /**
     * Instantiates a new {@link SpoutCollector} that emits Flink tuples to the given Flink source context. If the
     * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
     * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
     *
     * @param numberOfAttributes
     *            The number of attributes of the emitted tuples.
     * @param taskId
     *            The ID of the producer task (negative value for unknown).
     * @param flinkContext
     *            The Flink source context to be used.
     * @throws UnsupportedOperationException
     *             if the specified number of attributes is greater than 25
     */
    SpoutCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
            final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
        super(numberOfAttributes, taskId);
        assert (flinkContext != null);
        this.flinkContext = flinkContext;
    }

    @Override
    protected List<Integer> doEmit(final OUT flinkTuple) {
        this.flinkContext.collect(flinkTuple);
        // TODO
        return null;
    }

    @Override
    public void reportError(final Throwable error) {
        // not sure, if Flink can support this
    }

    @Override
    public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
        return this.tansformAndEmit(streamId, tuple);
    }

    @Override
    public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
        throw new UnsupportedOperationException("Direct emit is not supported by Flink");
    }

    public long getPendingCount() {
        return 0;
    }
}
```
SpoutCollector's emit method delegates to tansformAndEmit (the typo is in the Flink sources), which is defined in its parent class AbstractStormCollector:

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java
```java
    /**
     * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
     * to the specified output stream.
     *
     * @param streamId
     *            The output stream id.
     * @param tuple
     *            The Storm tuple to be emitted.
     * @return the return value of {@link #doEmit(Object)}
     */
    @SuppressWarnings("unchecked")
    protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
        List<Integer> taskIds;

        int numAtt = this.numberOfAttributes.get(streamId);
        int taskIdIdx = numAtt;
        if (this.taskId >= 0 && numAtt < 0) {
            numAtt = 1;
            taskIdIdx = 0;
        }
        if (numAtt >= 0) {
            assert (tuple.size() == numAtt);
            Tuple out = this.outputTuple.get(streamId);
            for (int i = 0; i < numAtt; ++i) {
                out.setField(tuple.get(i), i);
            }
            if (this.taskId >= 0) {
                out.setField(this.taskId, taskIdIdx);
            }
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = out;

                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) out);
            }
        } else {
            assert (tuple.size() == 1);
            if (this.split) {
                this.splitTuple.streamId = streamId;
                this.splitTuple.value = tuple.get(0);

                taskIds = doEmit((OUT) this.splitTuple);
            } else {
                taskIds = doEmit((OUT) tuple.get(0));
            }
        }
        this.tupleEmitted = true;

        return taskIds;
    }
```
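To make the field-copy logic concrete, here is a small self-contained sketch (not from the Flink sources; the literal values are made up) replaying what the numAtt >= 0 branch does when a non-negative producer task id is appended as the last field:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;

// Stripped-down re-enactment of the copy loop in tansformAndEmit, assuming
// two declared attributes plus an appended producer task id.
public class TransformSketch {
    public static void main(String[] args) {
        List<Object> stormTuple = Arrays.asList("word", 42); // a Storm Values list
        int numAtt = 2;  // declared output attributes -> Tuple2 payload
        int taskId = 7;  // producer task id; >= 0 means it gets appended

        // one extra field for the task id, so Tuple3 instead of Tuple2
        Tuple out = new Tuple3<String, Integer, Integer>();
        for (int i = 0; i < numAtt; ++i) {
            out.setField(stormTuple.get(i), i);
        }
        out.setField(taskId, numAtt); // taskIdIdx == numAtt

        System.out.println(out); // prints (word,42,7)
    }
}
```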
SpoutWrapper is ultimately just a SourceFunction; to see how its run method gets invoked, follow the runtime call chain starting from Task:

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
```java
/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        //......

        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        notifyObservers(ExecutionState.RUNNING, null);
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable
        invokable.invoke();

        //......
    }
}
```
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
```java
/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {
    //......

    @Override
    public final void invoke() throws Exception {
        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;
            //......
        }
    }
}
```
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java
```java
/**
 * Stoppable task for executing stoppable streaming sources.
 *
 * @param <OUT> Type of the produced elements
 * @param <SRC> Stoppable source function
 */
public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
        extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {

    private volatile boolean stopped;

    public StoppableSourceStreamTask(Environment environment) {
        super(environment);
    }

    @Override
    protected void run() throws Exception {
        if (!stopped) {
            super.run();
        }
    }

    @Override
    public void stop() {
        stopped = true;
        if (this.headOperator != null) {
            this.headOperator.stop();
        }
    }
}
```
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
```java
/**
 * {@link StreamTask} for executing a {@link StreamSource}.
 *
 * <p>One important aspect of this is that the checkpointing and the emission of elements must never
 * occur at the same time. The execution must be serial. This is achieved by having the contract
 * with the StreamFunction that it must only modify its state or emit elements in
 * a synchronized block that locks on the lock Object. Also, the modification of the state
 * and the emission of elements must happen in the same block of code that is protected by the
 * synchronized block.
 *
 * @param <OUT> Type of the output elements of this source.
 * @param <SRC> Type of the source function for the stream source operator
 * @param <OP> Type of the stream source operator
 */
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
        extends StreamTask<OUT, OP> {
    //......

    @Override
    protected void run() throws Exception {
        headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
    }
}
```
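The contract described in the javadoc above applies to any user source, including SpoutWrapper. The hypothetical source below (not from the Flink sources) sketches both halves of it: emitting under the checkpoint lock, and distinguishing cancel() from the graceful stop() that StoppableSourceStreamTask forwards to StoppableFunction implementations.

```java
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Illustrative sketch of a source that honors the SourceStreamTask contract.
public class CountingSource implements SourceFunction<Long>, StoppableFunction {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (isRunning) {
            // emission happens under the checkpoint lock, so it can never
            // interleave with a checkpoint of this operator's state
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
        }
    }

    @Override
    public void cancel() {
        // hard cancellation: just break out of the loop
        isRunning = false;
    }

    @Override
    public void stop() {
        // graceful stop, reached via StoppableSourceStreamTask.stop();
        // SpoutWrapper implements it the same way as cancel()
        isRunning = false;
    }
}
```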
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
```java
/**
 * {@link StreamOperator} for streaming sources.
 *
 * @param <OUT> Type of the output elements
 * @param <SRC> Type of the source function of this stream source operator
 */
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
    //......

    public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
        run(lockingObject, streamStatusMaintainer, output);
    }

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector) throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
            ? getExecutionConfig().getLatencyTrackingInterval()
            : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                latencyTrackingInterval,
                this.getOperatorID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }

    //......
}
```
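When the wrapped spout is finite (for instance when numberOfInvocations is set), userFunction.run returns normally and StreamSource caps the stream with Watermark.MAX_WATERMARK, as the excerpt above shows. For comparison, a source that manages event time itself would emit timestamps and watermarks through the same SourceContext; the class below is a hypothetical sketch, not Flink code:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Illustrative sketch of a finite, event-time-aware source. When run()
// returns cleanly, StreamSource.run emits Watermark.MAX_WATERMARK for it.
public class TimestampedRangeSource implements SourceFunction<Long> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        for (long ts = 0; isRunning && ts < 100; ts++) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collectWithTimestamp(ts, ts);     // element plus its event timestamp
                ctx.emitWatermark(new Watermark(ts)); // no later element is earlier than ts
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```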
Summary

- SpoutWrapper's run method passes a SpoutOutputCollector (wrapping Flink's SourceContext) to the spout, which uses it to collect the data it emits; the wrapper then calls spout.nextTuple() according to the numberOfInvocations parameter. numberOfInvocations controls how many times the spout's nextTuple is called; it can be set in SpoutWrapper's constructor, and if the constructor without the numberOfInvocations parameter is used, the value is null, which means an infinite loop.
- SpoutCollector's emit method calls tansformAndEmit (implemented in AbstractStormCollector), which finally calls SpoutCollector.doEmit to emit the data; for the multi-stream case the tuple is wrapped in a SplitStreamType before being handed to doEmit, whereas with a single stream the plain tuple is passed to doEmit.
- Task's run method calls invokable.invoke(); here the invokable subclass is StoppableSourceStreamTask. StreamTask's invoke calls the subclass's run method; StoppableSourceStreamTask's run delegates to its direct parent SourceStreamTask, which mainly calls StreamSource's run method. StreamSource's run calls userFunction.run(ctx), where userFunction is SpoutWrapper, so the spout's nextTuple logic is executed and its output is emitted through Flink's SpoutCollector.
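As a closing illustration of the termination modes summarized above, here is a hypothetical finite spout (the class and its fields are invented for this sketch). Because it implements Flink's FiniteSpout interface from org.apache.flink.storm.util, a SpoutWrapper constructed without numberOfInvocations keeps calling nextTuple() only until reachedEnd() returns true:

```java
import java.util.Map;

import org.apache.flink.storm.util.FiniteSpout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Illustrative only: with numberOfInvocations == null, SpoutWrapper loops
// on nextTuple() until reachedEnd() reports that the input is exhausted.
public class RangeSpout extends BaseRichSpout implements FiniteSpout {

    private final int limit;
    private int next = 0;
    private SpoutOutputCollector collector;

    public RangeSpout(int limit) {
        this.limit = limit;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        collector.emit(new Values(next++));
    }

    @Override
    public boolean reachedEnd() {
        return next >= limit; // finite input: stop after 'limit' tuples
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }
}
```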