本文主要研究一下flink的PartitionableListState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
/** * Implementation of operator list state. * * @param <S> the type of an operator state partition. */ static final class PartitionableListState<S> implements ListState<S> { /** * Meta information of the state, including state name, assignment mode, and serializer */ private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo; /** * The internal list the holds the elements of the state */ private final ArrayList<S> internalList; /** * A serializer that allows to perform deep copies of internalList */ private final ArrayListSerializer<S> internalListCopySerializer; PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) { this(stateMetaInfo, new ArrayList<S>()); } private PartitionableListState( RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo, ArrayList<S> internalList) { this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo); this.internalList = Preconditions.checkNotNull(internalList); this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer()); } private PartitionableListState(PartitionableListState<S> toCopy) { this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList)); } public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) { this.stateMetaInfo = stateMetaInfo; } public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() { return stateMetaInfo; } public PartitionableListState<S> deepCopy() { return new PartitionableListState<>(this); } @Override public void clear() { internalList.clear(); } @Override public Iterable<S> get() { return internalList; } @Override public void add(S value) { Preconditions.checkNotNull(value, "You cannot add null to a ListState."); internalList.add(value); } @Override public String toString() { return "PartitionableListState{" + "stateMetaInfo=" + stateMetaInfo + ", internalList=" + internalList + '}'; } public long[] write(FSDataOutputStream out) throws IOException 
{ long[] partitionOffsets = new long[internalList.size()]; DataOutputView dov = new DataOutputViewStreamWrapper(out); for (int i = 0; i < internalList.size(); ++i) { S element = internalList.get(i); partitionOffsets[i] = out.getPos(); getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov); } return partitionOffsets; } @Override public void update(List<S> values) { internalList.clear(); addAll(values); } @Override public void addAll(List<S> values) { if (values != null && !values.isEmpty()) { internalList.addAll(values); } } }
PartitionableListState實現了ListState接口,其內部使用ArrayList(internalList)來存儲state,而stateMetaInfo使用的是RegisteredOperatorStateBackendMetaInfo;其write方法將internalList的數據序列化到FSDataOutputStream,並返回每一個記錄對應的offset數組(partitionOffsets)

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java
/**
 * {@link State} interface for partitioned list state in Operations.
 *
 * <p>User functions read and modify the state, and the system checkpoints it consistently as
 * part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The system
 * supplies the key automatically, so a function always sees the value mapped to the key of the
 * current element. This lets the system handle stream and state partitioning consistently
 * together.
 *
 * @param <T> Type of values that this list state keeps.
 */
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /**
     * Replaces the existing values of the operator state accessible by {@link #get()} with the
     * given list of values. The next call to {@link #get()} (for the same state partition)
     * returns the updated list.
     *
     * <p>If null or an empty list is passed in, the state value will be null.
     *
     * @param values The new values for the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void update(List<T> values) throws Exception;

    /**
     * Adds the given values to the existing list of values of the operator state accessible by
     * {@link #get()}. The next call to {@link #get()} (for the same state partition) returns
     * the updated list.
     *
     * <p>If null or an empty list is passed in, the state value remains unchanged.
     *
     * @param values The new values to be added to the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void addAll(List<T> values) throws Exception;
}
ListState繼承了MergingState接口(指定OUT的泛型爲Iterable<T>),同時聲明了兩個方法;其中update用於全量更新state,如果參數爲null或者empty,那麼state會被清空;addAll方法用於增量更新,如果參數爲null或者empty,則保持不變,否則新增給定的values

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java
/**
 * An {@link AppendingState} whose instances can be merged: two {@link MergingState} instances
 * can be combined into a single instance that contains all the information of both.
 *
 * <p>This interface declares no methods of its own.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> {
}
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java
/**
 * Base interface for partitioned state that supports adding elements and inspecting the
 * current state. Added elements can either be kept in a buffer (list-like) or aggregated into
 * one value.
 *
 * <p>User functions read and modify the state, and the system checkpoints it consistently as
 * part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The system
 * supplies the key automatically, so a function always sees the value mapped to the key of the
 * current element. This lets the system handle stream and state partitioning consistently
 * together.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    /**
     * Returns the current value for the state. When the state is not partitioned, the returned
     * value is the same for all inputs in a given operator instance. When state partitioning
     * is applied, the returned value depends on the current operator input, because the
     * operator maintains an independent state for each partition.
     *
     * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
     * should return {@code null}.
     *
     * @return The operator state value corresponding to the current input or {@code null}
     *         if the state is empty.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    OUT get() throws Exception;

    /**
     * Adds the given value to the list of values of the operator state accessible by
     * {@link #get()}. The next call to {@link #get()} (for the same state partition) returns
     * the updated list.
     *
     * <p>If null is passed in, the state value will remain unchanged.
     *
     * @param value The new value for the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(IN value) throws Exception;
}
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java
/**
 * Root interface that every type of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The system
 * supplies the key automatically, so a function always sees the value mapped to the key of the
 * current element. This lets the system handle stream and state partitioning consistently
 * together.
 */
@PublicEvolving
public interface State {

    /**
     * Removes the value mapped under the current key.
     */
    void clear();
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java
/** * Compound meta information for a registered state in an operator state backend. * This contains the state name, assignment mode, and state partition serializer. * * @param <S> Type of the state. */ public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase { /** * The mode how elements in this state are assigned to tasks during restore */ @Nonnull private final OperatorStateHandle.Mode assignmentMode; /** * The type serializer for the elements in the state list */ @Nonnull private final TypeSerializer<S> partitionStateSerializer; public RegisteredOperatorStateBackendMetaInfo( @Nonnull String name, @Nonnull TypeSerializer<S> partitionStateSerializer, @Nonnull OperatorStateHandle.Mode assignmentMode) { super(name); this.partitionStateSerializer = partitionStateSerializer; this.assignmentMode = assignmentMode; } private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) { this( Preconditions.checkNotNull(copy).name, copy.partitionStateSerializer.duplicate(), copy.assignmentMode); } @SuppressWarnings("unchecked") public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) { this( snapshot.getName(), (TypeSerializer<S>) Preconditions.checkNotNull( snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)), OperatorStateHandle.Mode.valueOf( snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE))); Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType()); } /** * Creates a deep copy of the itself. */ @Nonnull public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() { return new RegisteredOperatorStateBackendMetaInfo<>(this); } @Nonnull @Override public StateMetaInfoSnapshot snapshot() { return computeSnapshot(); } //...... 
@Nonnull private StateMetaInfoSnapshot computeSnapshot() { Map<String, String> optionsMap = Collections.singletonMap( StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(), assignmentMode.toString()); String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString(); Map<String, TypeSerializer<?>> serializerMap = Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate()); Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap = Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration()); return new StateMetaInfoSnapshot( name, StateMetaInfoSnapshot.BackendStateType.OPERATOR, optionsMap, serializerConfigSnapshotsMap, serializerMap); } }
PartitionableListState內部使用ArrayList(internalList)來存儲state,而stateMetaInfo使用的是RegisteredOperatorStateBackendMetaInfo;PartitionableListState實現了ListState接口(update、addAll方法);而ListState接口繼承了MergingState接口(指定OUT的泛型爲Iterable<T>);MergingState接口沒有聲明其他方法,它繼承了AppendingState接口;AppendingState接口繼承了State接口,同時聲明了get、add方法;State接口則定義了clear方法