本文主要研究一下flink的AbstractTtlState
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/internal/InternalKvState.java
/**
 * The {@code InternalKvState} is the root of the internal state type hierarchy, similar to the
 * {@link State} being the root of the public API state hierarchy.
 *
 * <p>The internal state classes give access to the namespace getters and setters and access to
 * additional functionality, like raw value access or state merging.
 *
 * <p>The public API state hierarchy is intended to be programmed against by Flink applications.
 * The internal state hierarchy holds all the auxiliary methods that are used by the runtime and not
 * intended to be used by user applications. These internal methods are considered of limited use to users and
 * only confusing, and are usually not regarded as stable across releases.
 *
 * <p>Each specific type in the internal state hierarchy extends the type from the public
 * state hierarchy:
 *
 * <pre>
 *             State
 *               |
 *               +-------------------InternalKvState
 *               |                         |
 *          MergingState                   |
 *               |                         |
 *               +-----------------InternalMergingState
 *               |                         |
 *      +--------+------+                  |
 *      |               |                  |
 * ReducingState    ListState        +-----+-----------------+
 *      |               |            |                       |
 *      +-----------+   +-----------   -----------------InternalListState
 *                  |                |
 *                  +---------InternalReducingState
 * </pre>
 *
 * @param <K> The type of key the state is associated to
 * @param <N> The type of the namespace
 * @param <V> The type of values kept internally in state
 */
public interface InternalKvState<K, N, V> extends State {

    /**
     * Returns the {@link TypeSerializer} for the key type {@code K}.
     *
     * @return serializer for keys
     */
    TypeSerializer<K> getKeySerializer();

    /**
     * Returns the {@link TypeSerializer} for the namespace type {@code N}.
     *
     * @return serializer for namespaces
     */
    TypeSerializer<N> getNamespaceSerializer();

    /**
     * Returns the {@link TypeSerializer} for the internally kept value type {@code V}.
     *
     * @return serializer for values
     */
    TypeSerializer<V> getValueSerializer();

    /**
     * Sets the current namespace.
     *
     * <p>NOTE(review): presumably later state accesses are scoped to this namespace; that is
     * defined by the implementations and is not visible in this excerpt.
     *
     * @param namespace the namespace to make current
     */
    void setCurrentNamespace(N namespace);

