本文主要研究一下flink的InternalTimeServiceManager
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@Internal public class InternalTimeServiceManager<K> { @VisibleForTesting static final String TIMER_STATE_PREFIX = "_timer_state"; @VisibleForTesting static final String PROCESSING_TIMER_PREFIX = TIMER_STATE_PREFIX + "/processing_"; @VisibleForTesting static final String EVENT_TIMER_PREFIX = TIMER_STATE_PREFIX + "/event_"; private final KeyGroupRange localKeyGroupRange; private final KeyContext keyContext; private final PriorityQueueSetFactory priorityQueueSetFactory; private final ProcessingTimeService processingTimeService; private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices; private final boolean useLegacySynchronousSnapshots; InternalTimeServiceManager( KeyGroupRange localKeyGroupRange, KeyContext keyContext, PriorityQueueSetFactory priorityQueueSetFactory, ProcessingTimeService processingTimeService, boolean useLegacySynchronousSnapshots) { this.localKeyGroupRange = Preconditions.checkNotNull(localKeyGroupRange); this.priorityQueueSetFactory = Preconditions.checkNotNull(priorityQueueSetFactory); this.keyContext = Preconditions.checkNotNull(keyContext); this.processingTimeService = Preconditions.checkNotNull(processingTimeService); this.useLegacySynchronousSnapshots = useLegacySynchronousSnapshots; this.timerServices = new HashMap<>(); } @SuppressWarnings("unchecked") public <N> InternalTimerService<N> getInternalTimerService( String name, TimerSerializer<K, N> timerSerializer, Triggerable<K, N> triggerable) { InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer); timerService.startTimerService( timerSerializer.getKeySerializer(), timerSerializer.getNamespaceSerializer(), triggerable); return timerService; } @SuppressWarnings("unchecked") <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) { InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name); if (timerService == null) { timerService = new 
InternalTimerServiceImpl<>( localKeyGroupRange, keyContext, processingTimeService, createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer), createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer)); timerServices.put(name, timerService); } return timerService; } Map<String, InternalTimerServiceImpl<K, ?>> getRegisteredTimerServices() { return Collections.unmodifiableMap(timerServices); } private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue( String name, TimerSerializer<K, N> timerSerializer) { return priorityQueueSetFactory.create( name, timerSerializer); } public void advanceWatermark(Watermark watermark) throws Exception { for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) { service.advanceWatermark(watermark.getTimestamp()); } } ////////////////// Fault Tolerance Methods /////////////////// public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException { Preconditions.checkState(useLegacySynchronousSnapshots); InternalTimerServiceSerializationProxy<K> serializationProxy = new InternalTimerServiceSerializationProxy<>(this, keyGroupIdx); serializationProxy.write(stream); } public void restoreStateForKeyGroup( InputStream stream, int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException { InternalTimerServiceSerializationProxy<K> serializationProxy = new InternalTimerServiceSerializationProxy<>( this, userCodeClassLoader, keyGroupIdx); serializationProxy.read(stream); } //////////////////// Methods used ONLY IN TESTS //////////////////// @VisibleForTesting public int numProcessingTimeTimers() { int count = 0; for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) { count += timerService.numProcessingTimeTimers(); } return count; } @VisibleForTesting public int numEventTimeTimers() { int count = 0; for (InternalTimerServiceImpl<?, ?> timerService : timerServices.values()) { count += 
timerService.numEventTimeTimers(); } return count; } }
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityQueueSetFactory.java
/**
 * Factory for {@link KeyGroupedInternalPriorityQueue} instances.
 */
public interface PriorityQueueSetFactory {

    /**
     * Creates a {@link KeyGroupedInternalPriorityQueue} for elements of type {@code T}.
     *
     * @param stateName                    name under which the queue is requested
     * @param byteOrderedElementSerializer serializer for the queue elements
     * @param <T>                          element type; must be a {@link HeapPriorityQueueElement},
     *                                     {@link PriorityComparable} and {@link Keyed}
     * @return the queue produced by this factory
     */
    @Nonnull
    <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> KeyGroupedInternalPriorityQueue<T> create(
            @Nonnull String stateName,
            @Nonnull TypeSerializer<T> byteOrderedElementSerializer);
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueElement.java
/**
 * Contract for objects that record their own position inside the internal array of a
 * {@link HeapPriorityQueue}.
 */
@Internal
public interface HeapPriorityQueueElement {

    /**
     * Index value meaning "this element is not held or managed by any {@link HeapPriorityQueue}".
     * Resetting an element's index to this value on removal from a {@link HeapPriorityQueue}
     * is not strictly enforced.
     */
    int NOT_CONTAINED = Integer.MIN_VALUE;

