聊聊flink的Managed Keyed State

本文主要研究一下flink的Managed Keyed Statehtml

State

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.javajava

/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

	/**
	 * Removes the value mapped under the current key.
	 */
	void clear();
}
  • State是全部不一樣類型的State必須實現的接口,它定義了clear方法

ValueState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ValueState.javaapache

@PublicEvolving
public interface ValueState<T> extends State {

	/**
	 * Returns the current value for the state. When the state is not
	 * partitioned the returned value is the same for all inputs in a given
	 * operator instance. If state partitioning is applied, the value returned
	 * depends on the current operator input, as the operator maintains an
	 * independent state for each partition.
	 *
	 * <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor}
	 * this will return {@code null} when to value was previously set using {@link #update(Object)}.
	 *
	 * @return The state value corresponding to the current input.
	 *
	 * @throws IOException Thrown if the system cannot access the state.
	 */
	T value() throws IOException;

	/**
	 * Updates the operator state accessible by {@link #value()} to the given
	 * value. The next time {@link #value()} is called (for the same state
	 * partition) the returned state will represent the updated value. When a
	 * partitioned state is updated with null, the state for the current key
	 * will be removed and the default value is returned on the next access.
	 *
	 * @param value The new value for the state.
	 *
	 * @throws IOException Thrown if the system cannot access the state.
	 */
	void update(T value) throws IOException;

}
  • ValueState繼承了State接口,它定義了value、update兩個方法,一個用於取值,一個用於更新值

AppendingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.javaapi

@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

	/**
	 * Returns the current value for the state. When the state is not
	 * partitioned the returned value is the same for all inputs in a given
	 * operator instance. If state partitioning is applied, the value returned
	 * depends on the current operator input, as the operator maintains an
	 * independent state for each partition.
	 *
	 * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
	 * should return {@code null}.
	 *
	 * @return The operator state value corresponding to the current input or {@code null}
	 * if the state is empty.
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	OUT get() throws Exception;

	/**
	 * Updates the operator state accessible by {@link #get()} by adding the given value
	 * to the list of values. The next time {@link #get()} is called (for the same state
	 * partition) the returned state will represent the updated list.
	 *
	 * <p>If null is passed in, the state value will remain unchanged.
	 *
	 * @param value The new value for the state.
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	void add(IN value) throws Exception;

}
  • AppendingState繼承了State接口,它定義了get、add方法,該State接收IN、OUT兩個泛型

FoldingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/FoldingState.javaapp

@PublicEvolving
@Deprecated
public interface FoldingState<T, ACC> extends AppendingState<T, ACC> {}
  • FoldingState繼承了AppendingState,其中OUT泛型表示ACC,即累積值;FoldingState在Flink 1.4版本被標記爲廢棄,後續會被移除掉,可以使用AggregatingState替代

MergingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.javathis

/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }
  • MergingState繼承了AppendingState,這裏用命名錶達merge state的意思,它有幾個子接口,分別是ListState、ReducingState、AggregatingState

ListState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.javacode

@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

	/**
	 * Updates the operator state accessible by {@link #get()} by updating existing values to
	 * to the given list of values. The next time {@link #get()} is called (for the same state
	 * partition) the returned state will represent the updated list.
	 *
	 * <p>If null or an empty list is passed in, the state value will be null.
	 *
	 * @param values The new values for the state.
	 *
	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
	 */
	void update(List<T> values) throws Exception;

	/**
	 * Updates the operator state accessible by {@link #get()} by adding the given values
	 * to existing list of values. The next time {@link #get()} is called (for the same state
	 * partition) the returned state will represent the updated list.
	 *
	 * <p>If null or an empty list is passed in, the state value remains unchanged.
	 *
	 * @param values The new values to be added to the state.
	 *
	 * @throws Exception The method may forward exception thrown internally (by I/O or functions).
	 */
	void addAll(List<T> values) throws Exception;
}
  • ListState繼承了MergingState,它的OUT類型爲Iterable<IN>;它主要用於operation存儲partitioned list state,它繼承了MergingState接口(指定OUT的泛型爲Iterable<T>),同時聲明瞭兩個方法;其中update用於全量更新state,若是參數爲null或者empty,那麼state會被清空;addAll方法用於增量更新,若是參數爲null或者empty,則保持不變,不然則新增給定的values

ReducingState

flink-core/1.7.0/flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ReducingState.javaorm

@PublicEvolving
public interface ReducingState<T> extends MergingState<T, T> {}
  • ReducingState繼承了MergingState,它的IN、OUT類型相同

AggregatingState

flink-core/1.7.0/flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AggregatingState.javahtm

@PublicEvolving
public interface AggregatingState<IN, OUT> extends MergingState<IN, OUT> {}
  • AggregatingState繼承了MergingState,它與ReducingState不一樣,IN、OUT類型能夠不一樣

MapState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MapState.java繼承

@PublicEvolving
public interface MapState<UK, UV> extends State {

	/**
	 * Returns the current value associated with the given key.
	 *
	 * @param key The key of the mapping
	 * @return The value of the mapping with the given key
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	UV get(UK key) throws Exception;

	/**
	 * Associates a new value with the given key.
	 *
	 * @param key The key of the mapping
	 * @param value The new value of the mapping
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	void put(UK key, UV value) throws Exception;

	/**
	 * Copies all of the mappings from the given map into the state.
	 *
	 * @param map The mappings to be stored in this state
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	void putAll(Map<UK, UV> map) throws Exception;

	/**
	 * Deletes the mapping of the given key.
	 *
	 * @param key The key of the mapping
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	void remove(UK key) throws Exception;

	/**
	 * Returns whether there exists the given mapping.
	 *
	 * @param key The key of the mapping
	 * @return True if there exists a mapping whose key equals to the given key
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	boolean contains(UK key) throws Exception;

	/**
	 * Returns all the mappings in the state.
	 *
	 * @return An iterable view of all the key-value pairs in the state.
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	Iterable<Map.Entry<UK, UV>> entries() throws Exception;

	/**
	 * Returns all the keys in the state.
	 *
	 * @return An iterable view of all the keys in the state.
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	Iterable<UK> keys() throws Exception;

	/**
	 * Returns all the values in the state.
	 *
	 * @return An iterable view of all the values in the state.
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	Iterable<UV> values() throws Exception;

	/**
	 * Iterates over all the mappings in the state.
	 *
	 * @return An iterator over all the mappings in the state
	 *
	 * @throws Exception Thrown if the system cannot access the state.
	 */
	Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
}
  • MapState直接繼承了State,它接收UK、UV兩個泛型,分別是map的key和value的類型

小結

  • flink提供了好幾個不一樣類型的Managed Keyed State,有ValueState<T>、ListState<T>、ReducingState<T>、AggregatingState<IN, OUT>、FoldingState<T, ACC>、MapState<UK, UV>
  • ValueState<T>和MapState<UK, UV>是直接繼承State接口;FoldingState繼承了AppendingState<IN, OUT>(AppendingState直接繼承了State);ListState、ReducingState、AggregatingState繼承了MergingState<IN, OUT>(MergingState繼承了AppendingState)
  • FoldingState在Flink 1.4版本被標記爲廢棄,後續會被移除掉,可以使用AggregatingState替代

doc

相關文章
相關標籤/搜索