A Look at Flink's StateTtlConfig

This article takes a look at Flink's StateTtlConfig.

Example

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))                                            // TTL of 1 second
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)              // refresh the timestamp on create and write only
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // never hand back expired values
    .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig); // states created from this descriptor get the TTL behaviour
  • The StateTtlConfig is created with its builder and then handed to the state through the StateDescriptor's enableTimeToLive method.

StateTtlConfig

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/StateTtlConfig.java

/**
 * Configuration of state TTL logic.
 *
 * <p>Note: The map state with TTL currently supports {@code null} user values
 * only if the user value serializer can handle {@code null} values.
 * If the serializer does not support {@code null} values,
 * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
 * at the cost of an extra byte in the serialized form.
 */
public class StateTtlConfig implements Serializable {

    private static final long serialVersionUID = -7592693245044289793L;

    public static final StateTtlConfig DISABLED =
        newBuilder(Time.milliseconds(Long.MAX_VALUE)).setUpdateType(UpdateType.Disabled).build();

    /**
     * This option value configures when to update last access timestamp which prolongs state TTL.
     */
    public enum UpdateType {
        /** TTL is disabled. State does not expire. */
        Disabled,
        /** Last access timestamp is initialised when state is created and updated on every write operation. */
        OnCreateAndWrite,
        /** The same as <code>OnCreateAndWrite</code> but also updated on read. */
        OnReadAndWrite
    }

    /**
     * This option configures whether expired user value can be returned or not.
     */
    public enum StateVisibility {
        /** Return expired user value if it is not cleaned up yet. */
        ReturnExpiredIfNotCleanedUp,
        /** Never return expired user value. */
        NeverReturnExpired
    }

    /**
     * This option configures time scale to use for ttl.
     */
    public enum TimeCharacteristic {
        /** Processing time, see also <code>TimeCharacteristic.ProcessingTime</code>. */
        ProcessingTime
    }

    private final UpdateType updateType;
    private final StateVisibility stateVisibility;
    private final TimeCharacteristic timeCharacteristic;
    private final Time ttl;
    private final CleanupStrategies cleanupStrategies;

    private StateTtlConfig(
        UpdateType updateType,
        StateVisibility stateVisibility,
        TimeCharacteristic timeCharacteristic,
        Time ttl,
        CleanupStrategies cleanupStrategies) {
        this.updateType = Preconditions.checkNotNull(updateType);
        this.stateVisibility = Preconditions.checkNotNull(stateVisibility);
        this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
        this.ttl = Preconditions.checkNotNull(ttl);
        this.cleanupStrategies = cleanupStrategies;
        Preconditions.checkArgument(ttl.toMilliseconds() > 0,
            "TTL is expected to be positive");
    }

    @Nonnull
    public UpdateType getUpdateType() {
        return updateType;
    }

    @Nonnull
    public StateVisibility getStateVisibility() {
        return stateVisibility;
    }

    @Nonnull
    public Time getTtl() {
        return ttl;
    }

    @Nonnull
    public TimeCharacteristic getTimeCharacteristic() {
        return timeCharacteristic;
    }

    public boolean isEnabled() {
        return updateType != UpdateType.Disabled;
    }

    @Nonnull
    public CleanupStrategies getCleanupStrategies() {
        return cleanupStrategies;
    }

    @Override
    public String toString() {
        return "StateTtlConfig{" +
            "updateType=" + updateType +
            ", stateVisibility=" + stateVisibility +
            ", timeCharacteristic=" + timeCharacteristic +
            ", ttl=" + ttl +
            '}';
    }

    @Nonnull
    public static Builder newBuilder(@Nonnull Time ttl) {
        return new Builder(ttl);
    }

    /**
     * Builder for the {@link StateTtlConfig}.
     */
    public static class Builder {

        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TimeCharacteristic timeCharacteristic = ProcessingTime;
        private Time ttl;
        private CleanupStrategies cleanupStrategies = new CleanupStrategies();

        public Builder(@Nonnull Time ttl) {
            this.ttl = ttl;
        }

        /**
         * Sets the ttl update type.
         *
         * @param updateType The ttl update type configures when to update last access timestamp which prolongs state TTL.
         */
        @Nonnull
        public Builder setUpdateType(UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }

        @Nonnull
        public Builder updateTtlOnCreateAndWrite() {
            return setUpdateType(UpdateType.OnCreateAndWrite);
        }

        @Nonnull
        public Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }

        /**
         * Sets the state visibility.
         *
         * @param stateVisibility The state visibility configures whether expired user value can be returned or not.
         */
        @Nonnull
        public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }

        @Nonnull
        public Builder returnExpiredIfNotCleanedUp() {
            return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
        }

        @Nonnull
        public Builder neverReturnExpired() {
            return setStateVisibility(StateVisibility.NeverReturnExpired);
        }

        /**
         * Sets the time characteristic.
         *
         * @param timeCharacteristic The time characteristic configures time scale to use for ttl.
         */
        @Nonnull
        public Builder setTimeCharacteristic(@Nonnull TimeCharacteristic timeCharacteristic) {
            this.timeCharacteristic = timeCharacteristic;
            return this;
        }

        @Nonnull
        public Builder useProcessingTime() {
            return setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        }

        /** Cleanup expired state in full snapshot on checkpoint. */
        @Nonnull
        public Builder cleanupFullSnapshot() {
            cleanupStrategies.strategies.put(
                CleanupStrategies.Strategies.FULL_STATE_SCAN_SNAPSHOT,
                new CleanupStrategies.CleanupStrategy() {  });
            return this;
        }

        /**
         * Sets the ttl time.
         * @param ttl The ttl time.
         */
        @Nonnull
        public Builder setTtl(@Nonnull Time ttl) {
            this.ttl = ttl;
            return this;
        }

        @Nonnull
        public StateTtlConfig build() {
            return new StateTtlConfig(
                updateType,
                stateVisibility,
                timeCharacteristic,
                ttl,
                cleanupStrategies);
        }
    }

    /**
     * TTL cleanup strategies.
     *
     * <p>This class configures when to cleanup expired state with TTL.
     * By default, state is always cleaned up on explicit read access if found expired.
     * Currently cleanup of state full snapshot can be additionally activated.
     */
    public static class CleanupStrategies implements Serializable {
        private static final long serialVersionUID = -1617740467277313524L;

        /** Fixed strategies ordinals in {@code strategies} config field. */
        enum Strategies {
            FULL_STATE_SCAN_SNAPSHOT
        }

        /** Base interface for cleanup strategies configurations. */
        interface CleanupStrategy extends Serializable {

        }

        final EnumMap<Strategies, CleanupStrategy> strategies = new EnumMap<>(Strategies.class);

        public boolean inFullSnapshot() {
            return strategies.containsKey(Strategies.FULL_STATE_SCAN_SNAPSHOT);
        }
    }
}
  • StateTtlConfig configures the TTL properties of state. It defines three enums: UpdateType (Disabled, OnCreateAndWrite, OnReadAndWrite), StateVisibility (ReturnExpiredIfNotCleanedUp, NeverReturnExpired) and TimeCharacteristic (ProcessingTime).
  • StateTtlConfig also defines CleanupStrategies, the cleanup strategies for TTL state. By default, expired state is cleaned up when it is read; currently the only additional option is FULL_STATE_SCAN_SNAPSHOT, which cleans expired state out of the full snapshot taken on checkpoint.
  • StateTtlConfig also provides a Builder for conveniently setting UpdateType, StateVisibility, TimeCharacteristic, the TTL Time and CleanupStrategies.
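To make the interplay of these options concrete, here is a minimal plain-Java model with no Flink dependency. The enum names mirror StateTtlConfig, but the class TtlSemanticsSketch, its methods and the expiry rule (an entry counts as expired once lastAccessTimestamp + ttl <= now) are assumptions made for illustration, not Flink's implementation. The read path follows our understanding of Flink's TTL wrappers, which this article does not show: an expired entry is cleared on access, and under ReturnExpiredIfNotCleanedUp that value is still returned one last time. fullSnapshot imitates the optional snapshot cleanup by leaving expired entries out of the snapshot without touching the live store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of StateTtlConfig semantics; not Flink's code.
class TtlSemanticsSketch {
    enum UpdateType { OnCreateAndWrite, OnReadAndWrite }
    enum StateVisibility { ReturnExpiredIfNotCleanedUp, NeverReturnExpired }

    // User value plus the timestamp of its last TTL-relevant access.
    static final class TtlValue {
        final String userValue;
        final long lastAccessTimestamp;

        TtlValue(String userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    final long ttl;
    final UpdateType updateType;
    final StateVisibility visibility;
    final Map<String, TtlValue> store = new HashMap<>();

    TtlSemanticsSketch(long ttl, UpdateType updateType, StateVisibility visibility) {
        this.ttl = ttl;
        this.updateType = updateType;
        this.visibility = visibility;
    }

    // Assumed expiry rule for this sketch.
    boolean expired(TtlValue v, long now) {
        return v.lastAccessTimestamp + ttl <= now;
    }

    // Every write (re)creates the entry with a fresh timestamp.
    void put(String key, String value, long now) {
        store.put(key, new TtlValue(value, now));
    }

    // Read path: the default cleanup happens here, on explicit access to an expired entry.
    String get(String key, long now) {
        TtlValue v = store.get(key);
        if (v == null) {
            return null;
        }
        if (expired(v, now)) {
            store.remove(key); // cleanup on read
            return visibility == StateVisibility.ReturnExpiredIfNotCleanedUp ? v.userValue : null;
        }
        if (updateType == UpdateType.OnReadAndWrite) {
            store.put(key, new TtlValue(v.userValue, now)); // a read also prolongs the TTL
        }
        return v.userValue;
    }

    // Optional full-snapshot cleanup: expired entries are left out of the snapshot,
    // while the live store itself is not modified.
    Map<String, String> fullSnapshot(long now) {
        Map<String, String> snapshot = new HashMap<>();
        for (Map.Entry<String, TtlValue> e : store.entrySet()) {
            if (!expired(e.getValue(), now)) {
                snapshot.put(e.getKey(), e.getValue().userValue);
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        TtlSemanticsSketch s = new TtlSemanticsSketch(
            1000L, UpdateType.OnReadAndWrite, StateVisibility.NeverReturnExpired);
        s.put("k", "v", 0L);
        System.out.println(s.get("k", 800L));  // v (timestamp refreshed to 800)
        System.out.println(s.get("k", 1500L)); // v (800 + 1000 > 1500, refreshed to 1500)
        System.out.println(s.get("k", 2600L)); // null (1500 + 1000 <= 2600)
    }
}
```

With OnCreateAndWrite instead, the read at 800 would not refresh the timestamp, so the read at 1500 would already return null.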

AbstractKeyedStateBackend

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java

    /**
     * @see KeyedStateBackend
     */
    @Override
    @SuppressWarnings("unchecked")
    public <N, S extends State, V> S getOrCreateKeyedState(
            final TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, V> stateDescriptor) throws Exception {
        checkNotNull(namespaceSerializer, "Namespace serializer");
        checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
                "This operation cannot use partitioned state.");

        InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
        if (kvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(executionConfig);
            }
            kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
            keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            publishQueryableStateIfEnabled(stateDescriptor, kvState);
        }
        return (S) kvState;
    }
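  • AbstractKeyedStateBackend's getOrCreateKeyedState method first looks the state up by descriptor name in keyValueStatesByName; only when none exists does it create one via TtlStateFactory.createStateAndWrapWithTtlIfEnabled, cache the resulting InternalKvState under that name and publish it as queryable state if enabled.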
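The lookup-then-create flow can be sketched in plain Java as follows. This is a simplified model under stated assumptions: KeyedStateRegistrySketch, ValueState, RawValueState and TtlValueStateSketch are hypothetical stand-ins for keyValueStatesByName, the backend's internal state and the TTL wrapper, and a boolean flag replaces StateTtlConfig.isEnabled(). The TTL wrapper assumes OnCreateAndWrite and NeverReturnExpired and uses an injectable clock, roughly the role TtlTimeProvider plays in the listing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical model of getOrCreateKeyedState: look up by name, else create, wrap with TTL if enabled, cache.
class KeyedStateRegistrySketch {
    interface ValueState {
        String value();
        void update(String v);
    }

    // Plain state, standing in for what the backend creates without TTL.
    static final class RawValueState implements ValueState {
        private String v;
        public String value() { return v; }
        public void update(String v) { this.v = v; }
    }

    // TTL decorator over a raw state (assumes OnCreateAndWrite + NeverReturnExpired).
    static final class TtlValueStateSketch implements ValueState {
        private final RawValueState inner = new RawValueState();
        private final long ttl;
        private final LongSupplier clock; // stand-in for a TTL time provider
        private long lastAccess;

        TtlValueStateSketch(long ttl, LongSupplier clock) {
            this.ttl = ttl;
            this.clock = clock;
        }

        public String value() {
            String v = inner.value();
            if (v != null && lastAccess + ttl <= clock.getAsLong()) {
                inner.update(null); // expired: clear it on read and report it as absent
                return null;
            }
            return v; // reads do not refresh lastAccess under OnCreateAndWrite
        }

        public void update(String v) {
            inner.update(v);
            lastAccess = clock.getAsLong(); // every write restarts the TTL
        }
    }

    private final Map<String, ValueState> statesByName = new HashMap<>();
    private final LongSupplier clock;

    KeyedStateRegistrySketch(LongSupplier clock) {
        this.clock = clock;
    }

    ValueState getOrCreate(String name, boolean ttlEnabled, long ttlMillis) {
        ValueState state = statesByName.get(name);
        if (state == null) {
            // Mirrors createStateAndWrapWithTtlIfEnabled: wrap only when TTL is enabled.
            state = ttlEnabled ? new TtlValueStateSketch(ttlMillis, clock) : new RawValueState();
            statesByName.put(name, state);
        }
        return state;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        KeyedStateRegistrySketch registry = new KeyedStateRegistrySketch(() -> now[0]);
        ValueState s = registry.getOrCreate("text state", true, 1000L);
        s.update("hello");
        now[0] = 1500L;
        System.out.println(s.value());                                            // null
        System.out.println(s == registry.getOrCreate("text state", true, 1000L)); // true
    }
}
```

As in the listed getOrCreateKeyedState, a second request for the same name returns the cached instance, so in this model the TTL setting passed on the first call is the one that sticks.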

TtlStateFactory

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlStateFactory.java

/**
 * This state factory wraps state objects, produced by backends, with TTL logic.
 */
public class TtlStateFactory<N, SV, S extends State, IS extends S> {
    public static <N, SV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) throws Exception {
        Preconditions.checkNotNull(namespaceSerializer);
        Preconditions.checkNotNull(stateDesc);
        Preconditions.checkNotNull(originalStateFactory);
        Preconditions.checkNotNull(timeProvider);
        return  stateDesc.getTtlConfig().isEnabled() ?
            new TtlStateFactory<N, SV, S, IS>(
                namespaceSerializer, stateDesc, originalStateFactory, timeProvider)
                .createState() :
            originalStateFactory.createInternalState(namespaceSerializer, stateDesc);
    }

    private final Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> stateFactories;

    private final TypeSerializer<N> namespaceSerializer;
    private final StateDescriptor<S, SV> stateDesc;
    private final KeyedStateFactory originalStateFactory;
    private final StateTtlConfig ttlConfig;
    private final TtlTimeProvider timeProvider;
    private final long ttl;

    private TtlStateFactory(
        TypeSerializer<N> namespaceSerializer,
        StateDescriptor<S, SV> stateDesc,
        KeyedStateFactory originalStateFactory,
        TtlTimeProvider timeProvider) {
        this.namespaceSerializer = namespaceSerializer;
        this.stateDesc = stateDesc;
        this.originalStateFactory = originalStateFactory;
        this.ttlConfig = stateDesc.getTtlConfig();
        this.timeProvider = timeProvider;
        this.ttl = ttlConfig.getTtl().toMilliseconds();
        this.stateFactories = createStateFactories();
    }

    private Map<Class<? extends StateDescriptor>, SupplierWithException<IS, Exception>> createStateFactories() {
        return Stream.of(
            Tuple2.of(ValueStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createValueState),
            Tuple2.of(ListStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createListState),
            Tuple2.of(MapStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createMapState),
            Tuple2.of(ReducingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createReducingState),
            Tuple2.of(AggregatingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createAggregatingState),
            Tuple2.of(FoldingStateDescriptor.class, (SupplierWithException<IS, Exception>) this::createFoldingState)
        ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
    }

    private IS createState() throws Exception {
        SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s",
                stateDesc.getClass(), TtlStateFactory.class);
            throw new FlinkRuntimeException(message);
        }
        return stateFactory.get();
    }

    @SuppressWarnings("unchecked")
    private IS createValueState() throws Exception {
        ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlValueState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <T> IS createListState() throws Exception {
        ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
        ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
            stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
        return (IS) new TtlListState<>(
            originalStateFactory.createInternalState(
                namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, listStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <UK, UV> IS createMapState() throws Exception {
        MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
        MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor = new MapStateDescriptor<>(
            stateDesc.getName(),
            mapStateDesc.getKeySerializer(),
            new TtlSerializer<>(mapStateDesc.getValueSerializer()));
        return (IS) new TtlMapState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, mapStateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private IS createReducingState() throws Exception {
        ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
        ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor = new ReducingStateDescriptor<>(
            stateDesc.getName(),
            new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlReducingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    @SuppressWarnings("unchecked")
    private <IN, OUT> IS createAggregatingState() throws Exception {
        AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
            (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
        TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction = new TtlAggregateFunction<>(
            aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
        AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
            stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlAggregatingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
    }

    @SuppressWarnings({"deprecation", "unchecked"})
    private <T> IS createFoldingState() throws Exception {
        FoldingStateDescriptor<T, SV> foldingStateDescriptor = (FoldingStateDescriptor<T, SV>) stateDesc;
        SV initAcc = stateDesc.getDefaultValue();
        TtlValue<SV> ttlInitAcc = initAcc == null ? null : new TtlValue<>(initAcc, Long.MAX_VALUE);
        FoldingStateDescriptor<T, TtlValue<SV>> ttlDescriptor = new FoldingStateDescriptor<>(
            stateDesc.getName(),
            ttlInitAcc,
            new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
            new TtlSerializer<>(stateDesc.getSerializer()));
        return (IS) new TtlFoldingState<>(
            originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory()),
            ttlConfig, timeProvider, stateDesc.getSerializer());
    }

    //......
}
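  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled method decides how to create the state based on stateDesc.getTtlConfig().isEnabled(): if TTL is enabled it calls new TtlStateFactory<N, SV, S, IS>(namespaceSerializer, stateDesc, originalStateFactory, timeProvider).createState(); otherwise it calls originalStateFactory.createInternalState(namespaceSerializer, stateDesc).
  • createStateFactories builds a map from each StateDescriptor class to its creation method. createState then looks up the SupplierWithException for the descriptor's class and invokes it, which avoids a chain of if/else checks; an unsupported descriptor type results in a FlinkRuntimeException.
  • Each descriptor type maps to its own creation method and TTL wrapper: ValueStateDescriptor to createValueState, which creates a TtlValueState; ListStateDescriptor to createListState (TtlListState); MapStateDescriptor to createMapState (TtlMapState); ReducingStateDescriptor to createReducingState (TtlReducingState); AggregatingStateDescriptor to createAggregatingState (TtlAggregatingState); and FoldingStateDescriptor to createFoldingState (TtlFoldingState). Each method rebuilds the descriptor with a TtlSerializer so that the backend stores TtlValue-wrapped values, then wraps the internal state created by the backend in the corresponding Ttl* class.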
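The map-based dispatch that replaces an if/else chain can be reduced to the plain-Java sketch below. All names (FactoryDispatchSketch, the Descriptor classes, and create* methods that simply return the name of the wrapper they would build) are hypothetical stand-ins. A HashMap filled in the constructor replaces the Stream.of(...).collect(Collectors.toMap(...)) construction from the listing, and IllegalStateException stands in for FlinkRuntimeException.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of a "descriptor class -> creation method" table, modelled on createStateFactories.
class FactoryDispatchSketch {
    // Stand-ins for the StateDescriptor hierarchy (not Flink classes).
    static class Descriptor { }
    static class ValueDescriptor extends Descriptor { }
    static class ListDescriptor extends Descriptor { }
    static class MapDescriptor extends Descriptor { }
    static class OtherDescriptor extends Descriptor { } // deliberately unsupported

    private final Map<Class<? extends Descriptor>, Supplier<String>> stateFactories = new HashMap<>();

    FactoryDispatchSketch() {
        // One table entry per supported type instead of an if/else chain.
        stateFactories.put(ValueDescriptor.class, this::createValueState);
        stateFactories.put(ListDescriptor.class, this::createListState);
        stateFactories.put(MapDescriptor.class, this::createMapState);
    }

    // Each creator returns the name of the TTL wrapper it would build.
    private String createValueState() { return "TtlValueState"; }
    private String createListState() { return "TtlListState"; }
    private String createMapState() { return "TtlMapState"; }

    String createState(Descriptor desc) {
        // Exact-class lookup, like stateFactories.get(stateDesc.getClass()).
        Supplier<String> factory = stateFactories.get(desc.getClass());
        if (factory == null) {
            throw new IllegalStateException(String.format(
                "State %s is not supported by %s", desc.getClass(), FactoryDispatchSketch.class));
        }
        return factory.get();
    }

    public static void main(String[] args) {
        FactoryDispatchSketch factory = new FactoryDispatchSketch();
        System.out.println(factory.createState(new ListDescriptor())); // TtlListState
        try {
            factory.createState(new OtherDescriptor());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the lookup uses the exact runtime class, as stateFactories.get(stateDesc.getClass()) does in the listing, a subclass of a supported descriptor would not be found and would take the error branch.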

Summary

  • StateTtlConfig configures the TTL properties of state, chiefly UpdateType, StateVisibility, TimeCharacteristic, the TTL Time and CleanupStrategies.
  • AbstractKeyedStateBackend's getOrCreateKeyedState method uses TtlStateFactory.createStateAndWrapWithTtlIfEnabled to create the InternalKvState.
  • TtlStateFactory's createStateAndWrapWithTtlIfEnabled creates the appropriate state depending on stateDesc.getTtlConfig().isEnabled(); TtlStateFactory's createState then creates the TTL state type that corresponds to the given StateDescriptor type.