    /**
     * Returns this element's current index in the internal array of its {@link HeapPriorityQueue}.
     */
    int getInternalIndex();

    /**
     * Stores this element's index in the {@link HeapPriorityQueue}. Only the owning
     * {@link HeapPriorityQueue} should call this.
     *
     * @param newIndex the element's new index in the timer heap.
     */
    void setInternalIndex(int newIndex);
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/PriorityComparable.java
/**
 * Implemented by objects whose priority can be compared against an object of type {@code T}.
 *
 * @param <T> type of the object this one is compared to
 */
public interface PriorityComparable<T> {

    /**
     * Compares the priority of this object with the priority of {@code other}.
     *
     * @param other the object to compare against
     * @return an {@code int} comparison result
     */
    int comparePriorityTo(@Nonnull T other);
}
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/Keyed.java
/**
 * Implemented by objects that expose a key.
 *
 * @param <K> type of the key
 */
public interface Keyed<K> {

    /**
     * Returns the key of this object.
     */
    K getKey();
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/InternalTimer.java
@Internal public interface InternalTimer<K, N> extends PriorityComparable<InternalTimer<?, ?>>, Keyed<K> { /** Function to extract the key from a {@link InternalTimer}. */ KeyExtractorFunction<InternalTimer<?, ?>> KEY_EXTRACTOR_FUNCTION = InternalTimer::getKey; /** Function to compare instances of {@link InternalTimer}. */ PriorityComparator<InternalTimer<?, ?>> TIMER_COMPARATOR = (left, right) -> Long.compare(left.getTimestamp(), right.getTimestamp()); /** * Returns the timestamp of the timer. This value determines the point in time when the timer will fire. */ long getTimestamp(); /** * Returns the key that is bound to this timer. */ @Nonnull @Override K getKey(); /** * Returns the namespace that is bound to this timer. */ @Nonnull N getNamespace(); }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java
@Internal public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement { /** The key for which the timer is scoped. */ @Nonnull private final K key; /** The namespace for which the timer is scoped. */ @Nonnull private final N namespace; /** The expiration timestamp. */ private final long timestamp; private transient int timerHeapIndex; TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { this.timestamp = timestamp; this.key = key; this.namespace = namespace; this.timerHeapIndex = NOT_CONTAINED; } @Override public long getTimestamp() { return timestamp; } @Nonnull @Override public K getKey() { return key; } @Nonnull @Override public N getNamespace() { return namespace; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (o instanceof InternalTimer) { InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o; return timestamp == timer.getTimestamp() && key.equals(timer.getKey()) && namespace.equals(timer.getNamespace()); } return false; } @Override public int getInternalIndex() { return timerHeapIndex; } @Override public void setInternalIndex(int newIndex) { this.timerHeapIndex = newIndex; } void removedFromTimerQueue() { setInternalIndex(NOT_CONTAINED); } @Override public int hashCode() { int result = (int) (timestamp ^ (timestamp >>> 32)); result = 31 * result + key.hashCode(); result = 31 * result + namespace.hashCode(); return result; } @Override public String toString() { return "Timer{" + "timestamp=" + timestamp + ", key=" + key + ", namespace=" + namespace + '}'; } @Override public int comparePriorityTo(@Nonnull InternalTimer<?, ?> other) { return Long.compare(timestamp, other.getTimestamp()); } }
flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/heap/HeapPriorityQueueSetFactory.java
public class HeapPriorityQueueSetFactory implements PriorityQueueSetFactory { @Nonnull private final KeyGroupRange keyGroupRange; @Nonnegative private final int totalKeyGroups; @Nonnegative private final int minimumCapacity; public HeapPriorityQueueSetFactory( @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int totalKeyGroups, @Nonnegative int minimumCapacity) { this.keyGroupRange = keyGroupRange; this.totalKeyGroups = totalKeyGroups; this.minimumCapacity = minimumCapacity; } @Nonnull @Override public <T extends HeapPriorityQueueElement & PriorityComparable & Keyed> HeapPriorityQueueSet<T> create( @Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) { return new HeapPriorityQueueSet<>( PriorityComparator.forPriorityComparableObjects(), KeyExtractorFunction.forKeyedObjects(), minimumCapacity, keyGroupRange, totalKeyGroups); } }
InternalTimer繼承了PriorityComparable、Keyed接口,TimerHeapInternalTimer實現了InternalTimer及HeapPriorityQueueElement接口;
HeapPriorityQueueSetFactory實現了PriorityQueueSetFactory接口,其create方法建立的是HeapPriorityQueueSet