    /**
     * Returns the value in serialized form for a key and namespace that are themselves passed in
     * serialized form.
     *
     * <p>NOTE(review): the {@code safe*} parameter names suggest caller-supplied serializer
     * instances dedicated to this call (e.g. for use from another thread) - confirm against the
     * upstream Javadoc before relying on it.
     *
     * @param serializedKeyAndNamespace serialized key and namespace identifying the entry
     * @param safeKeySerializer serializer to use for the key
     * @param safeNamespaceSerializer serializer to use for the namespace
     * @param safeValueSerializer serializer to use for the value
     * @return the serialized value
     * @throws Exception declared by the signature; the concrete failure conditions depend on the
     *     implementation
     */
    byte[] getSerializedValue(
        final byte[] serializedKeyAndNamespace,
        final TypeSerializer<K> safeKeySerializer,
        final TypeSerializer<N> safeNamespaceSerializer,
        final TypeSerializer<V> safeValueSerializer) throws Exception;
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlState.java
/** * Base class for TTL logic wrappers of state objects. * * @param <K> The type of key the state is associated to * @param <N> The type of the namespace * @param <SV> The type of values kept internally in state without TTL * @param <TTLSV> The type of values kept internally in state with TTL * @param <S> Type of originally wrapped state object */ abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>> extends AbstractTtlDecorator<S> implements InternalKvState<K, N, SV> { private final TypeSerializer<SV> valueSerializer; AbstractTtlState(S original, StateTtlConfig config, TtlTimeProvider timeProvider, TypeSerializer<SV> valueSerializer) { super(original, config, timeProvider); this.valueSerializer = valueSerializer; } <SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate( SupplierWithException<TtlValue<T>, SE> getter, ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE { return getWithTtlCheckAndUpdate(getter, updater, original::clear); } @Override public TypeSerializer<K> getKeySerializer() { return original.getKeySerializer(); } @Override public TypeSerializer<N> getNamespaceSerializer() { return original.getNamespaceSerializer(); } @Override public TypeSerializer<SV> getValueSerializer() { return valueSerializer; } @Override public void setCurrentNamespace(N namespace) { original.setCurrentNamespace(namespace); } @Override public byte[] getSerializedValue( byte[] serializedKeyAndNamespace, TypeSerializer<K> safeKeySerializer, TypeSerializer<N> safeNamespaceSerializer, TypeSerializer<SV> safeValueSerializer) { throw new FlinkRuntimeException("Queryable state is not currently supported with TTL."); } @Override public void clear() { original.clear(); } }
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
/** * Base class for TTL logic wrappers. * * @param <T> Type of originally wrapped object */ abstract class AbstractTtlDecorator<T> { /** Wrapped original state handler. */ final T original; final StateTtlConfig config; final TtlTimeProvider timeProvider; /** Whether to renew expiration timestamp on state read access. */ final boolean updateTsOnRead; /** Whether to renew expiration timestamp on state read access. */ final boolean returnExpired; /** State value time to live in milliseconds. */ final long ttl; AbstractTtlDecorator( T original, StateTtlConfig config, TtlTimeProvider timeProvider) { Preconditions.checkNotNull(original); Preconditions.checkNotNull(config); Preconditions.checkNotNull(timeProvider); this.original = original; this.config = config; this.timeProvider = timeProvider; this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite; this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp; this.ttl = config.getTtl().toMilliseconds(); } <V> V getUnexpired(TtlValue<V> ttlValue) { return ttlValue == null || (expired(ttlValue) && !returnExpired) ? null : ttlValue.getUserValue(); } <V> boolean expired(TtlValue<V> ttlValue) { return TtlUtils.expired(ttlValue, ttl, timeProvider); } <V> TtlValue<V> wrapWithTs(V value) { return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp()); } <V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) { return wrapWithTs(ttlValue.getUserValue()); } <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate( SupplierWithException<TtlValue<V>, SE> getter, ThrowingConsumer<TtlValue<V>, CE> updater, ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE { TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear); return ttlValue == null ? 
null : ttlValue.getUserValue(); } <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate( SupplierWithException<TtlValue<V>, SE> getter, ThrowingConsumer<TtlValue<V>, CE> updater, ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE { TtlValue<V> ttlValue = getter.get(); if (ttlValue == null) { return null; } else if (expired(ttlValue)) { stateClear.run(); if (!returnExpired) { return null; } } else if (updateTsOnRead) { updater.accept(rewrapWithNewTs(ttlValue)); } return ttlValue; } }
getWrappedWithTtlCheckAndUpdate方法先通過getter獲取ttlValue,如果為null則直接返回null;否則判斷是否過期(TtlUtils.expired(ttlValue, ttl, timeProvider)),如果過期了則調用stateClear(ThrowingRunnable類型,這裏是original::clear),對於非returnExpired的則直接返回null;對於沒有expired的,則判斷是否updateTsOnRead,如果是則調用updater進行處理,最後返回ttlValue
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/ttl/TtlUtils.java
/** Common functions related to State TTL. */ class TtlUtils { static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) { return expired(ttlValue, ttl, timeProvider.currentTimestamp()); } static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) { return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp); } private static boolean expired(long ts, long ttl, long currentTimestamp) { return getExpirationTimestamp(ts, ttl) <= currentTimestamp; } private static long getExpirationTimestamp(long ts, long ttl) { long ttlWithoutOverflow = ts > 0 ? Math.min(Long.MAX_VALUE - ts, ttl) : ttl; return ts + ttlWithoutOverflow; } //...... }
flink-core-1.7.0-sources.jar!/org/apache/flink/util/function/ThrowingRunnable.java
/**
 * A {@link Runnable}-like callback whose work method may throw a checked exception of type
 * {@code E}.
 *
 * @param <E> type of exception the work method may throw
 */
@PublicEvolving
@FunctionalInterface
public interface ThrowingRunnable<E extends Throwable> {

    /**
     * Executes the captured block of code.
     *
     * @throws E if the block fails
     */
    void run() throws E;

    /**
     * Adapts a {@link ThrowingRunnable} to a plain {@link Runnable}. Anything thrown by the
     * wrapped callback is caught and handed to {@code ExceptionUtils.rethrow}, so checked
     * exceptions surface as unchecked ones.
     *
     * @param throwingRunnable the callback to adapt
     * @return a {@link Runnable} that runs {@code throwingRunnable} and rethrows its failures
     *     as unchecked
     */
    static Runnable unchecked(ThrowingRunnable<?> throwingRunnable) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    throwingRunnable.run();
                } catch (Throwable failure) {
                    ExceptionUtils.rethrow(failure);
                }
            }
        };
    }
}
getWrappedWithTtlCheckAndUpdate方法先通過getter獲取ttlValue,如果為null則直接返回null;否則判斷是否過期(TtlUtils.expired(ttlValue, ttl, timeProvider)),如果過期了則調用stateClear(ThrowingRunnable類型,這裏是original::clear),對於非returnExpired的則直接返回null;對於沒有expired的,則判斷是否updateTsOnRead,如果是則調用updater進行處理,最後返回ttlValue