本文主要研究一下flink StreamOperator的initializeState方法
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java
public class Task implements Runnable, TaskActions, CheckpointListener { public void run() { // ---------------------------- // Initial State transition // ---------------------------- while (true) { ExecutionState current = this.executionState; if (current == ExecutionState.CREATED) { if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) { // success, we can start our work break; } } else if (current == ExecutionState.FAILED) { // we were immediately failed. tell the TaskManager that we reached our final state notifyFinalState(); if (metrics != null) { metrics.close(); } return; } else if (current == ExecutionState.CANCELING) { if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) { // we were immediately canceled. tell the TaskManager that we reached our final state notifyFinalState(); if (metrics != null) { metrics.close(); } return; } } else { if (metrics != null) { metrics.close(); } throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.'); } } // all resource acquisitions and registrations from here on // need to be undone in the end Map<String, Future<Path>> distributedCacheEntries = new HashMap<>(); AbstractInvokable invokable = null; try { // ---------------------------- // Task Bootstrap - We periodically // check for canceling as a shortcut // ---------------------------- //...... 
// ---------------------------------------------------------------- // call the user code initialization methods // ---------------------------------------------------------------- TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId()); Environment env = new RuntimeEnvironment( jobId, vertexId, executionId, executionConfig, taskInfo, jobConfiguration, taskConfiguration, userCodeClassLoader, memoryManager, ioManager, broadcastVariableManager, taskStateManager, accumulatorRegistry, kvStateRegistry, inputSplitProvider, distributedCacheEntries, producedPartitions, inputGates, network.getTaskEventDispatcher(), checkpointResponder, taskManagerConfig, metrics, this); // now load and instantiate the task's invokable code invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env); // ---------------------------------------------------------------- // actual task core work // ---------------------------------------------------------------- // we must make strictly sure that the invokable is accessible to the cancel() call // by the time we switched to running. 
this.invokable = invokable; // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { throw new CancelTaskException(); } // notify everyone that we switched to running taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING)); // make sure the user code classloader is accessible thread-locally executingThread.setContextClassLoader(userCodeClassLoader); // run the invokable invokable.invoke(); // make sure, we enter the catch block if the task leaves the invoke() method due // to the fact that it has been canceled if (isCanceledOrFailed()) { throw new CancelTaskException(); } // ---------------------------------------------------------------- // finalization of a successful execution // ---------------------------------------------------------------- // finish the produced partitions. if this fails, we consider the execution failed. for (ResultPartition partition : producedPartitions) { if (partition != null) { partition.finish(); } } // try to mark the task as finished // if that fails, the task was canceled/failed in the meantime if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { throw new CancelTaskException(); } } catch (Throwable t) { //...... } finally { //...... } } //...... }
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@Internal public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends AbstractInvokable implements AsyncExceptionHandler { @Override public final void invoke() throws Exception { boolean disposed = false; try { // -------- Initialize --------- LOG.debug("Initializing {}.", getName()); asyncOperationsThreadPool = Executors.newCachedThreadPool(); CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory(); synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler( getExecutionConfig().isFailTaskOnCheckpointError(), getEnvironment()); asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this); stateBackend = createStateBackend(); checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID()); // if the clock is not already set, then assign a default TimeServiceProvider if (timerService == null) { ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName(), getUserCodeClassLoader()); timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory); } operatorChain = new OperatorChain<>(this, streamRecordWriters); headOperator = operatorChain.getHeadOperator(); // task specific initialization init(); // save the work of reloading state, etc, if the task is already canceled if (canceled) { throw new CancelTaskException(); } // -------- Invoke -------- LOG.debug("Invoking {}", getName()); // we need to make sure that any triggers scheduled in open() cannot be // executed before all operators are opened synchronized (lock) { // both the following operations are protected by the lock // so that we avoid race conditions in the case that initializeState() // registers a timer, that fires before the open() is called. 
initializeState(); openAllOperators(); } // final check to exit early before starting to run if (canceled) { throw new CancelTaskException(); } // let the task do its work isRunning = true; run(); // if this left the run() method cleanly despite the fact that this was canceled, // make sure the "clean shutdown" is not attempted if (canceled) { throw new CancelTaskException(); } LOG.debug("Finished task {}", getName()); // make sure no further checkpoint and notification actions happen. // we make sure that no other thread is currently in the locked scope before // we close the operators by trying to acquire the checkpoint scope lock // we also need to make sure that no triggers fire concurrently with the close logic // at the same time, this makes sure that during any "regular" exit where still synchronized (lock) { // this is part of the main logic, so if this fails, the task is considered failed closeAllOperators(); // make sure no new timers can come timerService.quiesce(); // only set the StreamTask to not running after all operators have been closed! // See FLINK-7430 isRunning = false; } // make sure all timers finish timerService.awaitPendingAfterQuiesce(); LOG.debug("Closed operators for task {}", getName()); // make sure all buffered data is flushed operatorChain.flushOutputs(); // make an attempt to dispose the operators such that failures in the dispose call // still let the computation fail tryDisposeAllOperators(); disposed = true; } finally { //...... } } private void initializeState() throws Exception { StreamOperator<?>[] allOperators = operatorChain.getAllOperators(); for (StreamOperator<?> operator : allOperators) { if (null != operator) { operator.initializeState(); } } } //...... }
StreamTask的invoke方法会调用initializeState方法,该方法会遍历operatorChain的allOperators(StreamOperator),调用其initializeState方法;比如这里的operator为StreamSource
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamOperator.java
@PublicEvolving public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable { /** * Provides a context to initialize all state in the operator. */ void initializeState() throws Exception; //...... }
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/StreamSource.java
@Internal public class StreamSource<OUT, SRC extends SourceFunction<OUT>> extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> { //...... }
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@PublicEvolving public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, Serializable { @Override public final void initializeState() throws Exception { final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); final StreamTask<?, ?> containingTask = Preconditions.checkNotNull(getContainingTask()); final CloseableRegistry streamTaskCloseableRegistry = Preconditions.checkNotNull(containingTask.getCancelables()); final StreamTaskStateInitializer streamTaskStateManager = Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer()); final StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext( getOperatorID(), getClass().getSimpleName(), this, keySerializer, streamTaskCloseableRegistry, metrics); this.operatorStateBackend = context.operatorStateBackend(); this.keyedStateBackend = context.keyedStateBackend(); if (keyedStateBackend != null) { this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig()); } timeServiceManager = context.internalTimerServiceManager(); CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs(); CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs(); try { StateInitializationContext initializationContext = new StateInitializationContextImpl( context.isRestored(), // information whether we restore or start for the first time operatorStateBackend, // access to operator state backend keyedStateStore, // access to keyed state backend keyedStateInputs, // access to keyed state stream operatorStateInputs); // access to operator state stream initializeState(initializationContext); } finally { closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry); closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry); } } /** * Stream operators with state which can be restored need to override this hook method. 
* * @param context context that allows to register different states. */ public void initializeState(StateInitializationContext context) throws Exception { } //...... }
flink-streaming-java_2.11/1.7.0/flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@PublicEvolving public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> { @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); StreamingFunctionUtils.restoreFunctionState(context, userFunction); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java
public static void restoreFunctionState( StateInitializationContext context, Function userFunction) throws Exception { Preconditions.checkNotNull(context); while (true) { if (tryRestoreFunction(context, userFunction)) { break; } // inspect if the user function is wrapped, then unwrap and try again if we can restore the inner function if (userFunction instanceof WrappingFunction) { userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction(); } else { break; } } } private static boolean tryRestoreFunction( StateInitializationContext context, Function userFunction) throws Exception { if (userFunction instanceof CheckpointedFunction) { ((CheckpointedFunction) userFunction).initializeState(context); return true; } if (context.isRestored() && userFunction instanceof ListCheckpointed) { @SuppressWarnings("unchecked") ListCheckpointed<Serializable> listCheckpointedFun = (ListCheckpointed<Serializable>) userFunction; ListState<Serializable> listState = context.getOperatorStateStore(). getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME); List<Serializable> list = new ArrayList<>(); for (Serializable serializable : listState.get()) { list.add(serializable); } try { listCheckpointedFun.restoreState(list); } catch (Exception e) { throw new Exception("Failed to restore state to function: " + e.getMessage(), e); } return true; } return false; }
StreamTask的initializeState方法会遍历operatorChain的allOperators(StreamOperator),调用其initializeState方法;比如这里的operator为StreamSource,它继承了AbstractUdfStreamOperator