A Look at Flink's StateDescriptor

This article takes a look at Flink's StateDescriptor.

RuntimeContext.getState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/RuntimeContext.java

/**
 * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
 * of the function will have a context through which it can access static contextual information (such as
 * the current parallelism) and other constructs like accumulators and broadcast variables.
 *
 * <p>A function can, during runtime, obtain the RuntimeContext via a call to
 * {@link AbstractRichFunction#getRuntimeContext()}.
 */
@Public
public interface RuntimeContext {
	//......

	@PublicEvolving
	<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);

	@PublicEvolving
	<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);

	@PublicEvolving
	<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);

	@PublicEvolving
	<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties);

	@PublicEvolving
	@Deprecated
	<T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties);

	@PublicEvolving
	<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties);

}
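  • RuntimeContext provides one getter per kind of state, and each getter takes the matching StateDescriptor: getState takes a ValueStateDescriptor and returns a ValueState; getListState takes a ListStateDescriptor and returns a ListState; getReducingState takes a ReducingStateDescriptor and returns a ReducingState; getAggregatingState takes an AggregatingStateDescriptor and returns an AggregatingState; getFoldingState (marked @Deprecated) takes a FoldingStateDescriptor and returns a FoldingState; getMapState takes a MapStateDescriptor and returns a MapState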
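The descriptor-typed getter pattern can be sketched without any Flink dependency. The following is only a simplified stand-alone model, not Flink's implementation: SketchValueDescriptor, SketchValueState and SketchContext are hypothetical names invented for illustration, and the model assumes that a state handle is looked up by the descriptor's name alone.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ValueStateDescriptor<T>: a state name plus the value type.
final class SketchValueDescriptor<T> {
    final String name;
    final Class<T> type;

    SketchValueDescriptor(String name, Class<T> type) {
        this.name = name;
        this.type = type;
    }
}

// Hypothetical stand-in for ValueState<T>: holds a single value.
final class SketchValueState<T> {
    private T value;

    T value() { return value; }

    void update(T newValue) { value = newValue; }
}

// Hypothetical stand-in for RuntimeContext: one state handle per descriptor name.
final class SketchContext {
    private final Map<String, SketchValueState<?>> states = new HashMap<>();

    // The descriptor's type parameter T decides the type of the returned handle.
    @SuppressWarnings("unchecked")
    <T> SketchValueState<T> getState(SketchValueDescriptor<T> descriptor) {
        return (SketchValueState<T>) states.computeIfAbsent(
                descriptor.name, name -> new SketchValueState<T>());
    }
}

final class SketchContextDemo {
    public static void main(String[] args) {
        SketchContext context = new SketchContext();
        SketchValueState<Long> count =
                context.getState(new SketchValueDescriptor<>("count", Long.class));
        count.update(1L);
        // A second lookup with an equally named descriptor returns the same handle.
        SketchValueState<Long> again =
                context.getState(new SketchValueDescriptor<>("count", Long.class));
        System.out.println(again.value()); // prints 1
    }
}
```

The point of the pattern is that the caller never names the state type twice: the generic parameter on the descriptor flows through to the returned handle.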

StateDescriptor

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateDescriptor.java

/**
 * Base class for state descriptors. A {@code StateDescriptor} is used for creating partitioned
 * {@link State} in stateful operations.
 *
 * <p>Subclasses must correctly implement {@link #equals(Object)} and {@link #hashCode()}.
 *
 * @param <S> The type of the State objects created from this {@code StateDescriptor}.
 * @param <T> The type of the value of the state object described by this state descriptor.
 */
@PublicEvolving
public abstract class StateDescriptor<S extends State, T> implements Serializable {

	/**
	 * An enumeration of the types of supported states. Used to identify the state type
	 * when writing and restoring checkpoints and savepoints.
	 */
	// IMPORTANT: Do not change the order of the elements in this enum, ordinal is used in serialization
	public enum Type {
		/**
		 * @deprecated Enum for migrating from old checkpoints/savepoint versions.
		 */
		@Deprecated
		UNKNOWN,
		VALUE,
		LIST,
		REDUCING,
		FOLDING,
		AGGREGATING,
		MAP
	}

	private static final long serialVersionUID = 1L;

	// ------------------------------------------------------------------------

	/** Name that uniquely identifies state created from this StateDescriptor. */
	protected final String name;

	/** The serializer for the type. May be eagerly initialized in the constructor,
	 * or lazily once the {@link #initializeSerializerUnlessSet(ExecutionConfig)} method
	 * is called. */
	@Nullable
	protected TypeSerializer<T> serializer;

	/** The type information describing the value type. Only used to if the serializer
	 * is created lazily. */
	@Nullable
	private TypeInformation<T> typeInfo;

	/** Name for queries against state created from this StateDescriptor. */
	@Nullable
	private String queryableStateName;

	/** Name for queries against state created from this StateDescriptor. */
	@Nonnull
	private StateTtlConfig ttlConfig = StateTtlConfig.DISABLED;

	/** The default value returned by the state when no other value is bound to a key. */
	@Nullable
	protected transient T defaultValue;

	// ------------------------------------------------------------------------

	/**
	 * Create a new {@code StateDescriptor} with the given name and the given type serializer.
	 *
	 * @param name The name of the {@code StateDescriptor}.
	 * @param serializer The type serializer for the values in the state.
	 * @param defaultValue The default value that will be set when requesting state without setting
	 *                     a value before.
	 */
	protected StateDescriptor(String name, TypeSerializer<T> serializer, @Nullable T defaultValue) {
		this.name = checkNotNull(name, "name must not be null");
		this.serializer = checkNotNull(serializer, "serializer must not be null");
		this.defaultValue = defaultValue;
	}

	/**
	 * Create a new {@code StateDescriptor} with the given name and the given type information.
	 *
	 * @param name The name of the {@code StateDescriptor}.
	 * @param typeInfo The type information for the values in the state.
	 * @param defaultValue The default value that will be set when requesting state without setting
	 *                     a value before.
	 */
	protected StateDescriptor(String name, TypeInformation<T> typeInfo, @Nullable T defaultValue) {
		this.name = checkNotNull(name, "name must not be null");
		this.typeInfo = checkNotNull(typeInfo, "type information must not be null");
		this.defaultValue = defaultValue;
	}

	/**
	 * Create a new {@code StateDescriptor} with the given name and the given type information.
	 *
	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
	 * consider using the {@link #StateDescriptor(String, TypeInformation, Object)} constructor.
	 *
	 * @param name The name of the {@code StateDescriptor}.
	 * @param type The class of the type of values in the state.
	 * @param defaultValue The default value that will be set when requesting state without setting
	 *                     a value before.
	 */
	protected StateDescriptor(String name, Class<T> type, @Nullable T defaultValue) {
		this.name = checkNotNull(name, "name must not be null");
		checkNotNull(type, "type class must not be null");

		try {
			this.typeInfo = TypeExtractor.createTypeInfo(type);
		} catch (Exception e) {
			throw new RuntimeException(
					"Could not create the type information for '" + type.getName() + "'. " +
					"The most common reason is failure to infer the generic type information, due to Java's type erasure. " +
					"In that case, please pass a 'TypeHint' instead of a class to describe the type. " +
					"For example, to describe 'Tuple2<String, String>' as a generic type, use " +
					"'new PravegaDeserializationSchema<>(new TypeHint<Tuple2<String, String>>(){}, serializer);'", e);
		}

		this.defaultValue = defaultValue;
	}

	// ------------------------------------------------------------------------

	/**
	 * Returns the name of this {@code StateDescriptor}.
	 */
	public String getName() {
		return name;
	}

	/**
	 * Returns the default value.
	 */
	public T getDefaultValue() {
		if (defaultValue != null) {
			if (serializer != null) {
				return serializer.copy(defaultValue);
			} else {
				throw new IllegalStateException("Serializer not yet initialized.");
			}
		} else {
			return null;
		}
	}

	/**
	 * Returns the {@link TypeSerializer} that can be used to serialize the value in the state.
	 * Note that the serializer may initialized lazily and is only guaranteed to exist after
	 * calling {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
	 */
	public TypeSerializer<T> getSerializer() {
		if (serializer != null) {
			return serializer.duplicate();
		} else {
			throw new IllegalStateException("Serializer not yet initialized.");
		}
	}

	/**
	 * Sets the name for queries of state created from this descriptor.
	 *
	 * <p>If a name is set, the created state will be published for queries
	 * during runtime. The name needs to be unique per job. If there is another
	 * state instance published under the same name, the job will fail during runtime.
	 *
	 * @param queryableStateName State name for queries (unique name per job)
	 * @throws IllegalStateException If queryable state name already set
	 */
	public void setQueryable(String queryableStateName) {
		Preconditions.checkArgument(
			ttlConfig.getUpdateType() == StateTtlConfig.UpdateType.Disabled,
			"Queryable state is currently not supported with TTL");
		if (this.queryableStateName == null) {
			this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
		} else {
			throw new IllegalStateException("Queryable state name already set");
		}
	}

	/**
	 * Returns the queryable state name.
	 *
	 * @return Queryable state name or <code>null</code> if not set.
	 */
	@Nullable
	public String getQueryableStateName() {
		return queryableStateName;
	}

	/**
	 * Returns whether the state created from this descriptor is queryable.
	 *
	 * @return <code>true</code> if state is queryable, <code>false</code>
	 * otherwise.
	 */
	public boolean isQueryable() {
		return queryableStateName != null;
	}

	/**
	 * Configures optional activation of state time-to-live (TTL).
	 *
	 * <p>State user value will expire, become unavailable and be cleaned up in storage
	 * depending on configured {@link StateTtlConfig}.
	 *
	 * @param ttlConfig configuration of state TTL
	 */
	public void enableTimeToLive(StateTtlConfig ttlConfig) {
		Preconditions.checkNotNull(ttlConfig);
		Preconditions.checkArgument(
			ttlConfig.getUpdateType() != StateTtlConfig.UpdateType.Disabled &&
				queryableStateName == null,
			"Queryable state is currently not supported with TTL");
		this.ttlConfig = ttlConfig;
	}

	@Nonnull
	@Internal
	public StateTtlConfig getTtlConfig() {
		return ttlConfig;
	}

	// ------------------------------------------------------------------------

	/**
	 * Checks whether the serializer has been initialized. Serializer initialization is lazy,
	 * to allow parametrization of serializers with an {@link ExecutionConfig} via
	 * {@link #initializeSerializerUnlessSet(ExecutionConfig)}.
	 *
	 * @return True if the serializers have been initialized, false otherwise.
	 */
	public boolean isSerializerInitialized() {
		return serializer != null;
	}

	/**
	 * Initializes the serializer, unless it has been initialized before.
	 *
	 * @param executionConfig The execution config to use when creating the serializer.
	 */
	public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
		if (serializer == null) {
			checkState(typeInfo != null, "no serializer and no type info");

			// instantiate the serializer
			serializer = typeInfo.createSerializer(executionConfig);

			// we can drop the type info now, no longer needed
			typeInfo  = null;
		}
	}

	// ------------------------------------------------------------------------
	//  Standard Utils
	// ------------------------------------------------------------------------

	@Override
	public final int hashCode() {
		return name.hashCode() + 31 * getClass().hashCode();
	}

	@Override
	public final boolean equals(Object o) {
		if (o == this) {
			return true;
		}
		else if (o != null && o.getClass() == this.getClass()) {
			final StateDescriptor<?, ?> that = (StateDescriptor<?, ?>) o;
			return this.name.equals(that.name);
		}
		else {
			return false;
		}
	}

	@Override
	public String toString() {
		return getClass().getSimpleName() +
				"{name=" + name +
				", defaultValue=" + defaultValue +
				", serializer=" + serializer +
				(isQueryable() ? ", queryableStateName=" + queryableStateName + "" : "") +
				'}';
	}

	public abstract Type getType();

	// ------------------------------------------------------------------------
	//  Serialization
	// ------------------------------------------------------------------------

	private void writeObject(final ObjectOutputStream out) throws IOException {
		// write all the non-transient fields
		out.defaultWriteObject();

		// write the non-serializable default value field
		if (defaultValue == null) {
			// we don't have a default value
			out.writeBoolean(false);
		} else {
			// we have a default value
			out.writeBoolean(true);

			byte[] serializedDefaultValue;
			try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
					DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(baos)) {

				TypeSerializer<T> duplicateSerializer = serializer.duplicate();
				duplicateSerializer.serialize(defaultValue, outView);

				outView.flush();
				serializedDefaultValue = baos.toByteArray();
			}
			catch (Exception e) {
				throw new IOException("Unable to serialize default value of type " +
						defaultValue.getClass().getSimpleName() + ".", e);
			}

			out.writeInt(serializedDefaultValue.length);
			out.write(serializedDefaultValue);
		}
	}

	private void readObject(final ObjectInputStream in) throws IOException, ClassNotFoundException {
		// read the non-transient fields
		in.defaultReadObject();

		// read the default value field
		boolean hasDefaultValue = in.readBoolean();
		if (hasDefaultValue) {
			int size = in.readInt();

			byte[] buffer = new byte[size];

			in.readFully(buffer);

			try (ByteArrayInputStream bais = new ByteArrayInputStream(buffer);
					DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(bais)) {

				defaultValue = serializer.deserialize(inView);
			}
			catch (Exception e) {
				throw new IOException("Unable to deserialize default value.", e);
			}
		} else {
			defaultValue = null;
		}
	}
}
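  • StateDescriptor is the base class of ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, AggregatingStateDescriptor and MapStateDescriptor. It declares the abstract method getType, which returns a Type (VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP, plus the deprecated UNKNOWN); each subclass uses it to report its own state type
  • StateDescriptor provides three protected constructors. Each takes a name and a defaultValue, plus one of a TypeSerializer, a TypeInformation, or a Class describing the value type. With a TypeInformation or a Class, the serializer is created lazily when initializeSerializerUnlessSet is called
  • StateDescriptor overrides equals and hashCode (both final, both based on the name and the concrete class). It also implements Serializable and customizes serialization through writeObject and readObject, because the defaultValue field is transient and has to be written with the TypeSerializer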
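The three points above can be modelled in plain Java. This is a minimal sketch under stated assumptions, not the Flink class: SketchDescriptor, SketchValueDesc, SketchListDesc and SketchRoundTrip are hypothetical names, and the default value is fixed to Integer and written with writeInt, whereas StateDescriptor writes a length-prefixed byte array produced by its TypeSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical, simplified base descriptor; not a Flink class.
abstract class SketchDescriptor implements Serializable {
    private static final long serialVersionUID = 1L;

    enum Kind { VALUE, LIST }

    private final String name;
    // transient: skipped by default serialization and written by hand below
    private transient Integer defaultValue;

    SketchDescriptor(String name, Integer defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }

    Integer getDefaultValue() { return defaultValue; }

    // Each subclass reports its own kind, like StateDescriptor#getType().
    abstract Kind getKind();

    // Identity is the name plus the concrete class, as in StateDescriptor.
    @Override
    public final int hashCode() {
        return name.hashCode() + 31 * getClass().hashCode();
    }

    @Override
    public final boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        return o != null
                && o.getClass() == getClass()
                && name.equals(((SketchDescriptor) o).name);
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();               // writes the non-transient name
        out.writeBoolean(defaultValue != null); // presence flag
        if (defaultValue != null) {
            out.writeInt(defaultValue);
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        if (in.readBoolean()) {
            defaultValue = in.readInt();
        } else {
            defaultValue = null;
        }
    }
}

final class SketchValueDesc extends SketchDescriptor {
    SketchValueDesc(String name, Integer defaultValue) { super(name, defaultValue); }

    @Override
    Kind getKind() { return Kind.VALUE; }
}

final class SketchListDesc extends SketchDescriptor {
    SketchListDesc(String name, Integer defaultValue) { super(name, defaultValue); }

    @Override
    Kind getKind() { return Kind.LIST; }
}

// Helper that serializes and deserializes an object in memory.
final class SketchRoundTrip {
    @SuppressWarnings("unchecked")
    static <D extends Serializable> D copy(D original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (D) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }
}
```

In this model, two SketchValueDesc instances named "count" are equal even when their default values differ, a SketchListDesc named "count" is not equal to either of them, and SketchRoundTrip.copy brings the transient default value back after deserialization.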

Summary

  • RuntimeContext provides one getter per kind of state, each taking the matching StateDescriptor: getState, getListState, getReducingState, getAggregatingState, getFoldingState and getMapState
  • StateDescriptor is the base class of ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor, AggregatingStateDescriptor and MapStateDescriptor. It declares the abstract method getType, which returns a Type (VALUE, LIST, REDUCING, FOLDING, AGGREGATING, MAP); each subclass uses it to report its own state type
  • StateDescriptor overrides equals and hashCode; it also implements Serializable and customizes serialization through writeObject and readObject
