This article takes a look at Flink's SourceFunction.
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStreamSource<String> dataStreamSource = env.addSource(new RandomWordSource());

dataStreamSource.map(new UpperCaseMapFunc()).print();

env.execute("sourceFunctionDemo");
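The demo maps each word through an UpperCaseMapFunc, which the original listing does not show; a minimal sketch of what it presumably looks like (the class name comes from the demo, the body is an assumption):

import org.apache.flink.api.common.functions.MapFunction;

public class UpperCaseMapFunc implements MapFunction<String, String> {
    @Override
    public String map(String value) throws Exception {
        // upper-case every word emitted by the source
        return value.toUpperCase();
    }
}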
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
 * Base interface for all stream data sources in Flink. The contract of a stream source
 * is the following: When the source should start emitting elements, the {@link #run} method
 * is called with a {@link SourceContext} that can be used for emitting elements.
 * The run method can run for as long as necessary. The source must, however, react to an
 * invocation of {@link #cancel()} by breaking out of its main loop.
 *
 * <h3>CheckpointedFunction Sources</h3>
 *
 * <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
 * interface must ensure that state checkpointing, updating of internal state and emission of
 * elements are not done concurrently. This is achieved by using the provided checkpointing lock
 * object to protect update of state and emission of elements in a synchronized block.
 *
 * <p>This is the basic pattern one should follow when implementing a checkpointed source:
 *
 * <pre>{@code
 *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
 *      private long count = 0L;
 *      private volatile boolean isRunning = true;
 *
 *      private transient ListState<Long> checkpointedCount;
 *
 *      public void run(SourceContext<T> ctx) {
 *          while (isRunning && count < 1000) {
 *              // this synchronized block ensures that state checkpointing,
 *              // internal state updates and emission of elements are an atomic operation
 *              synchronized (ctx.getCheckpointLock()) {
 *                  ctx.collect(count);
 *                  count++;
 *              }
 *          }
 *      }
 *
 *      public void cancel() {
 *          isRunning = false;
 *      }
 *
 *      public void initializeState(FunctionInitializationContext context) {
 *          this.checkpointedCount = context
 *              .getOperatorStateStore()
 *              .getListState(new ListStateDescriptor<>("count", Long.class));
 *
 *          if (context.isRestored()) {
 *              for (Long count : this.checkpointedCount.get()) {
 *                  this.count = count;
 *              }
 *          }
 *      }
 *
 *      public void snapshotState(FunctionSnapshotContext context) {
 *          this.checkpointedCount.clear();
 *          this.checkpointedCount.add(count);
 *      }
 *  }
 * }</pre>
 *
 * <h3>Timestamps and watermarks:</h3>
 * Sources may assign timestamps to elements and may manually emit watermarks.
 * However, these are only interpreted if the streaming program runs on
 * {@link TimeCharacteristic#EventTime}. On other time characteristics
 * ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
 * the watermarks from the source function are ignored.
 *
 * <h3>Gracefully Stopping Functions</h3>
 * Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
 * interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
 * state and the emitted elements in a consistent state.
 *
 * <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
 * {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
 * of state updates and element emission.
 *
 * @param <T> The type of the elements produced by this source.
 *
 * @see org.apache.flink.api.common.functions.StoppableFunction
 * @see org.apache.flink.streaming.api.TimeCharacteristic
 */
@Public
public interface SourceFunction<T> extends Function, Serializable {

    /**
     * Starts the source. Implementations can use the {@link SourceContext} to emit
     * elements.
     *
     * <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.CheckpointedFunction}
     * must lock on the checkpoint lock (using a synchronized block) before updating internal
     * state and emitting elements, to make both an atomic operation:
     *
     * <pre>{@code
     *  public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
     *      private long count = 0L;
     *      private volatile boolean isRunning = true;
     *
     *      private transient ListState<Long> checkpointedCount;
     *
     *      public void run(SourceContext<T> ctx) {
     *          while (isRunning && count < 1000) {
     *              // this synchronized block ensures that state checkpointing,
     *              // internal state updates and emission of elements are an atomic operation
     *              synchronized (ctx.getCheckpointLock()) {
     *                  ctx.collect(count);
     *                  count++;
     *              }
     *          }
     *      }
     *
     *      public void cancel() {
     *          isRunning = false;
     *      }
     *
     *      public void initializeState(FunctionInitializationContext context) {
     *          this.checkpointedCount = context
     *              .getOperatorStateStore()
     *              .getListState(new ListStateDescriptor<>("count", Long.class));
     *
     *          if (context.isRestored()) {
     *              for (Long count : this.checkpointedCount.get()) {
     *                  this.count = count;
     *              }
     *          }
     *      }
     *
     *      public void snapshotState(FunctionSnapshotContext context) {
     *          this.checkpointedCount.clear();
     *          this.checkpointedCount.add(count);
     *      }
     *  }
     * }</pre>
     *
     * @param ctx The context to emit elements to and for accessing locks.
     */
    void run(SourceContext<T> ctx) throws Exception;

    /**
     * Cancels the source. Most sources will have a while loop inside the
     * {@link #run(SourceContext)} method. The implementation needs to ensure that the
     * source will break out of that loop after this method is called.
     *
     * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
     * {@code false} in this method. That flag is checked in the loop condition.
     *
     * <p>When a source is canceled, the executing thread will also be interrupted
     * (via {@link Thread#interrupt()}). The interruption happens strictly after this
     * method has been called, so any interruption handler can rely on the fact that
     * this method has completed. It is good practice to make any flags altered by
     * this method "volatile", in order to guarantee the visibility of the effects of
     * this method to any interruption handler.
     */
    void cancel();

    // ------------------------------------------------------------------------
    //  source context
    // ------------------------------------------------------------------------

    /**
     * Interface that source functions use to emit elements, and possibly watermarks.
     *
     * @param <T> The type of the elements produced by the source.
     */
    @Public // Interface might be extended in the future with additional methods.
    interface SourceContext<T> {
        //......
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/functions/source/SourceFunction.java
/**
 * Interface that source functions use to emit elements, and possibly watermarks.
 *
 * @param <T> The type of the elements produced by the source.
 */
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {

    /**
     * Emits one element from the source, without attaching a timestamp. In most cases,
     * this is the default way of emitting elements.
     *
     * <p>The timestamp that the element will get assigned depends on the time characteristic of
     * the streaming program:
     * <ul>
     *     <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
     *     <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
     *         current time as the timestamp.</li>
     *     <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
     *         It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
     *         operation (like time windows).</li>
     * </ul>
     *
     * @param element The element to emit
     */
    void collect(T element);

    /**
     * Emits one element from the source, and attaches the given timestamp. This method
     * is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
     * sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
     * on the stream.
     *
     * <p>On certain time characteristics, this timestamp may be ignored or overwritten.
     * This allows programs to switch between the different time characteristics and behaviors
     * without changing the code of the source functions.
     * <ul>
     *     <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
     *         because processing time never works with element timestamps.</li>
     *     <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
     *         system's current time, to realize proper ingestion time semantics.</li>
     *     <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
     * </ul>
     *
     * @param element The element to emit
     * @param timestamp The timestamp in milliseconds since the Epoch
     */
    @PublicEvolving
    void collectWithTimestamp(T element, long timestamp);

    /**
     * Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
     * elements with a timestamp {@code t' <= t} will occur any more. If further such
     * elements will be emitted, those elements are considered <i>late</i>.
     *
     * <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
     * On {@link TimeCharacteristic#ProcessingTime}, Watermarks will be ignored. On
     * {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
     * automatic ingestion time watermarks.
     *
     * @param mark The Watermark to emit
     */
    @PublicEvolving
    void emitWatermark(Watermark mark);

    /**
     * Marks the source to be temporarily idle. This tells the system that this source will
     * temporarily stop emitting records and watermarks for an indefinite amount of time. This
     * is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
     * {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
     * watermarks without the need to wait for watermarks from this source while it is idle.
     *
     * <p>Source functions should make a best effort to call this method as soon as they
     * acknowledge themselves to be idle. The system will consider the source to resume activity
     * again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
     * or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
     */
    @PublicEvolving
    void markAsTemporarilyIdle();

    /**
     * Returns the checkpoint lock. Please refer to the class-level comment in
     * {@link SourceFunction} for details about how to write a consistent checkpointed
     * source.
     *
     * @return The object to use as the lock
     */
    Object getCheckpointLock();

    /**
     * This method is called by the system to shut down the context.
     */
    void close();
}
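The markAsTemporarilyIdle javadoc above is worth making concrete. Below is a minimal sketch, not from the original article and with all names hypothetical, of a polling source that declares itself idle while its external system has nothing to offer, so downstream watermarks can still advance:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class PollingSource implements SourceFunction<String> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            String next = pollExternalSystem();
            if (next != null) {
                // any collect()/collectWithTimestamp()/emitWatermark() call
                // marks the source as active again
                ctx.collect(next);
            } else {
                // tell downstream tasks not to wait for watermarks from us
                ctx.markAsTemporarilyIdle();
                Thread.sleep(1000L);
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    // hypothetical helper standing in for a real external lookup
    private String pollExternalSystem() {
        return Math.random() < 0.5 ? "payload" : null;
    }
}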
SourceContext defines the interface through which a source emits data. The collect method emits an element without attaching a timestamp (if the data itself carries no time, then under TimeCharacteristic.EventTime a TimestampAssigner can be used to assign the timestamp before any time-dependent operation; under TimeCharacteristic.IngestionTime nothing needs to be assigned, because the system generates the timestamp automatically); besides collect, there is also collectWithTimestamp, which emits an element together with a given timestamp (meant to be used with TimeCharacteristic.EventTime).
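To make the collectWithTimestamp/emitWatermark combination concrete, here is a minimal sketch (an assumption for illustration, not code from the original article) of a source that assigns timestamps itself for TimeCharacteristic.EventTime:

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

public class TimestampedCountSource implements SourceFunction<Long> {

    private volatile boolean isRunning = true;
    private long count = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning && count < 100) {
            long eventTime = System.currentTimeMillis();
            // attach the timestamp at the source instead of using a TimestampAssigner
            ctx.collectWithTimestamp(count++, eventTime);
            // declare that no element with a timestamp <= eventTime will follow
            ctx.emitWatermark(new Watermark(eventTime));
            Thread.sleep(100L);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}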
Upstream
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {
        //......

        // now load and instantiate the task's invokable code
        invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

        // ----------------------------------------------------------------
        //  actual task core work
        // ----------------------------------------------------------------

        // we must make strictly sure that the invokable is accessible to the cancel() call
        // by the time we switched to running.
        this.invokable = invokable;

        // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
        if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            throw new CancelTaskException();
        }

        // notify everyone that we switched to running
        notifyObservers(ExecutionState.RUNNING, null);
        taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

        // make sure the user code classloader is accessible thread-locally
        executingThread.setContextClassLoader(userCodeClassLoader);

        // run the invokable
        invokable.invoke();

        //......
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
/**
 * Base class for all streaming tasks. A task is the unit of local processing that is deployed
 * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form
 * the Task's operator chain. Operators that are chained together execute synchronously in the
 * same thread and hence on the same stream partition. A common case for these chains
 * are successive map/flatmap/filter tasks.
 *
 * <p>The task chain contains one "head" operator and multiple chained operators.
 * The StreamTask is specialized for the type of the head operator: one-input and two-input tasks,
 * as well as for sources, iteration heads and iteration tails.
 *
 * <p>The Task class deals with the setup of the streams read by the head operator, and the streams
 * produced by the operators at the ends of the operator chain. Note that the chain may fork and
 * thus have multiple ends.
 *
 * <p>The life cycle of the task is set up as follows:
 * <pre>{@code
 *  -- setInitialState -> provides state of all operators in the chain
 *
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators()
 *        +----> run()
 *        +----> close-operators()
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }</pre>
 *
 * <p>The {@code StreamTask} has a lock object called {@code lock}. All calls to methods on a
 * {@code StreamOperator} must be synchronized on this lock object to ensure that no methods
 * are called concurrently.
 *
 * @param <OUT>
 * @param <OP>
 */
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {
    //......

    @Override
    public final void invoke() throws Exception {
        boolean disposed = false;
        try {
            //......

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            //......
        }
        finally {
            // clean up everything we initialized
            isRunning = false;
            //......
        }
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@Override
protected void run() throws Exception {
    headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
    run(lockingObject, streamStatusMaintainer, output);
}

public void run(final Object lockingObject,
        final StreamStatusMaintainer streamStatusMaintainer,
        final Output<StreamRecord<OUT>> collector) throws Exception {

    final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

    final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
    final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
        ? getExecutionConfig().getLatencyTrackingInterval()
        : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

    LatencyMarksEmitter<OUT> latencyEmitter = null;
    if (latencyTrackingInterval > 0) {
        latencyEmitter = new LatencyMarksEmitter<>(
            getProcessingTimeService(),
            collector,
            latencyTrackingInterval,
            this.getOperatorID(),
            getRuntimeContext().getIndexOfThisSubtask());
    }

    final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

    this.ctx = StreamSourceContexts.getSourceContext(
        timeCharacteristic,
        getProcessingTimeService(),
        lockingObject,
        streamStatusMaintainer,
        collector,
        watermarkInterval,
        -1);

    try {
        userFunction.run(ctx);

        // if we get here, then the user function either exited after being done (finite source)
        // or the function was canceled or stopped. For the finite source case, we should emit
        // a final watermark that indicates that we reached the end of event-time
        if (!isCanceledOrStopped()) {
            ctx.emitWatermark(Watermark.MAX_WATERMARK);
        }
    } finally {
        // make sure that the context is closed in any case
        ctx.close();
        if (latencyEmitter != null) {
            latencyEmitter.close();
        }
    }
}
Note that before userFunction.run(ctx) is invoked, a LatencyMarksEmitter is additionally created when latencyTrackingInterval is greater than 0. The userFunction here is the demo's RandomWordSource:

public class RandomWordSource implements SourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RandomWordSource.class);

    private volatile boolean isRunning = true;

    private static final String[] words = new String[]{"The", "brown", "fox", "quick", "jump", "sucky", "5dolla"};

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            Thread.sleep(300);
            int rnd = (int) (Math.random() * 10 % words.length);
            LOGGER.info("emit word: {}", words[rnd]);
            ctx.collect(words[rnd]);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
private static class LatencyMarksEmitter<OUT> {
    private final ScheduledFuture<?> latencyMarkTimer;

    public LatencyMarksEmitter(
            final ProcessingTimeService processingTimeService,
            final Output<StreamRecord<OUT>> output,
            long latencyTrackingInterval,
            final OperatorID operatorId,
            final int subtaskIndex) {

        latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
            new ProcessingTimeCallback() {
                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                    try {
                        // ProcessingTimeService callbacks are executed under the checkpointing lock
                        output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex));
                    } catch (Throwable t) {
                        // we catch the Throwables here so that we don't trigger the processing
                        // timer services async exception handler
                        LOG.warn("Error while emitting latency marker.", t);
                    }
                }
            },
            0L,
            latencyTrackingInterval);
    }

    public void close() {
        latencyMarkTimer.cancel(true);
    }
}
The LatencyMarksEmitter is only created when latencyTrackingInterval > 0. The latencyTrackingInterval here is resolved by first calling getExecutionConfig().isLatencyTrackingConfigured() to check whether the value is set on the ExecutionConfig: if it is, the value returned by getExecutionConfig().getLatencyTrackingInterval() is used; otherwise the value of configuration.getLong(MetricOptions.LATENCY_INTERVAL) is used, which defaults to 2000L (the latter applies in this demo, so the interval is 2000).
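For reference, a sketch of the two knobs involved: a per-job setting on ExecutionConfig, and the metrics.latency.interval key that MetricOptions.LATENCY_INTERVAL reads (API names as of Flink 1.6; verify against your version):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyConfigDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // job-level override: with this set, isLatencyTrackingConfigured() is true
        // and getLatencyTrackingInterval() wins over the TaskManager configuration
        env.getConfig().setLatencyTrackingInterval(1000L);

        // cluster-level default lives in flink-conf.yaml and is read through
        // MetricOptions.LATENCY_INTERVAL (falls back to 2000 ms when unset):
        //   metrics.latency.interval: 2000
    }
}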
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@Override
public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
    long nextTimestamp = getCurrentProcessingTime() + initialDelay;

    // we directly try to register the timer and only react to the status on exception
    // that way we save unnecessary volatile accesses for each timer
    try {
        return timerService.scheduleAtFixedRate(
            new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
            initialDelay,
            period,
            TimeUnit.MILLISECONDS);
    } catch (RejectedExecutionException e) {
        final int status = this.status.get();
        if (status == STATUS_QUIESCED) {
            return new NeverCompleteFuture(initialDelay);
        } else if (status == STATUS_SHUTDOWN) {
            throw new IllegalStateException("Timer service is shut down");
        } else {
            // something else happened, so propagate the exception
            throw e;
        }
    }
}

@Override
public long getCurrentProcessingTime() {
    return System.currentTimeMillis();
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
/**
 * Internal task which is repeatedly called by the processing time service.
 */
private static final class RepeatedTriggerTask implements Runnable {

    private final AtomicInteger serviceStatus;
    private final Object lock;
    private final ProcessingTimeCallback target;
    private final long period;
    private final AsyncExceptionHandler exceptionHandler;

    private long nextTimestamp;

    private RepeatedTriggerTask(
            final AtomicInteger serviceStatus,
            final AsyncExceptionHandler exceptionHandler,
            final Object lock,
            final ProcessingTimeCallback target,
            final long nextTimestamp,
            final long period) {

        this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
        this.lock = Preconditions.checkNotNull(lock);
        this.target = Preconditions.checkNotNull(target);
        this.period = period;
        this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);

        this.nextTimestamp = nextTimestamp;
    }

    @Override
    public void run() {
        synchronized (lock) {
            try {
                if (serviceStatus.get() == STATUS_ALIVE) {
                    target.onProcessingTime(nextTimestamp);
                }

                nextTimestamp += period;
            } catch (Throwable t) {
                TimerException asyncException = new TimerException(t);
                exceptionHandler.handleAsyncException("Caught exception while processing repeated timer task.", asyncException);
            }
        }
    }
}
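Stripped of Flink's plumbing, RepeatedTriggerTask boils down to a fixed-rate timer whose callback runs while holding a shared lock, and whose logical timestamp advances by the period rather than by the actual firing time. A self-contained sketch of that pattern with a plain ScheduledExecutorService (illustrative only, not Flink code):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateUnderLockDemo {
    public static void main(String[] args) {
        final Object checkpointLock = new Object();
        final long period = 2000L;
        final long[] nextTimestamp = {System.currentTimeMillis()};

        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        timerService.scheduleAtFixedRate(() -> {
            // like Flink's timer callbacks, run under the shared lock so the
            // callback never interleaves with checkpointing or element emission
            synchronized (checkpointLock) {
                System.out.println("onProcessingTime(" + nextTimestamp[0] + ")");
                // the logical timestamp advances by the period, not the wall clock
                nextTimestamp[0] += period;
            }
        }, 0L, period, TimeUnit.MILLISECONDS);
        // demo runs until the process is killed (the executor thread is non-daemon)
    }
}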
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
/**
 * Wrapping {@link Output} that updates metrics on the number of emitted elements.
 */
public static class CountingOutput<OUT> implements Output<StreamRecord<OUT>> {
    private final Output<StreamRecord<OUT>> output;
    private final Counter numRecordsOut;

    public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
        this.output = output;
        this.numRecordsOut = counter;
    }

    @Override
    public void emitWatermark(Watermark mark) {
        output.emitWatermark(mark);
    }

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        output.emitLatencyMarker(latencyMarker);
    }

    @Override
    public void collect(StreamRecord<OUT> record) {
        numRecordsOut.inc();
        output.collect(record);
    }

    @Override
    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
        numRecordsOut.inc();
        output.collect(outputTag, record);
    }

    @Override
    public void close() {
        output.close();
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
/**
 * Implementation of {@link Output} that sends data using a {@link RecordWriter}.
 */
@Internal
public class RecordWriterOutput<OUT> implements OperatorChain.WatermarkGaugeExposingOutput<StreamRecord<OUT>> {

    private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;

    private SerializationDelegate<StreamElement> serializationDelegate;

    //......

    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) {
        serializationDelegate.setInstance(latencyMarker);

        try {
            recordWriter.randomEmit(serializationDelegate);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}
RecordWriterOutput's emitLatencyMarker method calls recordWriter.randomEmit(serializationDelegate) to emit the LatencyMarker; the emission actually happens through the parent class RecordWriter.

flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
/**
 * This is used to send LatencyMarks to a random target channel.
 */
public void randomEmit(T record) throws IOException, InterruptedException {
    sendToTarget(record, rng.nextInt(numChannels));
}

private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
    RecordSerializer<T> serializer = serializers[targetChannel];

    SerializationResult result = serializer.addRecord(record);

    while (result.isFullBuffer()) {
        if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
            // If this was a full record, we are done. Not breaking
            // out of the loop at this point will lead to another
            // buffer request before breaking out (that would not be
            // a problem per se, but it can lead to stalls in the
            // pipeline).
            if (result.isFullRecord()) {
                break;
            }
        }
        BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);

        result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
    }
    checkState(!serializer.hasSerializedData(), "All data should be written at once");

    if (flushAlways) {
        targetPartition.flush(targetChannel);
    }
}
Downstream
flink-runtime_2.11-1.6.2-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
Task.run is the same bootstrap shown in the upstream section above: it loads and instantiates the invokable, switches the task to RUNNING, and then calls invokable.invoke().
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@Override
protected void run() throws Exception {
    // cache processor reference on the stack, to make the code more JIT friendly
    final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

    while (running && inputProcessor.processInput()) {
        // all the work happens in the "processInput" method
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
public boolean processInput() throws Exception {
    if (isFinished) {
        return false;
    }
    if (numRecordsIn == null) {
        try {
            numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            numRecordsIn = new SimpleCounter();
        }
    }

    while (true) {
        if (currentRecordDeserializer != null) {
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            if (result.isFullRecord()) {
                StreamElement recordOrMark = deserializationDelegate.getInstance();

                if (recordOrMark.isWatermark()) {
                    // handle watermark
                    statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                    continue;
                } else if (recordOrMark.isStreamStatus()) {
                    // handle stream status
                    statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                    continue;
                } else if (recordOrMark.isLatencyMarker()) {
                    // handle latency marker
                    synchronized (lock) {
                        streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                    }
                    continue;
                } else {
                    // now we can do the actual processing
                    StreamRecord<IN> record = recordOrMark.asRecord();
                    synchronized (lock) {
                        numRecordsIn.inc();
                        streamOperator.setKeyContextElement1(record);
                        streamOperator.processElement(record);
                    }
                    return true;
                }
            }
        }

        final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
        if (bufferOrEvent != null) {
            if (bufferOrEvent.isBuffer()) {
                currentChannel = bufferOrEvent.getChannelIndex();
                currentRecordDeserializer = recordDeserializers[currentChannel];
                currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
            } else {
                // Event received
                final AbstractEvent event = bufferOrEvent.getEvent();
                if (event.getClass() != EndOfPartitionEvent.class) {
                    throw new IOException("Unexpected event: " + event);
                }
            }
        } else {
            isFinished = true;
            if (!barrierHandler.isEmpty()) {
                throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
            }
            return false;
        }
    }
}
flink-streaming-java_2.11-1.6.2-sources.jar!/org/apache/flink/streaming/api/operators/StreamMap.java
/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
Summary

Upstream: Task.run --> StreamTask.invoke --> SourceStreamTask.run --> StreamSource.run (which invokes RandomWordSource.run). Before calling userFunction.run, StreamSource.run checks whether latencyTrackingInterval is greater than 0; if it is, it creates a LatencyMarksEmitter, which registers a timer task that periodically calls back ProcessingTimeCallback.onProcessingTime to trigger output.emitLatencyMarker(new LatencyMarker(timestamp, operatorId, subtaskIndex)).

Downstream: Task.run --> OneInputStreamTask.run --> StreamInputProcessor.processInput --> StreamMap.processElement (which invokes UpperCaseMapFunc.map).