A Look at Flink's SpoutWrapper

This article takes a look at Flink's SpoutWrapper.

SpoutWrapper

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutWrapper.java

/**
 * A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
 * takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
 * {@link SpoutCollector} for supported types).<br>
 * <br>
 * Per default, {@link SpoutWrapper} calls the wrapped spout's {@link IRichSpout#nextTuple() nextTuple()} method in
 * an infinite loop.<br>
 * Alternatively, {@link SpoutWrapper} can call {@link IRichSpout#nextTuple() nextTuple()} for a finite number of
 * times and terminate automatically afterwards (for finite input streams). The number of {@code nextTuple()} calls can
 * be specified as a certain number of invocations or can be undefined. In the undefined case, {@link SpoutWrapper}
 * terminates if no record was emitted to the output collector for the first time during a call to
 * {@link IRichSpout#nextTuple() nextTuple()}.<br>
 * If the given spout implements {@link FiniteSpout} interface and {@link #numberOfInvocations} is not provided or
 * is {@code null}, {@link SpoutWrapper} calls {@link IRichSpout#nextTuple() nextTuple()} method until
 * {@link FiniteSpout#reachedEnd()} returns true.
 */
public final class SpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> implements StoppableFunction {
	//......

	/** The number of {@link IRichSpout#nextTuple()} calls. */
	private Integer numberOfInvocations; // do not use int -> null indicates an infinite loop

	/**
	 * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
	 * the given {@link IRichSpout spout} a finite number of times. The output type will be one of {@link Tuple0} to
	 * {@link Tuple25} depending on the spout's declared number of attributes.
	 *
	 * @param spout
	 *            The {@link IRichSpout spout} to be used.
	 * @param numberOfInvocations
	 *            The number of calls to {@link IRichSpout#nextTuple()}. If value is negative, {@link SpoutWrapper}
	 *            terminates if no tuple was emitted for the first time. If value is {@code null}, finite invocation is
	 *            disabled.
	 * @throws IllegalArgumentException
	 *             If the number of declared output attributes is not with range [0;25].
	 */
	public SpoutWrapper(final IRichSpout spout, final Integer numberOfInvocations)
			throws IllegalArgumentException {
		this(spout, (Collection<String>) null, numberOfInvocations);
	}

	/**
	 * Instantiates a new {@link SpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()} method of
	 * the given {@link IRichSpout spout} in an infinite loop. The output type will be one of {@link Tuple0} to
	 * {@link Tuple25} depending on the spout's declared number of attributes.
	 *
	 * @param spout
	 *            The {@link IRichSpout spout} to be used.
	 * @throws IllegalArgumentException
	 *             If the number of declared output attributes is not with range [0;25].
	 */
	public SpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
		this(spout, (Collection<String>) null, null);
	}

	@Override
	public final void run(final SourceContext<OUT> ctx) throws Exception {
		final GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig()
				.getGlobalJobParameters();
		StormConfig stormConfig = new StormConfig();

		if (config != null) {
			if (config instanceof StormConfig) {
				stormConfig = (StormConfig) config;
			} else {
				stormConfig.putAll(config.toMap());
			}
		}

		final TopologyContext stormTopologyContext = WrapperSetupHelper.createTopologyContext(
				(StreamingRuntimeContext) super.getRuntimeContext(), this.spout, this.name,
				this.stormTopology, stormConfig);

		SpoutCollector<OUT> collector = new SpoutCollector<OUT>(this.numberOfAttributes,
				stormTopologyContext.getThisTaskId(), ctx);

		this.spout.open(stormConfig, stormTopologyContext, new SpoutOutputCollector(collector));
		this.spout.activate();

		if (numberOfInvocations == null) {
			if (this.spout instanceof FiniteSpout) {
				final FiniteSpout finiteSpout = (FiniteSpout) this.spout;

				while (this.isRunning && !finiteSpout.reachedEnd()) {
					finiteSpout.nextTuple();
				}
			} else {
				while (this.isRunning) {
					this.spout.nextTuple();
				}
			}
		} else {
			int counter = this.numberOfInvocations;
			if (counter >= 0) {
				while ((--counter >= 0) && this.isRunning) {
					this.spout.nextTuple();
				}
			} else {
				do {
					collector.tupleEmitted = false;
					this.spout.nextTuple();
				} while (collector.tupleEmitted && this.isRunning);
			}
		}
	}

	/**
	 * {@inheritDoc}
	 *
	 * <p>Sets the {@link #isRunning} flag to {@code false}.
	 */
	@Override
	public void cancel() {
		this.isRunning = false;
	}

	/**
	 * {@inheritDoc}
	 *
	 * <p>Sets the {@link #isRunning} flag to {@code false}.
	 */
	@Override
	public void stop() {
		this.isRunning = false;
	}

	@Override
	public void close() throws Exception {
		this.spout.close();
	}
}
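  • SpoutWrapper extends RichParallelSourceFunction and implements the stop method of the StoppableFunction interface.
  • The run method of SpoutWrapper creates Flink's SpoutCollector and passes it as the constructor argument to Storm's SpoutOutputCollector. It then calls the spout's open method, handing the spout the SpoutOutputCollector that wraps the Flink SpoutCollector, which collects the data the spout emits.
  • Next it calls spout.nextTuple() to emit data, driven by the numberOfInvocations parameter. numberOfInvocations controls how many times the spout's nextTuple is called and can be set through the constructor when creating the SpoutWrapper. With the constructor that takes no numberOfInvocations argument, the value is null, which means an infinite loop.
  • Flink adds a FiniteSpout interface on top of Storm's spout. Its reachedEnd method reports whether all data has been sent, which turns a Storm spout into a finite source. When numberOfInvocations is null and the spout is a plain Storm spout, nextTuple is called in a loop until the source is cancelled or stopped.
  • If numberOfInvocations is set and is greater than or equal to 0, nextTuple is called at most that many times. If it is negative, the loop ends after the first nextTuple call during which collector.tupleEmitted stays false, i.e. no tuple was emitted.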
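
The branching on numberOfInvocations can be tried out without Flink or Storm on the classpath. The following is a minimal sketch under stated assumptions: MiniSpout, MiniFiniteSpout, drive, listSpout and the cancelAfter parameter are all hypothetical names invented for illustration, and cancelAfter merely stands in for the isRunning flag that cancel() and stop() clear in the real class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the loop selection in SpoutWrapper.run(); not Flink code. */
class InvocationModesSketch {

    /** Stand-in for Storm's IRichSpout: one call may append zero or more records to out. */
    interface MiniSpout {
        void nextTuple(List<String> out);
    }

    /** Stand-in for Flink's FiniteSpout. */
    interface MiniFiniteSpout extends MiniSpout {
        boolean reachedEnd();
    }

    /**
     * Calls nextTuple according to numberOfInvocations and returns the number of calls made.
     * cancelAfter is an assumption of this sketch: it replaces the isRunning flag, so the
     * "infinite" branches end once that many calls were made.
     */
    static int drive(MiniSpout spout, Integer numberOfInvocations, List<String> out, int cancelAfter) {
        int calls = 0;
        if (numberOfInvocations == null) {
            if (spout instanceof MiniFiniteSpout) {
                // Finite spout: loop until it reports the end of its input.
                MiniFiniteSpout finite = (MiniFiniteSpout) spout;
                while (calls < cancelAfter && !finite.reachedEnd()) {
                    finite.nextTuple(out);
                    calls++;
                }
            } else {
                // Plain spout: loop until cancelled.
                while (calls < cancelAfter) {
                    spout.nextTuple(out);
                    calls++;
                }
            }
        } else if (numberOfInvocations >= 0) {
            // Fixed number of calls.
            int counter = numberOfInvocations;
            while (--counter >= 0 && calls < cancelAfter) {
                spout.nextTuple(out);
                calls++;
            }
        } else {
            // Negative value: stop after the first call that emits nothing.
            int sizeBefore;
            do {
                sizeBefore = out.size();
                spout.nextTuple(out);
                calls++;
            } while (out.size() > sizeBefore && calls < cancelAfter);
        }
        return calls;
    }

    /** A spout that emits one element of data per call and nothing once data is exhausted. */
    static MiniSpout listSpout(List<String> data) {
        int[] position = {0};
        return out -> {
            if (position[0] < data.size()) {
                out.add(data.get(position[0]++));
            }
        };
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        int calls = drive(listSpout(Arrays.asList("a", "b", "c")), -1, out, 1000);
        System.out.println(calls + " calls, emitted " + out);
    }
}
```

Note that in the negative case the spout is called one extra time after its data runs out, because the empty call is what signals termination.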

SpoutCollector

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/SpoutCollector.java

/**
 * A {@link SpoutCollector} is used by {@link SpoutWrapper} to provided an Storm
 * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
 * Flink tuples and emits them via the provide {@link SourceContext} object.
 */
class SpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {

	/** The Flink source context object. */
	private final SourceContext<OUT> flinkContext;

	/**
	 * Instantiates a new {@link SpoutCollector} that emits Flink tuples to the given Flink source context. If the
	 * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
	 * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
	 *
	 * @param numberOfAttributes
	 *            The number of attributes of the emitted tuples.
	 * @param taskId
	 *            The ID of the producer task (negative value for unknown).
	 * @param flinkContext
	 *            The Flink source context to be used.
	 * @throws UnsupportedOperationException
	 *             if the specified number of attributes is greater than 25
	 */
	SpoutCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
			final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
		super(numberOfAttributes, taskId);
		assert (flinkContext != null);
		this.flinkContext = flinkContext;
	}

	@Override
	protected List<Integer> doEmit(final OUT flinkTuple) {
		this.flinkContext.collect(flinkTuple);
		// TODO
		return null;
	}

	@Override
	public void reportError(final Throwable error) {
		// not sure, if Flink can support this
	}

	@Override
	public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
		return this.tansformAndEmit(streamId, tuple);
	}

	@Override
	public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
	}

	public long getPendingCount() {
		return 0;
	}

}
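  • SpoutCollector implements Storm's ISpoutOutputCollector interface and provides the emit, emitDirect, getPendingCount and reportError methods. emitDirect throws UnsupportedOperationException because Flink does not currently support it, getPendingCount always returns 0, and reportError is a no-op.
  • doEmit calls flinkContext.collect(flinkTuple) to emit the data. It is protected and exists mainly to be called by tansformAndEmit.
  • tansformAndEmit is provided by the parent class AbstractStormCollector.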
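
The division of labour between the parent class and SpoutCollector is a template-method arrangement: the parent owns the transformation and the tupleEmitted flag, the subclass only decides where the record goes. Here is a rough sketch of that shape. BaseCollector, SinkCollector and the mapper function are hypothetical stand-ins, and a java.util.function.Consumer takes the place of Flink's SourceContext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical, simplified model of the SpoutCollector adapter; not the Flink classes. */
class CollectorAdapterSketch {

    /** Plays the role of AbstractStormCollector: owns the transform step and the tupleEmitted flag. */
    abstract static class BaseCollector<OUT> {
        boolean tupleEmitted = false;
        private final Function<List<Object>, OUT> mapper;

        BaseCollector(Function<List<Object>, OUT> mapper) {
            this.mapper = mapper;
        }

        /** Hook for the subclass, like doEmit in the source. */
        protected abstract List<Integer> doEmit(OUT record);

        /** Converts the Storm-style value list and hands the result to doEmit. */
        protected final List<Integer> transformAndEmit(List<Object> tuple) {
            List<Integer> taskIds = doEmit(mapper.apply(tuple));
            tupleEmitted = true;
            return taskIds;
        }
    }

    /** Plays the role of SpoutCollector: forwards records to a sink that stands in for SourceContext. */
    static final class SinkCollector<OUT> extends BaseCollector<OUT> {
        private final Consumer<OUT> sink;

        SinkCollector(Function<List<Object>, OUT> mapper, Consumer<OUT> sink) {
            super(mapper);
            this.sink = sink;
        }

        @Override
        protected List<Integer> doEmit(OUT record) {
            sink.accept(record);
            return null; // the source also returns null here
        }

        List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            return transformAndEmit(tuple); // messageId is ignored, as in the source
        }

        void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
            throw new UnsupportedOperationException("Direct emit is not supported");
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        SinkCollector<String> collector =
                new SinkCollector<>(tuple -> String.valueOf(tuple.get(0)), received::add);
        collector.emit("default", Arrays.asList((Object) "hello"), null);
        System.out.println(received + " tupleEmitted=" + collector.tupleEmitted);
    }
}
```

The tupleEmitted flag set in the parent is what the negative numberOfInvocations loop in SpoutWrapper inspects after each nextTuple call.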

AbstractStormCollector.tansformAndEmit

flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java

/**
	 * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
	 * to the specified output stream.
	 *
	 * @param The
	 *            The output stream id.
	 * @param tuple
	 *            The Storm tuple to be emitted.
	 * @return the return value of {@link #doEmit(Object)}
	 */
	@SuppressWarnings("unchecked")
	protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
		List<Integer> taskIds;

		int numAtt = this.numberOfAttributes.get(streamId);
		int taskIdIdx = numAtt;
		if (this.taskId >= 0 && numAtt < 0) {
			numAtt = 1;
			taskIdIdx = 0;
		}
		if (numAtt >= 0) {
			assert (tuple.size() == numAtt);
			Tuple out = this.outputTuple.get(streamId);
			for (int i = 0; i < numAtt; ++i) {
				out.setField(tuple.get(i), i);
			}
			if (this.taskId >= 0) {
				out.setField(this.taskId, taskIdIdx);
			}
			if (this.split) {
				this.splitTuple.streamId = streamId;
				this.splitTuple.value = out;

				taskIds = doEmit((OUT) this.splitTuple);
			} else {
				taskIds = doEmit((OUT) out);
			}

		} else {
			assert (tuple.size() == 1);
			if (this.split) {
				this.splitTuple.streamId = streamId;
				this.splitTuple.value = tuple.get(0);

				taskIds = doEmit((OUT) this.splitTuple);
			} else {
				taskIds = doEmit((OUT) tuple.get(0));
			}
		}
		this.tupleEmitted = true;

		return taskIds;
	}
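  • AbstractStormCollector.tansformAndEmit mainly handles the split scenario, i.e. a spout that declares multiple streams. In every branch the data is ultimately emitted through the subclass method SpoutCollector.doEmit.
  • If split is true, doEmit receives splitTuple, a SplitStreamType that records the streamId together with its value.
  • If split is false, doEmit receives the Flink Tuple itself (or the raw single value when the stream's attribute count is negative). This corresponds to the value field of SplitStreamType, without the streamId information.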
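
The difference between the two cases comes down to whether the payload is tagged with its stream id before it reaches doEmit. A small sketch of just that decision, assuming a hypothetical Tagged class in place of SplitStreamType and a copied List in place of Flink's TupleN types:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of the split branch in tansformAndEmit; not Flink code. */
class SplitEmitSketch {

    /** Stand-in for SplitStreamType: a value tagged with the stream it belongs to. */
    static final class Tagged {
        final String streamId;
        final Object value;

        Tagged(String streamId, Object value) {
            this.streamId = streamId;
            this.value = value;
        }

        @Override
        public String toString() {
            return streamId + ":" + value;
        }
    }

    /**
     * Builds the object that would be handed to doEmit. A copied List stands in for Flink's
     * TupleN types, and a fresh Tagged is created per call, whereas the source reuses one
     * splitTuple instance.
     */
    static Object toOutput(boolean split, String streamId, List<Object> tuple) {
        List<Object> payload = new ArrayList<>(tuple);
        return split ? new Tagged(streamId, payload) : payload;
    }

    public static void main(String[] args) {
        List<Object> tuple = Arrays.asList((Object) "word", 1);
        System.out.println(toOutput(false, "counts", tuple));
        System.out.println(toOutput(true, "counts", tuple));
    }
}
```

Downstream code that consumes a split stream therefore has to unwrap the value, while a single-stream consumer sees the tuple directly.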

Task.run

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
	//......

	/**
	 * The core work method that bootstraps the task and executes its code.
	 */
	@Override
	public void run() {
			//......
			// now load and instantiate the task's invokable code
			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

			// ----------------------------------------------------------------
			//  actual task core work
			// ----------------------------------------------------------------

			// we must make strictly sure that the invokable is accessible to the cancel() call
			// by the time we switched to running.
			this.invokable = invokable;

			// switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
			if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
				throw new CancelTaskException();
			}

			// notify everyone that we switched to running
			notifyObservers(ExecutionState.RUNNING, null);
			taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

			// make sure the user code classloader is accessible thread-locally
			executingThread.setContextClassLoader(userCodeClassLoader);

			// run the invokable
			invokable.invoke();

			//......
	}
}
  • Task's run method calls invokable.invoke(); the invokable here is a StreamTask.

StreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
		extends AbstractInvokable
		implements AsyncExceptionHandler {

		//......

	@Override
	public final void invoke() throws Exception {

		boolean disposed = false;
		try {
			//......

			// let the task do its work
			isRunning = true;
			run();

			// if this left the run() method cleanly despite the fact that this was canceled,
			// make sure the "clean shutdown" is not attempted
			if (canceled) {
				throw new CancelTaskException();
			}

			LOG.debug("Finished task {}", getName());

			//......
		}
		finally {
			// clean up everything we initialized
			isRunning = false;

			//......
		}
	}
}
  • StreamTask's invoke method calls the subclass's run method; the subclass here is StoppableSourceStreamTask.

StoppableSourceStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StoppableSourceStreamTask.java

/**
 * Stoppable task for executing stoppable streaming sources.
 *
 * @param <OUT> Type of the produced elements
 * @param <SRC> Stoppable source function
 */
public class StoppableSourceStreamTask<OUT, SRC extends SourceFunction<OUT> & StoppableFunction>
	extends SourceStreamTask<OUT, SRC, StoppableStreamSource<OUT, SRC>> implements StoppableTask {

	private volatile boolean stopped;

	public StoppableSourceStreamTask(Environment environment) {
		super(environment);
	}

	@Override
	protected void run() throws Exception {
		if (!stopped) {
			super.run();
		}
	}

	@Override
	public void stop() {
		stopped = true;
		if (this.headOperator != null) {
			this.headOperator.stop();
		}
	}
}
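  • StoppableSourceStreamTask extends SourceStreamTask and mainly implements the stop method of StoppableTask. Its run method delegates to the run method of its direct parent, SourceStreamTask, unless stop has already been called.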
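
The effect of the stopped flag is easiest to see when stop arrives before run. A minimal sketch, with hypothetical SourceTask and StoppableSourceTask classes standing in for SourceStreamTask and StoppableSourceStreamTask, and a counter in place of the actual head operator call:

```java
/** Hypothetical, simplified model of how the stopped flag gates run(); not the Flink classes. */
class StoppableRunSketch {

    /** Plays the role of SourceStreamTask: run() would drive the head operator. */
    static class SourceTask {
        int headOperatorRuns = 0;

        protected void run() {
            headOperatorRuns++;
        }
    }

    /** Plays the role of StoppableSourceStreamTask. */
    static class StoppableSourceTask extends SourceTask {
        private volatile boolean stopped;

        @Override
        protected void run() {
            // Skip the parent's work entirely if stop() was called first.
            if (!stopped) {
                super.run();
            }
        }

        public void stop() {
            stopped = true;
        }
    }

    public static void main(String[] args) {
        StoppableSourceTask stoppedEarly = new StoppableSourceTask();
        stoppedEarly.stop();
        stoppedEarly.run();

        StoppableSourceTask normal = new StoppableSourceTask();
        normal.run();

        System.out.println(stoppedEarly.headOperatorRuns + " " + normal.headOperatorRuns);
    }
}
```

In the real class stop also forwards to the head operator, which is how a source that is already running gets its isRunning flag cleared.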

SourceStreamTask

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

/**
 * {@link StreamTask} for executing a {@link StreamSource}.
 *
 * <p>One important aspect of this is that the checkpointing and the emission of elements must never
 * occur at the same time. The execution must be serial. This is achieved by having the contract
 * with the StreamFunction that it must only modify its state or emit elements in
 * a synchronized block that locks on the lock Object. Also, the modification of the state
 * and the emission of elements must happen in the same block of code that is protected by the
 * synchronized block.
 *
 * @param <OUT> Type of the output elements of this source.
 * @param <SRC> Type of the source function for the stream source operator
 * @param <OP> Type of the stream source operator
 */
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
	extends StreamTask<OUT, OP> {

	//......

	@Override
	protected void run() throws Exception {
		headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
	}
}
  • SourceStreamTask mainly calls the run method of StreamSource.

StreamSource

flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java

/**
 * {@link StreamOperator} for streaming sources.
 *
 * @param <OUT> Type of the output elements
 * @param <SRC> Type of the source function of this stream source operator
 */
@Internal
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
		extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

	//......	

	public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
		run(lockingObject, streamStatusMaintainer, output);
	}

	public void run(final Object lockingObject,
			final StreamStatusMaintainer streamStatusMaintainer,
			final Output<StreamRecord<OUT>> collector) throws Exception {

		final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

		final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
		final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
			? getExecutionConfig().getLatencyTrackingInterval()
			: configuration.getLong(MetricOptions.LATENCY_INTERVAL);

		LatencyMarksEmitter<OUT> latencyEmitter = null;
		if (latencyTrackingInterval > 0) {
			latencyEmitter = new LatencyMarksEmitter<>(
				getProcessingTimeService(),
				collector,
				latencyTrackingInterval,
				this.getOperatorID(),
				getRuntimeContext().getIndexOfThisSubtask());
		}

		final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

		this.ctx = StreamSourceContexts.getSourceContext(
			timeCharacteristic,
			getProcessingTimeService(),
			lockingObject,
			streamStatusMaintainer,
			collector,
			watermarkInterval,
			-1);

		try {
			userFunction.run(ctx);

			// if we get here, then the user function either exited after being done (finite source)
			// or the function was canceled or stopped. For the finite source case, we should emit
			// a final watermark that indicates that we reached the end of event-time
			if (!isCanceledOrStopped()) {
				ctx.emitWatermark(Watermark.MAX_WATERMARK);
			}
		} finally {
			// make sure that the context is closed in any case
			ctx.close();
			if (latencyEmitter != null) {
				latencyEmitter.close();
			}
		}
	}
  • It calls userFunction.run(ctx); the userFunction here is the SpoutWrapper, which is what triggers the spout's nextTuple.

Summary

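  • Flink uses SpoutWrapper to wrap an original Storm spout. In its run method it creates Flink's SpoutCollector and passes it as the constructor argument to Storm's SpoutOutputCollector, then calls the spout's open method, handing the spout the SpoutOutputCollector that wraps the Flink SpoutCollector so that it collects the data the spout emits. After that it calls spout.nextTuple() according to numberOfInvocations, which controls how many times nextTuple is called. The value can be set through the constructor when creating the SpoutWrapper; with the constructor that has no numberOfInvocations parameter it is null, meaning an infinite loop.
  • SpoutCollector's emit method calls AbstractStormCollector.tansformAndEmit, which in turn calls SpoutCollector.doEmit to emit. When the spout declares multiple streams, a SplitStreamType wrapping the tuple is passed to doEmit; with a single stream, only the plain tuple is passed.
  • Flink's Task.run calls StreamTask.invoke, which calls the run method of the subclass (here StoppableSourceStreamTask). Unless the task was already stopped, that run method delegates to its direct parent SourceStreamTask, which mainly calls StreamSource.run. StreamSource.run calls userFunction.run(ctx), where userFunction is the SpoutWrapper, so the spout's nextTuple logic runs and its output is emitted through Flink's SpoutCollector.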
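
The delegation chain in the last point can be written down as a toy trace. This is only an illustrative sketch: SourceFn, Operator and SourceTask are hypothetical classes whose names mirror the roles of SourceFunction, StreamSource and StreamTask/SourceStreamTask, and the strings added to the trace are labels, not real invocations of Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the delegation chain; the class names only mirror the roles. */
class CallChainSketch {

    /** Stand-in for SourceFunction; SpoutWrapper plays this role. */
    interface SourceFn {
        void run(List<String> trace);
    }

    /** Plays the role of StreamSource. */
    static final class Operator {
        private final SourceFn userFunction;

        Operator(SourceFn userFunction) {
            this.userFunction = userFunction;
        }

        void run(List<String> trace) {
            trace.add("StreamSource.run");
            userFunction.run(trace);
        }
    }

    /** Plays the role of StreamTask plus SourceStreamTask. */
    static final class SourceTask {
        private final Operator headOperator;

        SourceTask(Operator headOperator) {
            this.headOperator = headOperator;
        }

        void invoke(List<String> trace) {
            trace.add("StreamTask.invoke");
            run(trace);
        }

        void run(List<String> trace) {
            trace.add("SourceStreamTask.run");
            headOperator.run(trace);
        }
    }

    /** Runs the chain once and returns the order in which the steps were reached. */
    static List<String> execute() {
        List<String> trace = new ArrayList<>();
        SourceFn spoutWrapper = t -> {
            t.add("SpoutWrapper.run");
            t.add("spout.nextTuple");
        };
        trace.add("Task.run");
        new SourceTask(new Operator(spoutWrapper)).invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", execute()));
    }
}
```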
