本文主要研究一下flink的StateTtlConfig
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

// Usage example: build a TTL configuration step by step and attach it to a state descriptor.

// The state lives for one second after its last-access timestamp.
Time timeToLive = Time.seconds(1);

StateTtlConfig.Builder ttlBuilder = StateTtlConfig.newBuilder(timeToLive);
// The last-access timestamp is initialised on creation and refreshed on every write.
ttlBuilder.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite);
// Expired user values are never handed back to the caller.
ttlBuilder.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired);
StateTtlConfig ttlConfig = ttlBuilder.build();

// Enable TTL on a String value state named "text state".
ValueStateDescriptor<String> stateDescriptor =
    new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java
/**
 * Configuration of state TTL logic.
 *
 * <p>Instances are immutable and are created through {@link #newBuilder(Time)}.
 *
 * <p>Note: The map state with TTL currently supports {@code null} user values
 * only if the user value serializer can handle {@code null} values.
 * If the serializer does not support {@code null} values,
 * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
 * at the cost of an extra byte in the serialized form.
 */
public class StateTtlConfig implements Serializable {

    private static final long serialVersionUID = -7592693245044289793L;

    /** Configuration with TTL switched off ({@link UpdateType#Disabled}). */
    public static final StateTtlConfig DISABLED =
        newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();

    /**
     * This option value configures when to update last access timestamp which prolongs state TTL.
     */
    public enum UpdateType {
        /** TTL is disabled. State does not expire. */
        Disabled,
        /** Last access timestamp is initialised when state is created and updated on every write operation. */
        OnCreateAndWrite,
        /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
        OnReadAndWrite
    }

    /**
     * This option configures whether expired user value can be returned or not.
     */
    public enum StateVisibility {
        /** Return expired user value if it is not cleaned up yet. */
        ReturnExpiredIfNotCleanedUp,
        /** Never return expired user value. */
        NeverReturnExpired
    }

    /**
     * This option configures time scale to use for ttl.
     */
    public enum TimeCharacteristic {
        /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
        ProcessingTime
    }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final Time ttl;
    private final CleanupStrategies cleanupStrategies;

    /**
     * Creates the configuration; only reachable through {@link Builder#build()}.
     *
     * @throws NullPointerException if any argument is {@code null}
     * @throws IllegalArgumentException if {@code ttl} is not strictly positive
     */
    private StateTtlConfig(
        UpdateType updateType,
        StateVisibility stateVisibility,
        TimeCharacteristic timeCharacteristic,
        Time ttl,
        CleanupStrategies cleanupStrategies) {
        this.updateType = Preconditions.checkNotNull(updateType);
        this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
        this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
        this.ttl = Preconditions.checkNotNull(ttl);
        // getCleanupStrategies() is declared @Nonnull, so enforce that here like the other fields.
        this.cleanupStrategies = Preconditions.checkNotNull(cleanupStrategies);
        Preconditions.checkArgument(ttl.toMilliseconds() > 0, "TTL is expected to be positive");
    }

    /** Returns when the last access timestamp is refreshed. */
    @Nonnull
    public UpdateType getUpdateType() {
        return updateType;
    }

    /** Returns whether expired but not yet cleaned up values may be returned. */
    @Nonnull
    public StateVisibility getStateVisibility() {
        return stateVisibility;
    }

    /** Returns the configured time-to-live. */
    @Nonnull
    public Time getTtl() {
        return ttl;
    }

    /** Returns the time scale used for TTL. */
    @Nonnull
    public TimeCharacteristic getTimeCharacteristic() {
        return timeCharacteristic;
    }

    /** Returns {@code true} unless the update type is {@link UpdateType#Disabled}. */
    public boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    /** Returns the configured cleanup strategies. */
    @Nonnull
    public CleanupStrategies getCleanupStrategies() {
        return cleanupStrategies;
    }

    @Override
    public String toString() {
        return "StateTtlConfig{" +
            "updateType=" + updateType +
            ", stateVisibility=" + stateVisibility +
            ", timeCharacteristic=" + timeCharacteristic +
            ", ttl=" + ttl +
            '}';
    }

    /**
     * Starts building a configuration with the given time-to-live.
     *
     * @param ttl The ttl time.
     */
    @Nonnull
    public static Builder newBuilder(@Nonnull Time ttl) {
        return new Builder(ttl);
    }

    /**
     * Builder for the {@link StateTtlConfig}.
     */
    public static class Builder {

        // Defaults are written with their enum type so that they resolve without static imports.
        private UpdateType updateType = UpdateType.OnCreateAndWrite;
        private StateVisibility stateVisibility = StateVisibility.NeverReturnExpired;
        private TimeCharacteristic timeCharacteristic = TimeCharacteristic.ProcessingTime;
        private Time ttl;
        private CleanupStrategies cleanupStrategies = new CleanupStrategies();

