This article takes a look at the reduce operation of Flink's KeyedStream.
The word-count test below applies reduce to a keyed stream; the ReduceFunction prints both of its inputs before summing the counts:

@Test
public void testWordCount() throws Exception {
    // Checking input parameters
    // final ParameterTool params = ParameterTool.fromArgs(args);

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // make parameters available in the web interface
    // env.getConfig().setGlobalJobParameters(params);

    // get input data
    DataStream<String> text = env.fromElements(WORDS);

    DataStream<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
            // group by the tuple field "0" and sum up tuple field "1"
            .keyBy(0)
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                    System.out.println("value1:" + value1.f1 + ";value2:" + value2.f1);
                    return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                }
            });

    // emit result
    System.out.println("Printing result to stdout. Use --output to specify output path.");
    counts.print();

    // execute program
    env.execute("Streaming WordCount");
}
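Since ReduceFunction is annotated @FunctionalInterface, the anonymous class above can also be written as a lambda. A minimal sketch, assuming the same text stream and Tokenizer; no extra type hints are needed because reduce reuses the input stream's type for its output:

// equivalent lambda form of the reduce above
DataStream<Tuple2<String, Integer>> counts = text
        .flatMap(new Tokenizer())
        .keyBy(0)
        .reduce((value1, value2) -> new Tuple2<>(value1.f0, value1.f1 + value2.f1));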
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
    //......

    /**
     * Applies a reduce transformation on the grouped data stream grouped on by
     * the given key position. The {@link ReduceFunction} will receive input
     * values based on the key value. Only input values with the same key will
     * go to the same reducer.
     *
     * @param reducer
     *            The {@link ReduceFunction} that will be called for every
     *            element of the input values with the same key.
     * @return The transformed DataStream.
     */
    public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer) {
        return transform("Keyed Reduce", getType(), new StreamGroupedReduce<T>(
                clean(reducer), getType().createSerializer(getExecutionConfig())));
    }

    @Override
    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperator<T, R> operator) {

        SingleOutputStreamOperator<R> returnStream = super.transform(operatorName, outTypeInfo, operator);

        // inject the key selector and key type
        OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
        transform.setStateKeySelector(keySelector);
        transform.setStateKeyType(keyType);

        return returnStream;
    }

    //......
}

KeyedStream.reduce wraps the user function in a StreamGroupedReduce operator; transform then injects the stream's key selector and key type into the resulting OneInputTransformation, which is what makes keyed state available to the operator.
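For reference, keyBy can also take an explicit KeySelector instead of a field index, which gives the resulting KeyedStream a typed key. A small sketch, where the stream name words is hypothetical:

// keyBy with an explicit KeySelector: the key type becomes String instead of Tuple
KeyedStream<Tuple2<String, Integer>, String> keyed = words
        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });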
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/ReduceFunction.java
@Public
@FunctionalInterface
public interface ReduceFunction<T> extends Function, Serializable {

    /**
     * The core method of ReduceFunction, combining two values into one value of the same type.
     * The reduce function is consecutively applied to all values of a group until only a single value remains.
     *
     * @param value1 The first value to combine.
     * @param value2 The second value to combine.
     * @return The combined value of both input values.
     *
     * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
     *                   to fail and may trigger recovery.
     */
    T reduce(T value1, T value2) throws Exception;
}
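To illustrate the contract, here is a hypothetical ReduceFunction that keeps the tuple with the larger count instead of summing; the key point is that reduce must return a value of the same type as its two inputs:

// hypothetical example: keep the element with the larger count
public class MaxCountReduce implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return value1.f1 >= value2.f1 ? value1 : value2;
    }
}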
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
/**
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and
 * runs it, providing all services necessary for example to consume input data,
 * produce its results (intermediate result partitions) and communicate
 * with the JobManager.
 *
 * <p>The Flink operators (implemented as subclasses of
 * {@link AbstractInvokable} have only data readers, -writers, and certain event callbacks.
 * The task connects those to the network stack and actor messages, and tracks the state
 * of the execution and handles exceptions.
 *
 * <p>Tasks have no knowledge about how they relate to other tasks, or whether they
 * are the first attempt to execute the task, or a repeated attempt. All of that
 * is only known to the JobManager. All the task knows are its own runnable code,
 * the task's configuration, and the IDs of the intermediate results to consume and
 * produce (if any).
 *
 * <p>Each Task is run by one dedicated thread.
 */
public class Task implements Runnable, TaskActions, CheckpointListener {
    //......

    /**
     * The core work method that bootstraps the task and executes its code.
     */
    @Override
    public void run() {

        // ----------------------------
        //  Initial State transition
        // ----------------------------
        //......

        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;

        try {
            // now load and instantiate the task's invokable code
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);

            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------

            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;

            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }

            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));

            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);

            // run the invokable
            invokable.invoke();

            //......
        } catch (Throwable t) {
            //......
        } finally {
            //......
        }
    }
}
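The DEPLOYING to RUNNING switch relies on an atomic compare-and-set over the execution state, so a concurrent cancel() wins cleanly. A minimal, self-contained sketch of that pattern, with hypothetical names, not Flink's actual implementation:

import java.util.concurrent.atomic.AtomicReference;

class StateMachine {
    enum State { CREATED, DEPLOYING, RUNNING, CANCELING, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.DEPLOYING);

    // returns false if another thread (e.g. a cancel call) changed the state first
    boolean transitionState(State expected, State next) {
        return state.compareAndSet(expected, next);
    }
}

If cancel() has already moved the state away from DEPLOYING, the compare-and-set fails and run() aborts with CancelTaskException, matching the check in the source above.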
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {
    //......

    protected abstract void run() throws Exception;

    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());

            asyncOperationsThreadPool = Executors.newCachedThreadPool();

            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());

            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());

                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            operatorChain = new OperatorChain<>(this, streamRecordWriters);
            headOperator = operatorChain.getHeadOperator();

            // task specific initialization
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {

                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.

                initializeState();
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());
            //......
        } finally {
            //......
        }
    }
}
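invoke() is effectively a template method: subclass hooks (init(), run()) are called around shared setup such as state restore and operator opening, both held under one lock. A stripped-down sketch of that shape, with hypothetical names and checkpointing/timers omitted:

abstract class MiniStreamTask {
    private final Object lock = new Object();
    private volatile boolean canceled;

    protected abstract void init() throws Exception;   // task-specific wiring
    protected abstract void run() throws Exception;    // the processing loop

    public final void invoke() throws Exception {
        init();
        synchronized (lock) {
            // restore and open are done under the same lock so that a timer
            // registered during restore cannot fire before open completes
            initializeState();
            openAllOperators();
        }
        if (canceled) {
            throw new RuntimeException("task canceled before running");
        }
        run();
    }

    protected void initializeState() throws Exception {}
    protected void openAllOperators() throws Exception {}
    public void cancel() { canceled = true; }
}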
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@Internal
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {

    private StreamInputProcessor<IN> inputProcessor;

    private volatile boolean running = true;

    private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();

    /**
     * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
     *
     * @param env The task environment for this task.
     */
    public OneInputStreamTask(Environment env) {
        super(env);
    }

    /**
     * Constructor for initialization, possibly with initial state (recovery / savepoint / etc).
     *
     * <p>This constructor accepts a special {@link ProcessingTimeService}. By default (and if
     * null is passes for the time provider) a {@link SystemProcessingTimeService DefaultTimerService}
     * will be used.
     *
     * @param env The task environment for this task.
     * @param timeProvider Optionally, a specific time provider to use.
     */
    @VisibleForTesting
    public OneInputStreamTask(
            Environment env,
            @Nullable ProcessingTimeService timeProvider) {
        super(env, timeProvider);
    }

    @Override
    public void init() throws Exception {
        StreamConfig configuration = getConfiguration();

        TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
        int numberOfInputs = configuration.getNumberOfInputs();

        if (numberOfInputs > 0) {
            InputGate[] inputGates = getEnvironment().getAllInputGates();

            inputProcessor = new StreamInputProcessor<>(
                    inputGates,
                    inSerializer,
                    this,
                    configuration.getCheckpointMode(),
                    getCheckpointLock(),
                    getEnvironment().getIOManager(),
                    getEnvironment().getTaskManagerInfo().getConfiguration(),
                    getStreamStatusMaintainer(),
                    this.headOperator,
                    getEnvironment().getMetricGroup().getIOMetricGroup(),
                    inputWatermarkGauge);
        }
        headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
        // wrap watermark gauge since registered metrics must be unique
        getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
    }

    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;

        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }

    @Override
    protected void cleanup() throws Exception {
        if (inputProcessor != null) {
            inputProcessor.cleanup();
        }
    }

    @Override
    protected void cancelTask() {
        running = false;
    }
}
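run() is a cooperative loop: it keeps calling processInput() until the input is drained or cancelTask() flips the volatile flag. A self-contained model of that loop, hypothetical code rather than Flink's:

import java.util.Iterator;
import java.util.List;

class MiniInputLoop {
    private volatile boolean running = true;
    private final Iterator<String> input;

    MiniInputLoop(List<String> elements) { this.input = elements.iterator(); }

    // returns true while more input may follow, false once the input is finished
    private boolean processInput() {
        if (!input.hasNext()) { return false; }
        System.out.println("processed: " + input.next());
        return true;
    }

    void run() {
        while (running && processInput()) {
            // all the work happens in processInput()
        }
    }

    void cancelTask() { running = false; }
}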
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@Internal
public class StreamInputProcessor<IN> {
    //......

    public boolean processInput() throws Exception {
        if (isFinished) {
            return false;
        }
        if (numRecordsIn == null) {
            try {
                numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            } catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", e);
                numRecordsIn = new SimpleCounter();
            }
        }

        while (true) {
            if (currentRecordDeserializer != null) {
                DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);

                if (result.isBufferConsumed()) {
                    currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    currentRecordDeserializer = null;
                }

                if (result.isFullRecord()) {
                    StreamElement recordOrMark = deserializationDelegate.getInstance();

                    if (recordOrMark.isWatermark()) {
                        // handle watermark
                        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
                        continue;
                    } else if (recordOrMark.isStreamStatus()) {
                        // handle stream status
                        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
                        continue;
                    } else if (recordOrMark.isLatencyMarker()) {
                        // handle latency marker
                        synchronized (lock) {
                            streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                        }
                        continue;
                    } else {
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }
                        return true;
                    }
                }
            }

            //......
        }
    }

    //......
}
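Note that processInput() only returns true after a real data record; watermarks, stream-status changes and latency markers are consumed inside the loop via continue. A toy model of that dispatch, with hypothetical types standing in for Flink's StreamElement hierarchy:

import java.util.ArrayDeque;
import java.util.Queue;

class MiniProcessor {
    interface Element {}
    static final class Watermark implements Element { final long ts; Watermark(long ts) { this.ts = ts; } }
    static final class Record implements Element { final String value; Record(String v) { this.value = v; } }

    private final Queue<Element> channel = new ArrayDeque<>();

    // control elements are handled and the loop continues;
    // only a data record yields control back to the caller
    boolean processInput() {
        while (true) {
            Element e = channel.poll();
            if (e == null) { return false; }          // input exhausted
            if (e instanceof Watermark) {
                System.out.println("watermark: " + ((Watermark) e).ts);
                continue;
            }
            System.out.println("record: " + ((Record) e).value);
            return true;
        }
    }
}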
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamGroupedReduce.java
/**
 * A {@link StreamOperator} for executing a {@link ReduceFunction} on a
 * {@link org.apache.flink.streaming.api.datastream.KeyedStream}.
 */
@Internal
public class StreamGroupedReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
        implements OneInputStreamOperator<IN, IN> {

    private static final long serialVersionUID = 1L;

    private static final String STATE_NAME = "_op_state";

    private transient ValueState<IN> values;

    private TypeSerializer<IN> serializer;

    public StreamGroupedReduce(ReduceFunction<IN> reducer, TypeSerializer<IN> serializer) {
        super(reducer);
        this.serializer = serializer;
    }

    @Override
    public void open() throws Exception {
        super.open();
        ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
        values = getPartitionedState(stateId);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        IN value = element.getValue();
        IN currentValue = values.value();

        if (currentValue != null) {
            IN reduced = userFunction.reduce(currentValue, value);
            values.update(reduced);
            output.collect(element.replace(reduced));
        } else {
            values.update(value);
            output.collect(element.replace(value));
        }
    }
}
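The behavior is easy to model: the first element per key is emitted as-is and stored, and every later element is combined with the stored value through the user function, with the result both emitted downstream and written back to state. A small sketch using a HashMap in place of keyed ValueState; all names here are hypothetical, for illustration only:

import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

class MiniGroupedReduce<K, V> {
    private final Map<K, V> state = new HashMap<>();   // stands in for keyed ValueState
    private final BinaryOperator<V> reducer;           // stands in for ReduceFunction

    MiniGroupedReduce(BinaryOperator<V> reducer) { this.reducer = reducer; }

    // returns the value that StreamGroupedReduce would emit downstream
    V processElement(K key, V value) {
        V current = state.get(key);
        V emitted = (current == null) ? value : reducer.apply(current, value);
        state.put(key, emitted);
        return emitted;
    }
}

For example:

MiniGroupedReduce<String, Integer> op = new MiniGroupedReduce<>(Integer::sum);
op.processElement("hello", 1);   // emits 1 (first element for the key)
op.processElement("hello", 1);   // emits 2 (running sum)

This also explains the println output in the word-count test at the top: value1 is the accumulated state for the key, value2 is the newly arrived element.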