This article takes a look at Flink's BoltWrapper.
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltWrapper.java
/**
 * A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
 * program. It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the
 * bolt can process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type
 * {@code OUT} (see {@link AbstractStormCollector} for supported types).<br/>
 * <br/>
 * <strong>Works for single input streams only! See {@link MergedInputsBoltWrapper} for multi-input stream
 * Bolts.</strong>
 */
public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {

    @Override
    public void open() throws Exception {
        super.open();

        this.flinkCollector = new TimestampedCollector<>(this.output);

        GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
        StormConfig stormConfig = new StormConfig();

        if (config != null) {
            if (config instanceof StormConfig) {
                stormConfig = (StormConfig) config;
            } else {
                stormConfig.putAll(config.toMap());
            }
        }

        this.topologyContext = WrapperSetupHelper.createTopologyContext(
                getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);

        final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
                this.numberOfAttributes, this.topologyContext.getThisTaskId(), this.flinkCollector));

        if (this.stormTopology != null) {
            Map<GlobalStreamId, Grouping> inputs = this.topologyContext.getThisSources();

            for (GlobalStreamId inputStream : inputs.keySet()) {
                for (Integer tid : this.topologyContext.getComponentTasks(inputStream.get_componentId())) {
                    this.inputComponentIds.put(tid, inputStream.get_componentId());
                    this.inputStreamIds.put(tid, inputStream.get_streamId());
                    this.inputSchemas.put(tid, this.topologyContext.getComponentOutputFields(inputStream));
                }
            }
        }

        this.bolt.prepare(stormConfig, this.topologyContext, stormCollector);
    }

    @Override
    public void dispose() throws Exception {
        super.dispose();
        this.bolt.cleanup();
    }

    @Override
    public void processElement(final StreamRecord<IN> element) throws Exception {
        this.flinkCollector.setTimestamp(element);

        IN value = element.getValue();

        if (this.stormTopology != null) {
            Tuple tuple = (Tuple) value;
            Integer producerTaskId = tuple.getField(tuple.getArity() - 1);

            this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(producerTaskId),
                    producerTaskId, this.inputStreamIds.get(producerTaskId),
                    this.inputComponentIds.get(producerTaskId), MessageId.makeUnanchored()));
        } else {
            this.bolt.execute(new StormTuple<>(value, this.inputSchemas.get(null), -1, null, null,
                    MessageId.makeUnanchored()));
        }
    }
}
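Before diving into the collector internals, it may help to see BoltWrapper in action. Below is a minimal sketch of embedded usage; the SentenceSplitBolt class and all names in it are hypothetical, not from the Flink sources. Since BoltWrapper implements OneInputStreamOperator, an existing Storm bolt can be plugged into a Flink pipeline via DataStream.transform(...); a bolt declaring one output field yields Flink tuples of type Tuple1, per the BoltCollector javadoc quoted next.

import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class BoltWrapperDemo {

    // hypothetical Storm bolt: splits each incoming sentence into words
    public static class SentenceSplitBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            for (String word : input.getString(0).split(" ")) {
                collector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word")); // one attribute -> Flink Tuple1
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> sentences = env.fromElements("hello world", "hello flink");

        // transform() accepts any OneInputStreamOperator, so the wrapped bolt
        // plugs into the pipeline like a regular Flink operator
        DataStream<Tuple1<String>> words = sentences.transform(
                "SentenceSplitBolt",
                TypeInformation.of(new TypeHint<Tuple1<String>>() {}),
                new BoltWrapper<String, Tuple1<String>>(new SentenceSplitBolt()));

        words.print();
        env.execute("storm bolt embedded in flink");
    }
}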
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/BoltCollector.java
/**
 * A {@link BoltCollector} is used by {@link BoltWrapper} to provide a Storm-compatible
 * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples
 * and emits them via the provided {@link Output} object.
 */
class BoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {

    /** The Flink output Collector. */
    private final Collector<OUT> flinkOutput;

    /**
     * Instantiates a new {@link BoltCollector} that emits Flink tuples to the given Flink output object. If the
     * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
     * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
     *
     * @param numberOfAttributes
     *            The number of attributes of the emitted tuples per output stream.
     * @param taskId
     *            The ID of the producer task (negative value for unknown).
     * @param flinkOutput
     *            The Flink output object to be used.
     * @throws UnsupportedOperationException
     *             if the specified number of attributes is greater than 25
     */
    BoltCollector(final HashMap<String, Integer> numberOfAttributes, final int taskId,
            final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
        super(numberOfAttributes, taskId);
        assert (flinkOutput != null);
        this.flinkOutput = flinkOutput;
    }

    @Override
    protected List<Integer> doEmit(final OUT flinkTuple) {
        this.flinkOutput.collect(flinkTuple);
        // TODO
        return null;
    }

    @Override
    public void reportError(final Throwable error) {
        // not sure, if Flink can support this
    }

    @Override
    public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
        return this.tansformAndEmit(streamId, tuple);
    }

    @Override
    public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors,
            final List<Object> tuple) {
        throw new UnsupportedOperationException("Direct emit is not supported by Flink");
    }

    @Override
    public void ack(final Tuple input) {}

    @Override
    public void fail(final Tuple input) {}

    @Override
    public void resetTimeout(Tuple var1) {}
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/TimestampedCollector.java
/**
 * Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
 * Before giving the {@link TimestampedCollector} to a user function you must set
 * the timestamp that should be attached to emitted elements. Most operators
 * would set the timestamp of the incoming
 * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
 *
 * @param <T> The type of the elements that can be emitted.
 */
@Internal
public class TimestampedCollector<T> implements Collector<T> {

    private final Output<StreamRecord<T>> output;

    private final StreamRecord<T> reuse;

    /**
     * Creates a new {@link TimestampedCollector} that wraps the given {@link Output}.
     */
    public TimestampedCollector(Output<StreamRecord<T>> output) {
        this.output = output;
        this.reuse = new StreamRecord<T>(null);
    }

    @Override
    public void collect(T record) {
        output.collect(reuse.replace(record));
    }

    public void setTimestamp(StreamRecord<?> timestampBase) {
        if (timestampBase.hasTimestamp()) {
            reuse.setTimestamp(timestampBase.getTimestamp());
        } else {
            reuse.eraseTimestamp();
        }
    }

    public void setAbsoluteTimestamp(long timestamp) {
        reuse.setTimestamp(timestamp);
    }

    public void eraseTimestamp() {
        reuse.eraseTimestamp();
    }

    @Override
    public void close() {
        output.close();
    }
}
flink-storm_2.11-1.6.2-sources.jar!/org/apache/flink/storm/wrappers/AbstractStormCollector.java
/**
 * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via
 * {@link #doEmit(Object)} to the specified output stream.
 *
 * @param streamId
 *            The output stream id.
 * @param tuple
 *            The Storm tuple to be emitted.
 * @return the return value of {@link #doEmit(Object)}
 */
@SuppressWarnings("unchecked")
protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
    List<Integer> taskIds;

    int numAtt = this.numberOfAttributes.get(streamId);
    int taskIdIdx = numAtt;
    if (this.taskId >= 0 && numAtt < 0) {
        numAtt = 1;
        taskIdIdx = 0;
    }
    if (numAtt >= 0) {
        assert (tuple.size() == numAtt);
        Tuple out = this.outputTuple.get(streamId);
        for (int i = 0; i < numAtt; ++i) {
            out.setField(tuple.get(i), i);
        }
        if (this.taskId >= 0) {
            out.setField(this.taskId, taskIdIdx);
        }
        if (this.split) {
            this.splitTuple.streamId = streamId;
            this.splitTuple.value = out;

            taskIds = doEmit((OUT) this.splitTuple);
        } else {
            taskIds = doEmit((OUT) out);
        }
    } else {
        assert (tuple.size() == 1);
        if (this.split) {
            this.splitTuple.streamId = streamId;
            this.splitTuple.value = tuple.get(0);

            taskIds = doEmit((OUT) this.splitTuple);
        } else {
            taskIds = doEmit((OUT) tuple.get(0));
        }
    }
    this.tupleEmitted = true;

    return taskIds;
}
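The index arithmetic above can be replayed in isolation. The following standalone sketch (variable names are mine, for illustration only) mirrors what tansformAndEmit does for a stream with two declared attributes and a known producer task id: the attributes fill positions 0..numAtt-1 and the task id is appended at position taskIdIdx == numAtt, so the reused output tuple has arity numAtt + 1.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple3;

public class TransformSketch {
    public static void main(String[] args) {
        // a Storm bolt emitted two attributes on some stream
        List<Object> stormTuple = Arrays.asList("word", 42);
        int taskId = 7;                   // producer task id (>= 0 when the topology is known)
        int numAtt = stormTuple.size();   // 2 declared attributes

        // 2 attributes plus one extra field for the task id -> Tuple3,
        // mirroring the reused output tuple in tansformAndEmit
        Tuple3<Object, Object, Object> out = new Tuple3<>();
        for (int i = 0; i < numAtt; i++) {
            out.setField(stormTuple.get(i), i);
        }
        out.setField(taskId, numAtt); // taskIdIdx == numAtt

        System.out.println(out); // prints (word,42,7)
    }
}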
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {

    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {

        //......

        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        notifyObservers(ExecutionState.RUNNING, null);
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable
        invokable.invoke();

        //......
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {

    //......

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;

            //......
        }
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@Override
protected void run() throws Exception {
    // cache processor reference on the stack, to make the code more JIT friendly
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

    while (running && inputProcessor.processInput()) {
        // all the work happens in the "processInput" method
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
public boolean processInput() throws Exception {
    if (isFinished) {
        return false;
    }
    if (numRecordsIn == null) {
        try {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            numRecordsIn = new SimpleCounter();
        }
    }

    while (true) {
        if (currentRecordDeserializer != null) {
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                StreamElement recordOrMark = deserializationDelegate.getInstance();

                if (recordOrMark.isWatermark()) {
                    // handle watermark
                    statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                    continue;
                } else if (recordOrMark.isStreamStatus()) {
                    // handle stream status
                    statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                    continue;
                } else if (recordOrMark.isLatencyMarker()) {
                    // handle latency marker
                    synchronized (lock) {
                        streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                    }
                    continue;
                } else {
                    // now we can do the actual processing
                    StreamRecord<IN> record = recordOrMark.asRecord();
                    synchronized (lock) {
                        numRecordsIn.inc();
                        streamOperator.setKeyContextElement1(record);
                        streamOperator.processElement(record);
                    }
                    return true;
                }
            }
        }

        //......
    }
}
To summarize:

- BoltWrapper wraps an IRichBolt so that a Storm bolt can run inside a Flink streaming program as a OneInputStreamOperator. Its open() method prepares the bolt with a Storm OutputCollector that is backed by a BoltCollector, which in turn writes to Flink's TimestampedCollector.
- BoltCollector's emit method delegates to AbstractStormCollector.tansformAndEmit, which converts the emitted Storm values into a Flink tuple and finally calls BoltCollector.doEmit to emit. For the multi-stream scenario, it wraps the tuple into a SplitStreamType before handing it to doEmit; if there is only one stream, the plain tuple is passed to doEmit.
- At runtime, Task's run method calls invokable.invoke(); the invokable here is a StreamTask, and StreamTask.invoke() calls the run method of its subclass, here OneInputStreamTask. OneInputStreamTask's run method keeps looping on inputProcessor.processInput(); the inputProcessor here is StreamInputProcessor, whose processInput() calls currentRecordDeserializer.getNextRecord(deserializationDelegate) to fetch the next record and then, depending on the element type, calls streamOperator.processElement(record). The streamOperator here is BoltWrapper, and BoltWrapper's processElement is exactly what calls the Storm bolt's execute method to run the bolt logic, emitting via Flink's BoltCollector.
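Putting it all together, the end-to-end call chain, as reconstructed from the sources quoted above, is:

Task.run()
  -> StreamTask.invoke()
     -> OneInputStreamTask.run()
        -> StreamInputProcessor.processInput()
           -> BoltWrapper.processElement(record)
              -> IRichBolt.execute(stormTuple)
                 -> BoltCollector.emit(...) -> AbstractStormCollector.tansformAndEmit(...)
                    -> BoltCollector.doEmit(flinkTuple)
                       -> TimestampedCollector.collect(record)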