本文主要研究一下flink的StateDescriptorhtml
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.javajava
/** * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance * of the function will have a context through which it can access static contextual information (such as * the current parallelism) and other constructs like accumulators and broadcast variables. * * <p>A function can, during runtime, obtain the RuntimeContext via a call to * {@link AbstractRichFunction#getRuntimeContext()}. */ @Public public interface RuntimeContext { //...... @PublicEvolving <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties); @PublicEvolving <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties); @PublicEvolving <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties); @PublicEvolving <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties); @PublicEvolving @Deprecated <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties); @PublicEvolving <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties); }
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateDescriptor.javaapache
/** * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned * {@link State} in stateful operations. * * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}. * * @param <S> The type of the State objects created from this {@code StateDescriptor}. * @param <T> The type of the value of the state object described by this state descriptor. */ @PublicEvolving public abstract class StateDescriptor<S extends State, T> implements Serializable { /** * An enumeration of the types of supported states. Used to identify the state type * when writing and restoring checkpoints and savepoints. */ // IMPORTANT: Do not change the order of the elements in this enum, ordinal is used in serialization public enum Type { /** * @deprecated Enum for migrating from old checkpoints/savepoint versions. */ @Deprecated UNKNOWN, VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP } private static final long serialVersionUID = 1L; // ------------------------------------------------------------------------ /** Name that uniquely identifies state created from this StateDescriptor. */ protected final String name; /** The serializer for the type. May be eagerly initialized in the constructor, * or lazily once the {@link #initializeSerializerUnlessSet(ExecutionConfig)} method * is called. */ @Nullable protected TypeSerializer<T> serializer; /** The type information describing the value type. Only used to if the serializer * is created lazily. */ @Nullable private TypeInformation<T> typeInfo; /** Name for queries against state created from this StateDescriptor. */ @Nullable private String queryableStateName; /** Name for queries against state created from this StateDescriptor. */ @Nonnull private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED; /** The default value returned by the state when no other value is bound to a key. */ @Nullable protected transient T defaultValue; // ------------------------------------------------------------------------ /** * Create a new {@code StateDescriptor} with the given name and the given type serializer. * * @param name The name of the {@code StateDescriptor}. * @param serializer The type serializer for the values in the state. * @param defaultValue The default value that will be set when requesting state without setting * a value before. */ protected StateDescriptor(String name, TypeSerializer<T> serializer, @Nullable T defaultValue) { this.name = checkNotNull(name, "name must not be null"); this.serializer = checkNotNull(serializer, "serializer must not be null"); this.defaultValue = defaultValue; } /** * Create a new {@code StateDescriptor} with the given name and the given type information. * * @param name The name of the {@code StateDescriptor}. * @param typeInfo The type information for the values in the state. * @param defaultValue The default value that will be set when requesting state without setting * a value before. */ protected StateDescriptor(String name, TypeInformation<T> typeInfo, @Nullable T defaultValue) { this.name = checkNotNull(name, "name must not be null"); this.typeInfo = checkNotNull(typeInfo, "type information must not be null"); this.defaultValue = defaultValue; } /** * Create a new {@code StateDescriptor} with the given name and the given type information. * * <p>If this constructor fails (because it is not possible to describe the type via a class), * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor. * * @param name The name of the {@code StateDescriptor}. * @param type The class of the type of values in the state. * @param defaultValue The default value that will be set when requesting state without setting * a value before. */ protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) { this.name = checkNotNull(name, "name must not be null"); checkNotNull(type, "type class must not be null"); try { this.typeInfo = TypeExtractor.createTypeInfo(type); } catch (Exception e) { throw new RuntimeException( "Could not create the type information for '" + type.getName() + "'. " + "The most common reason is failure to infer the generic type information, due to Java's type erasure. " + "In that case, please pass a 'TypeHint' instead of a class to describe the type. " + "For example, to describe 'Tuple2<String, String>' as a generic type, use " + "'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e); } this.defaultValue = defaultValue; } // ------------------------------------------------------------------------ /** * Returns the name of this {@code StateDescriptor}. */ public String getName() { return name; } /** * Returns the default value. */ public T getDefaultValue() { if (defaultValue != null) { if (serializer != null) { return serializer.copy(defaultValue); } else { throw new IllegalStateException("Serializer not yet initialized."); } } else { return null; } } /** * Returns the {@link TypeSerializer} that can be used to serialize the value in the state. * Note that the serializer may initialized lazily and is only guaranteed to exist after * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}. */ public TypeSerializer<T> getSerializer() { if (serializer != null) { return serializer.duplicate(); } else { throw new IllegalStateException("Serializer not yet initialized."); } } /** * Sets the name for queries of state created from this descriptor. * * <p>If a name is set, the created state will be published for queries * during runtime. The name needs to be unique per job. If there is another * state instance published under the same name, the job will fail during runtime. * * @param queryableStateName State name for queries (unique name per job) * @throws IllegalStateException If queryable state name already set */ public void setQueryable(String queryableStateName) { Preconditions.checkArgument( ttlConfig.getUpdateType() == StateTtlConfig.UpdateType.Disabled, "Queryable state is currently not supported with TTL"); if (this.queryableStateName == null) { this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name"); } else { throw new IllegalStateException("Queryable state name already set"); } } /** * Returns the queryable state name. * * @return Queryable state name or <code>null</code> if not set. */ @Nullable public String getQueryableStateName() { return queryableStateName; } /** * Returns whether the state created from this descriptor is queryable. * * @return <code>true</code> if state is queryable, <code>false</code> * otherwise. */ public boolean isQueryable() { return queryableStateName != null; } /** * Configures optional activation of state time-to-live (TTL). * * <p>State user value will expire, become unavailable and be cleaned up in storage * depending on configured {@link StateTtlConfig}. * * @param ttlConfig configuration of state TTL */ public void enableTimeToLive(StateTtlConfig ttlConfig) { Preconditions.checkNotNull(ttlConfig); Preconditions.checkArgument( ttlConfig.getUpdateType() != StateTtlConfig.UpdateType.Disabled && queryableStateName == null, "Queryable state is currently not supported with TTL"); this.ttlConfig = ttlConfig; } @Nonnull @Internal public StateTtlConfig getTtlConfig() { return ttlConfig; } // ------------------------------------------------------------------------ /** * Checks whether the serializer has been initialized. Serializer initialization is lazy, * to allow parametrization of serializers with an {@link ExecutionConfig} via * {@link #initializeSerializerUnlessSet(ExecutionConfig)}. * * @return True if the serializers have been initialized, false otherwise. */ public boolean isSerializerInitialized() { return serializer != null; } /** * Initializes the serializer, unless it has been initialized before. * * @param executionConfig The execution config to use when creating the serializer. */ public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { if (serializer == null) { checkState(typeInfo != null, "no serializer and no type info"); // instantiate the serializer serializer = typeInfo.createSerializer(executionConfig); // we can drop the type info now, no longer needed typeInfo = null; } } // ------------------------------------------------------------------------ // Standard Utils // ------------------------------------------------------------------------ @Override public final int hashCode() { return name.hashCode() + 31 * getClass().hashCode(); } @Override public final boolean equals(Object o) { if (o == this) { return true; } else if (o != null && o.getClass() == this.getClass()) { final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o; return this.name.equals(that.name); } else { return false; } } @Override public String toString() { return getClass().getSimpleName() + "{name=" + name + ", defaultValue=" + defaultValue + ", serializer=" + serializer + (isQueryable() ? ", queryableStateName=" + queryableStateName + "" : "") + '}'; } public abstract Type getType(); // ------------------------------------------------------------------------ // Serialization // ------------------------------------------------------------------------ private void writeObject(final ObjectOutputStream out) throws IOException { // write all the non-transient fields out.defaultWriteObject(); // write the non-serializable default value field if (defaultValue == null) { // we don't have a default value out.writeBoolean(false); } else { // we have a default value out.writeBoolean(true); byte[] serializedDefaultValue; try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) { TypeSerializer<T> duplicateSerializer = serializer.duplicate(); duplicateSerializer.serialize(defaultValue, outView); outView.flush(); serializedDefaultValue = baos.toByteArray(); } catch (Exception e) { throw new IOException("Unable to serialize default value of type " + defaultValue.getClass().getSimpleName() + ".", e); } out.writeInt(serializedDefaultValue.length); out.write(serializedDefaultValue); } } private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException { // read the non-transient fields in.defaultReadObject(); // read the default value field boolean hasDefaultValue = in.readBoolean(); if (hasDefaultValue) { int size = in.readInt(); byte[] buffer = new byte[size]; in.readFully(buffer); try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer); DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) { defaultValue = serializer.deserialize(inView); } catch (Exception e) { throw new IOException("Unable to deserialize default value.", e); } } else { defaultValue = null; } } }
VALUE,LIST,EDUCING,FOLDING,AGGREGATING,MAP
),用於各個子類表達本身的Type類型VALUE,LIST,EDUCING,FOLDING,AGGREGATING,MAP
),用於各個子類表達本身的Type類型