A Look at Flink's OperatorStateBackend

This article takes a look at Flink's OperatorStateBackend.

OperatorStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateBackend.java

/**
 * Interface that combines both, the user facing {@link OperatorStateStore} interface and the system interface
 * {@link Snapshotable}
 *
 */
public interface OperatorStateBackend extends
    OperatorStateStore,
    Snapshotable<SnapshotResult<OperatorStateHandle>, Collection<OperatorStateHandle>>,
    Closeable,
    Disposable {

    @Override
    void dispose();
}
  • The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable, and Disposable interfaces.

OperatorStateStore

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/OperatorStateStore.java

/**
 * This interface contains methods for registering operator state with a managed store.
 */
@PublicEvolving
public interface OperatorStateStore {

    <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception;

    <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;

    Set<String> getRegisteredStateNames();

    Set<String> getRegisteredBroadcastStateNames();

    // -------------------------------------------------------------------------------------------
    //  Deprecated methods
    // -------------------------------------------------------------------------------------------

    @Deprecated
    <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception;


    @Deprecated
    <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception;
}
  • OperatorStateStore defines getBroadcastState, getListState, and getUnionListState for creating or restoring a BroadcastState or a ListState; it also defines getRegisteredStateNames and getRegisteredBroadcastStateNames, which return the names of the currently registered states.

Snapshotable

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Snapshotable.java

/**
 * Interface for operators that can perform snapshots of their state.
 *
 * @param <S> Generic type of the state object that is created as handle to snapshots.
 * @param <R> Generic type of the state object that is used in restore.
 */
@Internal
public interface Snapshotable<S extends StateObject, R> extends SnapshotStrategy<S> {

    /**
     * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state
     * handles from which the old state is read.
     *
     * @param state the old state to restore.
     */
    void restore(@Nullable R state) throws Exception;
}
  • The Snapshotable interface extends the SnapshotStrategy interface and additionally defines a restore method for restoring state.
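
To make the snapshot/restore contract concrete, here is a minimal sketch that does not use Flink at all. SimpleSnapshotable and CounterBackend are hypothetical names; the interface is a simplified stand-in for Snapshotable that returns the handle directly instead of a RunnableFuture. The handle here is just a Long, and a null argument to restore means there is nothing to restore, mirroring the @Nullable R state parameter.

```java
// Simplified, hypothetical stand-in for Snapshotable<S, R>: snapshot() returns the
// handle directly instead of a RunnableFuture, and restore() accepts a nullable handle.
interface SimpleSnapshotable<S, R> {
    S snapshot();

    void restore(R state); // state may be null: nothing to restore
}

// Hypothetical backend whose entire state is a single counter.
class CounterBackend implements SimpleSnapshotable<Long, Long> {
    private long count;

    void increment() {
        count++;
    }

    long get() {
        return count;
    }

    @Override
    public Long snapshot() {
        // The "handle" is simply a copy of the current value.
        return count;
    }

    @Override
    public void restore(Long state) {
        // A null handle means a fresh start, e.g. the first run of a job.
        count = (state == null) ? 0L : state;
    }
}
```

A handle taken from one instance can be passed to restore on another instance, which is the same shape as restoring a backend from a completed checkpoint.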

SnapshotStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotStrategy.java

/**
 * Interface for different snapshot approaches in state backends. Implementing classes should ideally be stateless or at
 * least threadsafe, i.e. this is a functional interface and can be called in parallel by multiple checkpoints.
 *
 * @param <S> type of the returned state object that represents the result of the snapshot operation.
 */
@Internal
public interface SnapshotStrategy<S extends StateObject> {

    /**
     * Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and
     * returns a {@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation whether
     * the operation is performed synchronously or asynchronously. In the latter case, the returned Runnable must be executed
     * first before obtaining the handle.
     *
     * @param checkpointId      The ID of the checkpoint.
     * @param timestamp         The timestamp of the checkpoint.
     * @param streamFactory     The factory that we can use for writing our state to streams.
     * @param checkpointOptions Options for how to perform this checkpoint.
     * @return A runnable future that will yield a {@link StateObject}.
     */
    @Nonnull
    RunnableFuture<S> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception;
}
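  • SnapshotStrategy defines the snapshot method, which the different snapshot strategies implement; the result type S must be a StateObject, and it is returned wrapped in a RunnableFuture.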
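
The Javadoc leaves it to the implementation whether the work happens synchronously or asynchronously, and in the asynchronous case the returned future must be run before the handle can be obtained. The sketch below illustrates that contract with java.util.concurrent.FutureTask, a standard RunnableFuture; SnapshotFutures and its String "handle" are hypothetical. Running the task immediately in synchronous mode follows the same idea as the if (!asynchronousSnapshots) task.run() step in DefaultOperatorStateBackend.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

class SnapshotFutures {
    // Hypothetical snapshot operation: the "state handle" is just a String here.
    static RunnableFuture<String> snapshot(long checkpointId, boolean async) {
        FutureTask<String> task = new FutureTask<>(() -> "handle-for-checkpoint-" + checkpointId);
        if (!async) {
            // Synchronous mode: do the work now, so the future is already done.
            task.run();
        }
        return task;
    }

    // Caller side: a future that is not done yet must be executed before get() can return.
    static String awaitHandle(RunnableFuture<String> future, ExecutorService executor)
            throws InterruptedException, ExecutionException {
        if (!future.isDone()) {
            executor.execute(future);
        }
        return future.get();
    }
}
```

Because RunnableFuture is both a Runnable and a Future, the caller decides where the asynchronous part runs, for example on a dedicated thread pool.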

AbstractSnapshotStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractSnapshotStrategy.java

/**
 * Abstract base class for implementing {@link SnapshotStrategy}, that gives a consistent logging across state backends.
 *
 * @param <T> type of the snapshot result.
 */
public abstract class AbstractSnapshotStrategy<T extends StateObject> implements SnapshotStrategy<SnapshotResult<T>> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractSnapshotStrategy.class);

    private static final String LOG_SYNC_COMPLETED_TEMPLATE = "{} ({}, synchronous part) in thread {} took {} ms.";
    private static final String LOG_ASYNC_COMPLETED_TEMPLATE = "{} ({}, asynchronous part) in thread {} took {} ms.";

    /** Descriptive name of the snapshot strategy that will appear in the log outputs and {@link #toString()}. */
    @Nonnull
    protected final String description;

    protected AbstractSnapshotStrategy(@Nonnull String description) {
        this.description = description;
    }

    /**
     * Logs the duration of the synchronous snapshot part from the given start time.
     */
    public void logSyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
        logCompletedInternal(LOG_SYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
    }

    /**
     * Logs the duration of the asynchronous snapshot part from the given start time.
     */
    public void logAsyncCompleted(@Nonnull Object checkpointOutDescription, long startTime) {
        logCompletedInternal(LOG_ASYNC_COMPLETED_TEMPLATE, checkpointOutDescription, startTime);
    }

    private void logCompletedInternal(
        @Nonnull String template,
        @Nonnull Object checkpointOutDescription,
        long startTime) {

        long duration = (System.currentTimeMillis() - startTime);

        LOG.debug(
            template,
            description,
            checkpointOutDescription,
            Thread.currentThread(),
            duration);
    }

    @Override
    public String toString() {
        return "SnapshotStrategy {" + description + "}";
    }
}
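  • AbstractSnapshotStrategy is an abstract class that does not implement the snapshot method defined by SnapshotStrategy; it only provides logSyncCompleted and logAsyncCompleted, which log the duration of the synchronous and asynchronous snapshot parts at debug level.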
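
The following sketch reproduces the message that logCompletedInternal produces, without depending on SLF4J. TimedStrategy is a hypothetical name, and java.text.MessageFormat placeholders ({0} to {3}) stand in for SLF4J's {} placeholders. The sketch returns the message instead of logging it so that it can be checked; a real implementation would pass it to a logger at debug level.

```java
import java.text.MessageFormat;

// Hypothetical stand-in for AbstractSnapshotStrategy's logging helpers. MessageFormat
// placeholders ({0}..{3}) replace SLF4J's "{}" so the sketch needs no dependencies.
abstract class TimedStrategy {
    private static final String SYNC_TEMPLATE = "{0} ({1}, synchronous part) in thread {2} took {3} ms.";
    private static final String ASYNC_TEMPLATE = "{0} ({1}, asynchronous part) in thread {2} took {3} ms.";

    protected final String description;

    protected TimedStrategy(String description) {
        this.description = description;
    }

    String syncCompletedMessage(Object checkpointOutDescription, long startTime) {
        return completedMessage(SYNC_TEMPLATE, checkpointOutDescription, startTime);
    }

    String asyncCompletedMessage(Object checkpointOutDescription, long startTime) {
        return completedMessage(ASYNC_TEMPLATE, checkpointOutDescription, startTime);
    }

    private String completedMessage(String template, Object out, long startTime) {
        long duration = System.currentTimeMillis() - startTime;
        // String.valueOf keeps MessageFormat from applying number grouping (e.g. "1,234").
        return MessageFormat.format(template, description, String.valueOf(out),
                Thread.currentThread().getName(), String.valueOf(duration));
    }
}
```

Subclasses only supply a description; the timing and message layout stay consistent across all strategies, which is the stated purpose of the base class.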

StateObject

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StateObject.java

/**
 * Base of all handles that represent checkpointed state in some form. The object may hold
 * the (small) state directly, or contain a file path (state is in the file), or contain the
 * metadata to access the state stored in some external database.
 *
 * <p>State objects define how to {@link #discardState() discard state} and how to access the
 * {@link #getStateSize() size of the state}.
 * 
 * <p>State Objects are transported via RPC between <i>JobManager</i> and
 * <i>TaskManager</i> and must be {@link java.io.Serializable serializable} to support that.
 * 
 * <p>Some State Objects are stored in the checkpoint/savepoint metadata. For long-term
 * compatibility, they are not stored via {@link java.io.Serializable Java Serialization},
 * but through custom serializers.
 */
public interface StateObject extends Serializable {

    void discardState() throws Exception;

    long getStateSize();
}
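  • StateObject extends Serializable because state objects are transferred via RPC between the JobManager and the TaskManagers. The interface defines discardState and getStateSize: discardState releases the resources that hold the state, and getStateSize returns the size of the state.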
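
The Serializable requirement is what lets a handle travel between processes. The sketch below round-trips a small handle through Java serialization to show that its size survives and that discardState still works on the copy. SimpleStateObject, InMemoryHandle, and HandleTransport are hypothetical; the sketch only models the RPC side, since the Javadoc notes that handles stored in checkpoint metadata use custom serializers instead of Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical mirror of StateObject: serializable, discardable, with a known size.
interface SimpleStateObject extends Serializable {
    void discardState() throws Exception;

    long getStateSize();
}

// Hypothetical handle that holds its (small) state directly, as the Javadoc allows.
class InMemoryHandle implements SimpleStateObject {
    private static final long serialVersionUID = 1L;

    private byte[] data;

    InMemoryHandle(byte[] data) {
        this.data = data;
    }

    @Override
    public void discardState() {
        data = null; // a file-based handle would delete its file instead
    }

    @Override
    public long getStateSize() {
        return data == null ? 0L : data.length;
    }
}

class HandleTransport {
    // Serialize and deserialize, roughly what shipping a handle to another process requires.
    static SimpleStateObject roundTrip(SimpleStateObject handle) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(handle);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SimpleStateObject) in.readObject();
        }
    }
}
```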

StreamStateHandle

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/StreamStateHandle.java

/**
 * A {@link StateObject} that represents state that was written to a stream. The data can be read
 * back via {@link #openInputStream()}.
 */
public interface StreamStateHandle extends StateObject {

    /**
     * Returns an {@link FSDataInputStream} that can be used to read back the data that
     * was previously written to the stream.
     */
    FSDataInputStream openInputStream() throws IOException;
}
  • StreamStateHandle extends StateObject and adds an openInputStream method for reading back the data that was written to the stream.
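
A minimal in-memory stream handle might look like the sketch below. SketchStateObject, SketchStreamStateHandle, and InMemoryStreamHandle are hypothetical names, and plain java.io.InputStream replaces Flink's FSDataInputStream, which additionally supports seeking.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;

// Hypothetical simplified mirrors of StateObject and StreamStateHandle; plain
// java.io.InputStream stands in for Flink's seekable FSDataInputStream.
interface SketchStateObject extends Serializable {
    void discardState() throws Exception;

    long getStateSize();
}

interface SketchStreamStateHandle extends SketchStateObject {
    InputStream openInputStream() throws IOException;
}

// Keeps the written bytes in memory and reads them back on demand.
class InMemoryStreamHandle implements SketchStreamStateHandle {
    private static final long serialVersionUID = 1L;

    private final byte[] data;

    InMemoryStreamHandle(byte[] data) {
        this.data = data.clone(); // defensive copy so later writes to the array don't leak in
    }

    @Override
    public InputStream openInputStream() {
        // Each call returns a fresh stream positioned at the start of the data.
        return new ByteArrayInputStream(data);
    }

    @Override
    public void discardState() {
        // Nothing to release: the bytes are garbage-collected with the handle.
    }

    @Override
    public long getStateSize() {
        return data.length;
    }
}
```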

OperatorStateHandle

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStateHandle.java

/**
 * Interface of a state handle for operator state.
 */
public interface OperatorStateHandle extends StreamStateHandle {

    /**
     * Returns a map of meta data for all contained states by their name.
     */
    Map<String, StateMetaInfo> getStateNameToPartitionOffsets();

    /**
     * Returns an input stream to read the operator state information.
     */
    @Override
    FSDataInputStream openInputStream() throws IOException;

    /**
     * Returns the underlying stream state handle that points to the state data.
     */
    StreamStateHandle getDelegateStateHandle();

    //......
}
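  • OperatorStateHandle extends StreamStateHandle and adds getStateNameToPartitionOffsets and getDelegateStateHandle. getStateNameToPartitionOffsets provides the mapping from each state name to the offsets of its available partitions in the stream, and getDelegateStateHandle returns the underlying StreamStateHandle that points to the state data.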

OperatorStreamStateHandle

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/OperatorStreamStateHandle.java

/**
 * State handle for partitionable operator state. Besides being a {@link StreamStateHandle}, this also provides a
 * map that contains the offsets to the partitions of named states in the stream.
 */
public class OperatorStreamStateHandle implements OperatorStateHandle {

    private static final long serialVersionUID = 35876522969227335L;

    /**
     * unique state name -> offsets for available partitions in the handle stream
     */
    private final Map<String, StateMetaInfo> stateNameToPartitionOffsets;
    private final StreamStateHandle delegateStateHandle;

    public OperatorStreamStateHandle(
            Map<String, StateMetaInfo> stateNameToPartitionOffsets,
            StreamStateHandle delegateStateHandle) {

        this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle);
        this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets);
    }

    @Override
    public Map<String, StateMetaInfo> getStateNameToPartitionOffsets() {
        return stateNameToPartitionOffsets;
    }

    @Override
    public void discardState() throws Exception {
        delegateStateHandle.discardState();
    }

    @Override
    public long getStateSize() {
        return delegateStateHandle.getStateSize();
    }

    @Override
    public FSDataInputStream openInputStream() throws IOException {
        return delegateStateHandle.openInputStream();
    }

    @Override
    public StreamStateHandle getDelegateStateHandle() {
        return delegateStateHandle;
    }

    //......
}
  • OperatorStreamStateHandle implements OperatorStateHandle. It holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which getStateNameToPartitionOffsets simply returns; discardState, getStateSize, and openInputStream all delegate to delegateStateHandle.
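
The point of the name-to-offsets map is that several named states share one stream, yet each piece can be located on its own. The sketch below writes two named list states into a single byte stream, records the position where each element starts, and then reads back one element by skipping to its offset. PartitionOffsetsSketch is hypothetical; it assumes one offset per list element, which is consistent with the long[] partitionOffsets that the snapshot code obtains for each list state, and it leaves out the assignment mode that Flink's StateMetaInfo also stores.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: several named list states share ONE stream, and a
// name -> offsets map (the idea behind stateNameToPartitionOffsets) records
// where each element starts, so any element can be read back on its own.
class PartitionOffsetsSketch {
    final byte[] stream;
    final Map<String, long[]> stateNameToOffsets = new LinkedHashMap<>();

    PartitionOffsetsSketch(Map<String, List<String>> states) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (Map.Entry<String, List<String>> entry : states.entrySet()) {
            List<String> elements = entry.getValue();
            long[] offsets = new long[elements.size()];
            for (int i = 0; i < elements.size(); i++) {
                offsets[i] = out.size(); // current position in the shared stream
                out.writeUTF(elements.get(i));
            }
            stateNameToOffsets.put(entry.getKey(), offsets);
        }
        out.flush();
        this.stream = bytes.toByteArray();
    }

    // Jump to one element of one state and deserialize only that element.
    String readElement(String stateName, int index) throws IOException {
        long offset = stateNameToOffsets.get(stateName)[index];
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(stream));
        in.skipBytes((int) offset);
        return in.readUTF();
    }
}
```

With per-element offsets, a restoring subtask can read only the elements assigned to it instead of deserializing every state in the stream.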

SnapshotResult

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/SnapshotResult.java

/**
 * This class contains the combined results from the snapshot of a state backend:
 * <ul>
 *   <li>A state object representing the state that will be reported to the Job Manager to acknowledge the checkpoint.</li>
 *   <li>A state object that represents the state for the {@link TaskLocalStateStoreImpl}.</li>
 * </ul>
 *
 * Both state objects are optional and can be null, e.g. if there was no state to snapshot in the backend. A local
 * state object that is not null also requires a state to report to the job manager that is not null, because the
 * Job Manager always owns the ground truth about the checkpointed state.
 */
public class SnapshotResult<T extends StateObject> implements StateObject {

    private static final long serialVersionUID = 1L;

    /** A singleton instance to represent an empty snapshot result. */
    private static final SnapshotResult<?> EMPTY = new SnapshotResult<>(null, null);

    /** This is the state snapshot that will be reported to the Job Manager to acknowledge a checkpoint. */
    private final T jobManagerOwnedSnapshot;

    /** This is the state snapshot that represents the state for the {@link TaskLocalStateStoreImpl}. */
    private final T taskLocalSnapshot;

    /**
     * Creates a {@link SnapshotResult} for the given jobManagerOwnedSnapshot and taskLocalSnapshot. If the
     * jobManagerOwnedSnapshot is null, taskLocalSnapshot must also be null.
     *
     * @param jobManagerOwnedSnapshot Snapshot for report to job manager. Can be null.
     * @param taskLocalSnapshot Snapshot for report to local state manager. This is optional and requires
     *                             jobManagerOwnedSnapshot to be not null if this is not also null.
     */
    private SnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {

        if (jobManagerOwnedSnapshot == null && taskLocalSnapshot != null) {
            throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!");
        }

        this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
        this.taskLocalSnapshot = taskLocalSnapshot;
    }

    public T getJobManagerOwnedSnapshot() {
        return jobManagerOwnedSnapshot;
    }

    public T getTaskLocalSnapshot() {
        return taskLocalSnapshot;
    }

    @Override
    public void discardState() throws Exception {

        Exception aggregatedExceptions = null;

        if (jobManagerOwnedSnapshot != null) {
            try {
                jobManagerOwnedSnapshot.discardState();
            } catch (Exception remoteDiscardEx) {
                aggregatedExceptions = remoteDiscardEx;
            }
        }

        if (taskLocalSnapshot != null) {
            try {
                taskLocalSnapshot.discardState();
            } catch (Exception localDiscardEx) {
                aggregatedExceptions = ExceptionUtils.firstOrSuppressed(localDiscardEx, aggregatedExceptions);
            }
        }

        if (aggregatedExceptions != null) {
            throw aggregatedExceptions;
        }
    }

    @Override
    public long getStateSize() {
        return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L;
    }

    @SuppressWarnings("unchecked")
    public static <T extends StateObject> SnapshotResult<T> empty() {
        return (SnapshotResult<T>) EMPTY;
    }

    public static <T extends StateObject> SnapshotResult<T> of(@Nullable T jobManagerState) {
        return jobManagerState != null ? new SnapshotResult<>(jobManagerState, null) : empty();
    }

    public static <T extends StateObject> SnapshotResult<T> withLocalState(
        @Nonnull T jobManagerState,
        @Nonnull T localState) {
        return new SnapshotResult<>(jobManagerState, localState);
    }
}
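  • SnapshotResult implements StateObject and wraps the result of a snapshot, consisting of jobManagerOwnedSnapshot and taskLocalSnapshot; a non-null taskLocalSnapshot requires a non-null jobManagerOwnedSnapshot. Its discardState implementation calls discardState on both jobManagerOwnedSnapshot and taskLocalSnapshot (whichever is non-null), and getStateSize returns the state size of jobManagerOwnedSnapshot (0 if it is null).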
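
Two details of SnapshotResult are easy to miss: the constructor rejects a local snapshot without a remote one, and discardState attempts both discards even if the first one fails, keeping the first failure as the primary exception with later ones attached as suppressed (ExceptionUtils.firstOrSuppressed). The sketch below reproduces both rules in plain Java using Throwable.addSuppressed; Discardable and PairedResult are hypothetical names.

```java
// Hypothetical minimal state object with a discard hook.
interface Discardable {
    void discardState() throws Exception;
}

// Hypothetical mirror of SnapshotResult's pairing rule and discard logic.
final class PairedResult {
    private final Discardable remote; // reported to the JobManager, may be null
    private final Discardable local;  // task-local copy, may be null

    PairedResult(Discardable remote, Discardable local) {
        if (remote == null && local != null) {
            throw new IllegalStateException("Cannot report local state snapshot without corresponding remote state!");
        }
        this.remote = remote;
        this.local = local;
    }

    void discardState() throws Exception {
        Exception aggregated = null;
        if (remote != null) {
            try {
                remote.discardState();
            } catch (Exception e) {
                aggregated = e;
            }
        }
        if (local != null) {
            try {
                local.discardState();
            } catch (Exception e) {
                // Same effect as ExceptionUtils.firstOrSuppressed(e, aggregated):
                // the first failure stays primary, later ones become suppressed.
                if (aggregated == null) {
                    aggregated = e;
                } else {
                    aggregated.addSuppressed(e);
                }
            }
        }
        if (aggregated != null) {
            throw aggregated;
        }
    }
}
```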

DefaultOperatorStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

/**
 * Default implementation of OperatorStateStore that provides the ability to make snapshots.
 */
@Internal
public class DefaultOperatorStateBackend implements OperatorStateBackend {
    
    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);

    /**
     * The default namespace for state in cases where no state name is provided
     */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";

    /**
     * Map for all registered operator states. Maps state name -> state
     */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;

    /**
     * Map for all registered operator broadcast states. Maps state name -> state
     */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

    /**
     * CloseableRegistry to participate in the tasks lifecycle.
     */
    private final CloseableRegistry closeStreamOnCancelRegistry;

    /**
     * Default serializer. Only used for the default operator state.
     */
    private final JavaSerializer<Serializable> javaSerializer;

    /**
     * The user code classloader.
     */
    private final ClassLoader userClassloader;

    /**
     * The execution configuration.
     */
    private final ExecutionConfig executionConfig;

    /**
     * Flag to de/activate asynchronous snapshots.
     */
    private final boolean asynchronousSnapshots;

    /**
     * Map of state names to their corresponding restored state meta info.
     *
     * <p>TODO this map can be removed when eager-state registration is in place.
     * TODO we currently need this cached to check state migration strategies when new serializers are registered.
     */
    private final Map<String, StateMetaInfoSnapshot> restoredOperatorStateMetaInfos;

    /**
     * Map of state names to their corresponding restored broadcast state meta info.
     */
    private final Map<String, StateMetaInfoSnapshot> restoredBroadcastStateMetaInfos;

    /**
     * Cache of already accessed states.
     *
     * <p>In contrast to {@link #registeredOperatorStates} and {@link #restoredOperatorStateMetaInfos} which may be repopulated
     * with restored state, this map is always empty at the beginning.
     *
     * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
     *
     * @see <a href="https://issues.apache.org/jira/browse/FLINK-6849">FLINK-6849</a>
     */
    private final HashMap<String, PartitionableListState<?>> accessedStatesByName;

    private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

    private final AbstractSnapshotStrategy<OperatorStateHandle> snapshotStrategy;

    public DefaultOperatorStateBackend(
        ClassLoader userClassLoader,
        ExecutionConfig executionConfig,
        boolean asynchronousSnapshots) {

        this.closeStreamOnCancelRegistry = new CloseableRegistry();
        this.userClassloader = Preconditions.checkNotNull(userClassLoader);
        this.executionConfig = executionConfig;
        this.javaSerializer = new JavaSerializer<>();
        this.registeredOperatorStates = new HashMap<>();
        this.registeredBroadcastStates = new HashMap<>();
        this.asynchronousSnapshots = asynchronousSnapshots;
        this.accessedStatesByName = new HashMap<>();
        this.accessedBroadcastStatesByName = new HashMap<>();
        this.restoredOperatorStateMetaInfos = new HashMap<>();
        this.restoredBroadcastStateMetaInfos = new HashMap<>();
        this.snapshotStrategy = new DefaultOperatorStateBackendSnapshotStrategy();
    }

    @Override
    public Set<String> getRegisteredStateNames() {
        return registeredOperatorStates.keySet();
    }

    @Override
    public Set<String> getRegisteredBroadcastStateNames() {
        return registeredBroadcastStates.keySet();
    }

    @Override
    public void close() throws IOException {
        closeStreamOnCancelRegistry.close();
    }

    @Override
    public void dispose() {
        IOUtils.closeQuietly(closeStreamOnCancelRegistry);
        registeredOperatorStates.clear();
        registeredBroadcastStates.clear();
    }

    // -------------------------------------------------------------------------------------------
    //  State access methods
    // -------------------------------------------------------------------------------------------

    @SuppressWarnings("unchecked")
    @Override
    public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
        //......
    }

    @Override
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
    }

    @Override
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        return getListState(stateDescriptor, OperatorStateHandle.Mode.UNION);
    }

    @Nonnull
    @Override
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {

        long syncStartTime = System.currentTimeMillis();

        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
            snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

        snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
        return snapshotRunner;
    }

    //......
}
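  • DefaultOperatorStateBackend implements the OperatorStateBackend interface.
  • getRegisteredStateNames returns registeredOperatorStates.keySet(), and getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet(); both are backed by in-memory Maps.
  • close mainly calls closeStreamOnCancelRegistry.close(); dispose also closes closeStreamOnCancelRegistry (quietly, via IOUtils.closeQuietly) and clears registeredOperatorStates and registeredBroadcastStates.
  • getListState and getUnionListState both delegate to getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode), passing OperatorStateHandle.Mode.SPLIT_DISTRIBUTE and OperatorStateHandle.Mode.UNION respectively.
  • The snapshot method delegates to snapshotStrategy, which is a DefaultOperatorStateBackendSnapshotStrategy, and logs the duration of the synchronous part.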
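
The only difference between getListState and getUnionListState is the assignment mode, and it matters when a job is restored with a different parallelism. According to the Flink documentation, with even-split redistribution (SPLIT_DISTRIBUTE) the lists of all parallel instances are treated as one concatenated list that is divided evenly among the new instances, while with union redistribution (UNION) every instance receives the complete list. The sketch below is a hypothetical illustration of those two semantics (SketchMode and Redistribution are not Flink classes); the redistribution is not performed by DefaultOperatorStateBackend itself, and the contiguous chunking with the remainder going to the first subtasks is an assumption, not necessarily Flink's exact assignment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the two list-state redistribution modes.
enum SketchMode { SPLIT_DISTRIBUTE, UNION }

final class Redistribution {
    // Given the concatenated list state of all old subtasks, compute what each of the
    // newParallelism subtasks receives on restore. Assumes newParallelism > 0.
    static <T> List<List<T>> redistribute(List<T> allElements, int newParallelism, SketchMode mode) {
        List<List<T>> result = new ArrayList<>(newParallelism);
        if (mode == SketchMode.UNION) {
            // Every subtask gets the complete list.
            for (int i = 0; i < newParallelism; i++) {
                result.add(new ArrayList<>(allElements));
            }
            return result;
        }
        // Even split into contiguous chunks; the first (size % n) subtasks get one extra element.
        int base = allElements.size() / newParallelism;
        int remainder = allElements.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int length = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(allElements.subList(start, start + length)));
            start += length;
        }
        return result;
    }
}
```

Union redistribution is convenient when every instance needs to see everything (for example, to pick out its own entries), but the restored state grows with the parallelism, so split redistribution is the default for getListState.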

DefaultOperatorStateBackend.getListState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

    private <S> ListState<S> getListState(
            ListStateDescriptor<S> stateDescriptor,
            OperatorStateHandle.Mode mode) throws StateMigrationException {

        Preconditions.checkNotNull(stateDescriptor);
        String name = Preconditions.checkNotNull(stateDescriptor.getName());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> previous = (PartitionableListState<S>) accessedStatesByName.get(name);
        if (previous != null) {
            checkStateNameAndMode(
                    previous.getStateMetaInfo().getName(),
                    name,
                    previous.getStateMetaInfo().getAssignmentMode(),
                    mode);
            return previous;
        }

        // end up here if it's the first time access after execution for the
        // provided state name; check compatibility of restored state, if any
        // TODO with eager registration in place, these checks should be moved to restore()

        stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredOperatorStates.get(name);

        if (null == partitionableListState) {
            // no restored state for the state name; simply create new state holder

            partitionableListState = new PartitionableListState<>(
                new RegisteredOperatorStateBackendMetaInfo<>(
                    name,
                    partitionStateSerializer,
                    mode));

            registeredOperatorStates.put(name, partitionableListState);
        } else {
            // has restored state; check compatibility of new state access

            checkStateNameAndMode(
                    partitionableListState.getStateMetaInfo().getName(),
                    name,
                    partitionableListState.getStateMetaInfo().getAssignmentMode(),
                    mode);

            StateMetaInfoSnapshot restoredSnapshot = restoredOperatorStateMetaInfos.get(name);
            RegisteredOperatorStateBackendMetaInfo<S> metaInfo =
                new RegisteredOperatorStateBackendMetaInfo<>(restoredSnapshot);

            // check compatibility to determine if state migration is required
            TypeSerializer<S> newPartitionStateSerializer = partitionStateSerializer.duplicate();

            @SuppressWarnings("unchecked")
            TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
                (TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));

            TypeSerializerSchemaCompatibility<S> stateCompatibility =
                stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
            }

            partitionableListState.setStateMetaInfo(
                new RegisteredOperatorStateBackendMetaInfo<>(name, newPartitionStateSerializer, mode));
        }

        accessedStatesByName.put(name, partitionableListState);
        return partitionableListState;
    }
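  • If the state was already accessed, the cached instance in accessedStatesByName is returned after its name and mode are checked. Otherwise the PartitionableListState is looked up in registeredOperatorStates: if none exists, a new one is created and registered; if one exists (that is, restored state), its assignment mode and serializer compatibility are checked and a new stateMetaInfo is set on the partitionableListState. Finally, the state is put into accessedStatesByName.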
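
The lookup order can be reduced to the sketch below: the accessed cache first, then the registered (possibly restored) states, creating a new holder only when nothing exists under the name. ListStateRegistry and NamedListState are hypothetical names, serializer compatibility checks are omitted, and throwing IllegalStateException on a mode mismatch is an assumption about how checkStateNameAndMode reports the error.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the lookup order in getListState(descriptor, mode).
// Serializer compatibility checks are left out; only the caching and mode checks remain.
final class ListStateRegistry {
    enum Mode { SPLIT_DISTRIBUTE, UNION }

    static final class NamedListState {
        final String name;
        final Mode mode;
        final List<Object> elements = new ArrayList<>();

        NamedListState(String name, Mode mode) {
            this.name = name;
            this.mode = mode;
        }
    }

    // Like registeredOperatorStates: may already contain restored states.
    private final Map<String, NamedListState> registered = new HashMap<>();
    // Like accessedStatesByName: empty until the first access.
    private final Map<String, NamedListState> accessed = new HashMap<>();

    void addRestored(String name, Mode mode) {
        registered.put(name, new NamedListState(name, mode));
    }

    NamedListState getListState(String name, Mode mode) {
        NamedListState previous = accessed.get(name);
        if (previous != null) {
            checkMode(previous, mode); // fast path: already accessed
            return previous;
        }
        NamedListState state = registered.get(name);
        if (state == null) {
            // No restored state under this name: create and register a new holder.
            state = new NamedListState(name, mode);
            registered.put(name, state);
        } else {
            // Restored state: the requested mode must match the restored one.
            checkMode(state, mode);
        }
        accessed.put(name, state);
        return state;
    }

    private static void checkMode(NamedListState state, Mode requested) {
        if (state.mode != requested) {
            throw new IllegalStateException("Incompatible assignment mode for state " + state.name
                    + ": " + state.mode + " vs " + requested);
        }
    }
}
```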

DefaultOperatorStateBackendSnapshotStrategy

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

    /**
     * Snapshot strategy for this backend.
     */
    private class DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy<OperatorStateHandle> {

        protected DefaultOperatorStateBackendSnapshotStrategy() {
            super("DefaultOperatorStateBackend snapshot");
        }

        @Nonnull
        @Override
        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
            final long checkpointId,
            final long timestamp,
            @Nonnull final CheckpointStreamFactory streamFactory,
            @Nonnull final CheckpointOptions checkpointOptions) throws IOException {

            if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
                return DoneFuture.of(SnapshotResult.empty());
            }

            final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
                new HashMap<>(registeredOperatorStates.size());
            final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
                new HashMap<>(registeredBroadcastStates.size());

            ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(userClassloader);
            try {
                // eagerly create deep copies of the list and the broadcast states (if any)
                // in the synchronous phase, so that we can use them in the async writing.

                if (!registeredOperatorStates.isEmpty()) {
                    for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                        PartitionableListState<?> listState = entry.getValue();
                        if (null != listState) {
                            listState = listState.deepCopy();
                        }
                        registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                    }
                }

                if (!registeredBroadcastStates.isEmpty()) {
                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                        BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                        if (null != broadcastState) {
                            broadcastState = broadcastState.deepCopy();
                        }
                        registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                    }
                }
            } finally {
                Thread.currentThread().setContextClassLoader(snapshotClassLoader);
            }

            AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
                new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

                    @Override
                    protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {

                        CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                            streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                        registerCloseableForCancellation(localOut);

                        // get the registered operator state infos ...
                        List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
                            new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                        for (Map.Entry<String, PartitionableListState<?>> entry :
                            registeredOperatorStatesDeepCopies.entrySet()) {
                            operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                        }

                        // ... get the registered broadcast operator state infos ...
                        List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
                            new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

                        for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                            registeredBroadcastStatesDeepCopies.entrySet()) {
                            broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                        }

                        // ... write them all in the checkpoint stream ...
                        DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                        OperatorBackendSerializationProxy backendSerializationProxy =
                            new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                        backendSerializationProxy.write(dov);

                        // ... and then go for the states ...

                        // we put BOTH normal and broadcast state metadata here
                        int initialMapCapacity =
                            registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
                        final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                            new HashMap<>(initialMapCapacity);

                        for (Map.Entry<String, PartitionableListState<?>> entry :
                            registeredOperatorStatesDeepCopies.entrySet()) {

                            PartitionableListState<?> value = entry.getValue();
                            long[] partitionOffsets = value.write(localOut);
                            OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                            writtenStatesMetaData.put(
                                entry.getKey(),
                                new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                        }

                        // ... and the broadcast states themselves ...
                        for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                            registeredBroadcastStatesDeepCopies.entrySet()) {

                            BackendWritableBroadcastState<?, ?> value = entry.getValue();
                            long[] partitionOffsets = {value.write(localOut)};
                            OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                            writtenStatesMetaData.put(
                                entry.getKey(),
                                new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                        }

                        // ... and, finally, create the state handle.
                        OperatorStateHandle retValue = null;

                        if (unregisterCloseableFromCancellation(localOut)) {

                            StreamStateHandle stateHandle = localOut.closeAndGetHandle();

                            if (stateHandle != null) {
                                retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                            }

                            return SnapshotResult.of(retValue);
                        } else {
                            throw new IOException("Stream was already unregistered.");
                        }
                    }

                    @Override
                    protected void cleanupProvidedResources() {
                        // nothing to do
                    }

                    @Override
                    protected void logAsyncSnapshotComplete(long startTime) {
                        if (asynchronousSnapshots) {
                            logAsyncCompleted(streamFactory, startTime);
                        }
                    }
                };

            final FutureTask<SnapshotResult<OperatorStateHandle>> task =
                snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);

            if (!asynchronousSnapshots) {
                task.run();
            }

            return task;
        }
    }
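  • DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy. Its snapshot method first creates deep copies of the registered states (registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies). It then delegates the actual write to an AsyncSnapshotCallable. When asynchronousSnapshots is false, the resulting FutureTask is run immediately on the calling thread.
  • The abstract class AsyncSnapshotCallable implements the call method of the Callable interface. call invokes callInternal and then logAsyncSnapshotComplete.
  • callInternal returns a SnapshotResult<OperatorStateHandle>. It writes the contents of registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies into a CheckpointStateOutputStream obtained from the CheckpointStreamFactory (e.g. MemCheckpointStreamFactory). For each state, it records the partition offsets and assignment mode in writtenStatesMetaData. Finally, it builds an OperatorStreamStateHandle from writtenStatesMetaData and the stateHandle returned by the stream's closeAndGetHandle, and returns it.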
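To make this control flow concrete, here is a minimal, self-contained Java sketch. It is not Flink code. SimpleAsyncSnapshotCallable, Snapshot and snapshot(...) are hypothetical names. String elements stand in for serialized state, and a ByteArrayOutputStream stands in for the CheckpointStateOutputStream.

The sketch mirrors three points from the source:
- the deep copy is taken synchronously, before the task is built;
- call() runs callInternal() and then logAsyncSnapshotComplete();
- the FutureTask is run right away only when snapshots are not asynchronous.

Like writtenStatesMetaData, it records a long[] of start offsets per state name. One offset per element is an assumption of the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class OperatorSnapshotSketch {

    /** Hypothetical, simplified stand-in for AsyncSnapshotCallable: a template method around callInternal(). */
    abstract static class SimpleAsyncSnapshotCallable<T> implements Callable<T> {
        @Override
        public final T call() throws Exception {
            long startTime = System.currentTimeMillis();
            T result = callInternal();           // subclass performs the actual write
            logAsyncSnapshotComplete(startTime); // only reached if callInternal() succeeded
            return result;
        }

        protected abstract T callInternal() throws Exception;

        protected void logAsyncSnapshotComplete(long startTime) {
            // no-op hook in this sketch
        }
    }

    /** Hypothetical result: serialized bytes plus "state name -> element start offsets". */
    static final class Snapshot {
        final byte[] bytes;
        final Map<String, long[]> offsetsByStateName;

        Snapshot(byte[] bytes, Map<String, long[]> offsetsByStateName) {
            this.bytes = bytes;
            this.offsetsByStateName = offsetsByStateName;
        }

        /** Reads the single element that starts at the given offset. */
        String readAt(long offset) throws IOException {
            int start = (int) offset;
            DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(bytes, start, bytes.length - start));
            return in.readUTF();
        }
    }

    static FutureTask<Snapshot> snapshot(Map<String, List<String>> liveStates, boolean asynchronous) {
        // Synchronous part: copy the states so later updates cannot leak into this snapshot.
        Map<String, List<String>> deepCopies = new LinkedHashMap<>();
        liveStates.forEach((name, elements) -> deepCopies.put(name, new ArrayList<>(elements)));

        SimpleAsyncSnapshotCallable<Snapshot> callable = new SimpleAsyncSnapshotCallable<Snapshot>() {
            @Override
            protected Snapshot callInternal() throws IOException {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(out);
                Map<String, long[]> offsets = new LinkedHashMap<>();
                for (Map.Entry<String, List<String>> entry : deepCopies.entrySet()) {
                    List<String> elements = entry.getValue();
                    long[] partitionOffsets = new long[elements.size()];
                    for (int i = 0; i < elements.size(); i++) {
                        partitionOffsets[i] = dos.size(); // bytes written so far = start of this element
                        dos.writeUTF(elements.get(i));
                    }
                    offsets.put(entry.getKey(), partitionOffsets);
                }
                dos.flush();
                return new Snapshot(out.toByteArray(), offsets);
            }
        };

        FutureTask<Snapshot> task = new FutureTask<>(callable);
        if (!asynchronous) {
            task.run(); // synchronous mode: do all the work on the calling thread now
        }
        return task;    // asynchronous mode: the caller decides which thread runs it
    }
}
```

Because the offsets are stored next to the bytes, a reader can jump straight to any element with readAt(offset) without deserializing everything before it. In Flink, this role is played by the per-state offset metadata carried in OperatorStreamStateHandle. In asynchronous mode, the returned task must still be handed to some executor; this sketch leaves that to the caller.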

Summary

  • The OperatorStateBackend interface extends the OperatorStateStore, Snapshotable, Closeable, and Disposable interfaces.
  • OperatorStateStore defines getBroadcastState, getListState, and getUnionListState, which create or restore a BroadcastState or ListState. It also defines getRegisteredStateNames and getRegisteredBroadcastStateNames, which return the names of the currently registered states. DefaultOperatorStateBackend implements OperatorStateStore:
    getRegisteredStateNames returns registeredOperatorStates.keySet();
    getRegisteredBroadcastStateNames returns registeredBroadcastStates.keySet();
    both of these fields are in-memory Maps;
    getListState and getUnionListState both delegate to getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode), passing Mode.SPLIT_DISTRIBUTE and Mode.UNION respectively.
  • The Snapshotable interface extends SnapshotStrategy and additionally defines a restore method for restoring state. SnapshotStrategy defines the snapshot method, which each snapshot strategy implements; the snapshot result type must be a StateObject. AbstractSnapshotStrategy is an abstract class that does not implement the snapshot method; it only provides a logSyncCompleted method that prints debug information.
  • DefaultOperatorStateBackend implements Snapshotable, and its snapshot method uses DefaultOperatorStateBackendSnapshotStrategy as the snapshotStrategy. DefaultOperatorStateBackendSnapshotStrategy extends AbstractSnapshotStrategy. Its snapshot method creates registeredOperatorStatesDeepCopies and registeredBroadcastStatesDeepCopies. It then uses an AsyncSnapshotCallable to write their contents into a CheckpointStateOutputStream obtained from the CheckpointStreamFactory (e.g. MemCheckpointStreamFactory), recording per-state metadata in writtenStatesMetaData.
  • Snapshotable requires its first type parameter S (the type produced by snapshot) to be a StateObject. StateObject extends Serializable because state handles are shipped between the JobManager and the TaskManagers via RPC. When OperatorStateBackend extends Snapshotable, it binds S to SnapshotResult<OperatorStateHandle> and the second type parameter R (the type accepted by restore) to Collection<OperatorStateHandle>.
  • StreamStateHandle extends StateObject and adds an openInputStream method. OperatorStateHandle extends StreamStateHandle and adds getStateNameToPartitionOffsets and getDelegateStateHandle; getStateNameToPartitionOffsets maps each state name to the offsets of its available partitions. OperatorStreamStateHandle implements OperatorStateHandle and holds a stateNameToPartitionOffsets field (Map<String, StateMetaInfo>), which its getStateNameToPartitionOffsets method simply returns.
  • SnapshotResult implements StateObject and wraps the results of a snapshot, namely jobManagerOwnedSnapshot and taskLocalSnapshot. Its discardState method calls discardState on both, while getStateSize returns only the state size of jobManagerOwnedSnapshot.
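The discard and size semantics of SnapshotResult in the last bullet can be sketched as follows. SimpleStateObject, SimpleSnapshotResult and InMemoryHandle are hypothetical stand-ins, not Flink classes. Two behaviors are assumptions of this sketch:
- either snapshot may be null, in which case it counts as size 0 and is skipped on discard;
- a failure while discarding one copy does not prevent discarding the other, and later failures are attached as suppressed exceptions.

```java
import java.io.Serializable;

public class SnapshotResultSketch {

    /** Hypothetical stand-in for StateObject; Serializable because handles cross process boundaries. */
    interface SimpleStateObject extends Serializable {
        void discardState() throws Exception;

        long getStateSize();
    }

    /** Hypothetical wrapper with the SnapshotResult behavior described above. */
    static final class SimpleSnapshotResult<T extends SimpleStateObject> implements SimpleStateObject {
        private final T jobManagerOwnedSnapshot; // may be null in this sketch
        private final T taskLocalSnapshot;       // may be null in this sketch

        SimpleSnapshotResult(T jobManagerOwnedSnapshot, T taskLocalSnapshot) {
            this.jobManagerOwnedSnapshot = jobManagerOwnedSnapshot;
            this.taskLocalSnapshot = taskLocalSnapshot;
        }

        @Override
        public void discardState() throws Exception {
            // Attempt both discards; the first failure is rethrown, later ones are suppressed.
            Exception failure = discardQuietly(jobManagerOwnedSnapshot, null);
            failure = discardQuietly(taskLocalSnapshot, failure);
            if (failure != null) {
                throw failure;
            }
        }

        @Override
        public long getStateSize() {
            // Only the JobManager-owned copy is counted.
            return jobManagerOwnedSnapshot != null ? jobManagerOwnedSnapshot.getStateSize() : 0L;
        }

        private static Exception discardQuietly(SimpleStateObject handle, Exception previous) {
            if (handle == null) {
                return previous;
            }
            try {
                handle.discardState();
                return previous;
            } catch (Exception e) {
                if (previous == null) {
                    return e;
                }
                previous.addSuppressed(e);
                return previous;
            }
        }
    }

    /** Hypothetical in-memory handle used only to exercise the wrapper. */
    static final class InMemoryHandle implements SimpleStateObject {
        private final long size;
        boolean discarded;

        InMemoryHandle(long size) {
            this.size = size;
        }

        @Override
        public void discardState() {
            discarded = true;
        }

        @Override
        public long getStateSize() {
            return size;
        }
    }
}
```

Presumably only the JobManager-owned size is reported because the task-local snapshot is a secondary copy kept for faster local recovery. Counting both would double the reported checkpoint size.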