        public Builder(@Nonnull Time ttl) {
            this.ttl = ttl;
        }

        /**
         * Sets the ttl update type.
         *
         * @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
         */
        @Nonnull
        public Builder setUpdateType(@Nonnull UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }

        /** Shortcut for {@code setUpdateType(UpdateType.OnCreateAndWrite)}. */
        @Nonnull
        public Builder updateTtlOnCreateAndWrite() {
            return setUpdateType(UpdateType.OnCreateAndWrite);
        }

        /** Shortcut for {@code setUpdateType(UpdateType.OnReadAndWrite)}. */
        @Nonnull
        public Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }

        /**
         * Sets the state visibility.
         *
         * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
         */
        @Nonnull
        public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }

        /** Shortcut for {@code setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp)}. */
        @Nonnull
        public Builder returnExpiredIfNotCleanedUp() {
            return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
        }

        /** Shortcut for {@code setStateVisibility(StateVisibility.NeverReturnExpired)}. */
        @Nonnull
        public Builder neverReturnExpired() {
            return setStateVisibility(StateVisibility.NeverReturnExpired);
        }

        /**
         * Sets the time characteristic.
         *
         * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
         */
        @Nonnull
        public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
            return this;
        }

        /** Shortcut for {@code setTimeCharacteristic(TimeCharacteristic.ProcessingTime)}. */
        @Nonnull
        public Builder useProcessingTime() {
            return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        }

        /** Cleanup expired state in full snapshot on checkpoint. */
        @Nonnull
        public Builder cleanupFullSnapshot() {
            // Use the shared static marker rather than an anonymous class: an anonymous class
            // created in this instance method would (with javac before JDK 18) hold a reference
            // to this non-serializable Builder and break serialization of the built config.
            cleanupStrategies.strategies.put(
                CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
                CleanupStrategies.EMPTY_STRATEGY);
            return this;
        }

        /**
         * Sets the ttl time.
         * @param ttl The ttl time.
         */
        @Nonnull
        public Builder setTtl(@Nonnull Time ttl) {
            this.ttl = ttl;
            return this;
        }

        /**
         * Builds the configuration from the current builder settings.
         *
         * @throws NullPointerException if a setting was set to {@code null}
         * @throws IllegalArgumentException if the ttl is not strictly positive
         */
        @Nonnull
        public StateTtlConfig build() {
            // Hand out a snapshot so that further calls on this builder cannot change
            // a configuration that has already been built.
            CleanupStrategies strategiesSnapshot = new CleanupStrategies();
            strategiesSnapshot.strategies.putAll(cleanupStrategies.strategies);
            return new StateTtlConfig(
                updateType,
                stateVisibility,
                timeCharacteristic,
                ttl,
                strategiesSnapshot);
        }
    }

    /**
     * TTL cleanup strategies.
     *
     * <p>This class configures when to cleanup expired state with TTL.
     * By default, state is always cleaned up on explicit read access if found expired.
     * Currently cleanup of state full snapshot can be additionally activated.
     */
    public static class CleanupStrategies implements Serializable {

        private static final long serialVersionUID = -1617740467277313524L;

        /** Fixed strategies ordinals in {@code strategies} config field. */
        enum Strategies {
            FULL_STATE_SCAN_SNAPSHOT
        }

        /** Base interface for cleanup strategies configurations. */
        interface CleanupStrategy extends Serializable {
        }

        /** Strategy configuration without any settings; a static nested class so it serializes on its own. */
        static final class EmptyCleanupStrategy implements CleanupStrategy {
            private static final long serialVersionUID = 1L;
        }

        /** Shared marker for strategies that are merely switched on. */
        static final CleanupStrategy EMPTY_STRATEGY = new EmptyCleanupStrategy();

        final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

        /** Returns {@code true} if cleanup in full snapshot has been activated. */
        public boolean inFullSnapshot() {
            return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
        }
    }
}
StateTtlConfig定義了三個枚舉：UpdateType(Disabled、OnCreateAndWrite、OnReadAndWrite)、StateVisibility(ReturnExpiredIfNotCleanedUp、NeverReturnExpired)、TimeCharacteristic(ProcessingTime)
CleanupStrategies定義了TTL state的清理策略：默認在讀取到expired的state時進行清理，另外還提供了在checkpoint時清理full snapshot中的expired state(cleanupFullSnapshot)的選項
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
/**
 * Looks up the keyed state registered under the descriptor's name and, when none exists yet,
 * creates it via {@code TtlStateFactory.createStateAndWrapWithTtlIfEnabled}, registers it under
 * that name and passes it to {@code publishQueryableStateIfEnabled}.
 *
 * @param namespaceSerializer serializer for the state namespace; must not be {@code null}
 * @param stateDescriptor descriptor whose name identifies the state
 * @return the existing or newly created state, cast to the requested state type
 * @see KeyedStateBackend
 */
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
        final TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, V> stateDescriptor) throws Exception {
    checkNotNull(namespaceSerializer, "Namespace serializer");
    checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
            "This operation cannot use partitioned state.");

    // Fast path: the state was already created for this name.
    InternalKvState<K, ?, ?> registered = keyValueStatesByName.get(stateDescriptor.getName());
    if (registered != null) {
        return (S) registered;
    }

    // First access: make sure the descriptor has a serializer before the state is built.
    if (!stateDescriptor.isSerializerInitialized()) {
        stateDescriptor.initializeSerializerUnlessSet(executionConfig);
    }

    InternalKvState<K, ?, ?> created = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
            namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
    keyValueStatesByName.put(stateDescriptor.getName(), created);
    publishQueryableStateIfEnabled(stateDescriptor, created);
    return (S) created;
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
/** * This state factory wraps state objects, produced by backends, with TTL logic. */ public class TtlStateFactory<N, SV, S extends State, IS extends S> { public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled( TypeSerializer<N> namespaceSerializer, StateDescriptor<S, SV> stateDesc, KeyedStateFactory originalStateFactory, TtlTimeProvider timeProvider) throws Exception { Preconditions.checkNotNull(namespaceSerializer); Preconditions.checkNotNull(stateDesc); Preconditions.checkNotNull(originalStateFactory); Preconditions.checkNotNull(timeProvider); return stateDesc.getTtlConfig().isEnabled() ? new TtlStateFactory<N, SV, S, IS>( namespaceSerializer, stateDesc, originalStateFactory, timeProvider) .createState() : originalStateFactory.createInternalState(namespaceSerializer, stateDesc); } private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories; private final TypeSerializer<N> namespaceSerializer; private final StateDescriptor<S, SV> stateDesc; private final KeyedStateFactory originalStateFactory; private final StateTtlConfig ttlConfig; private final TtlTimeProvider timeProvider; private final long ttl; private TtlStateFactory( TypeSerializer<N> namespaceSerializer, StateDescriptor<S, SV> stateDesc, KeyedStateFactory originalStateFactory, TtlTimeProvider timeProvider) { this.namespaceSerializer = namespaceSerializer; this.stateDesc = stateDesc; this.originalStateFactory = originalStateFactory; this.ttlConfig = stateDesc.getTtlConfig(); this.timeProvider = timeProvider; this.ttl = ttlConfig.getTtl().toMilliseconds(); this.stateFactories = createStateFactories(); } private Map<Class<? 
extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() { return Stream.of( Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState), Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState), Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState), Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState), Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState), Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState) ).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); } private IS createState() throws Exception { SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass()); if (stateFactory == null) { String message = String.format("State %s is not supported by %s", stateDesc.getClass(), TtlStateFactory.class); throw new FlinkRuntimeException(message); } return stateFactory.get(); } @SuppressWarnings("unchecked") private IS createValueState() throws Exception { ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>( stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlValueState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, stateDesc.getSerializer()); } @SuppressWarnings("unchecked") private <T> IS createListState() throws Exception { ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc; ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>( stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer())); return (IS) new TtlListState<>( originalStateFactory.createInternalState( namespaceSerializer, ttlDescriptor, 
getSnapshotTransformFactory()), ttlConfig, timeProvider, listStateDesc.getSerializer()); } @SuppressWarnings("unchecked") private <UK, UV> IS createMapState() throws Exception { MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc; MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>( stateDesc.getName(), mapStateDesc.getKeySerializer(), new TtlSerializer<>(mapStateDesc.getValueSerializer())); return (IS) new TtlMapState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, mapStateDesc.getSerializer()); } @SuppressWarnings("unchecked") private IS createReducingState() throws Exception { ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc; ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>( stateDesc.getName(), new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlReducingState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, stateDesc.getSerializer()); } @SuppressWarnings("unchecked") private <IN, OUT> IS createAggregatingState() throws Exception { AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor = (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc; TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>( aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider); AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>( stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlAggregatingState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, 
timeProvider, stateDesc.getSerializer(), ttlAggregateFunction); } @SuppressWarnings({"deprecation", "unchecked"}) private <T> IS createFoldingState() throws Exception { FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc; SV initAcc = stateDesc.getDefaultValue(); TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE); FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>( stateDesc.getName(), ttlInitAcc, new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc), new TtlSerializer<>(stateDesc.getSerializer())); return (IS) new TtlFoldingState<>( originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()), ttlConfig, timeProvider, stateDesc.getSerializer()); } //...... }