本文主要研究一下 Flink 的 consecutive windowed operations
// Illustrative snippet of two consecutive windowed operations; "..." and
// "<key selector>" are placeholders left by the article, not compilable code.

// Source stream (its construction is elided in the article).
DataStream<Integer> input = ...;

// Stage 1: keyed 5-second tumbling event-time windows, reduced per key.
// NOTE(review): Summer is not shown here; presumably it sums the integers - confirm.
DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

// Stage 2: a non-keyed (windowAll) window with the same assigner and size as
// stage 1, applied to the per-key results.
// NOTE(review): TopKWindowFunction is not shown here; presumably it selects the
// top-k values of each window - confirm.
DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());
可以解決類似 top-k elements 的問題
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
public class TimestampsAndPeriodicWatermarksOperator<T> extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>> implements OneInputStreamOperator<T, T>, ProcessingTimeCallback { private static final long serialVersionUID = 1L; private transient long watermarkInterval; private transient long currentWatermark; public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) { super(assigner); this.chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void open() throws Exception { super.open(); currentWatermark = Long.MIN_VALUE; watermarkInterval = getExecutionConfig().getAutoWatermarkInterval(); if (watermarkInterval > 0) { long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } } @Override public void processElement(StreamRecord<T> element) throws Exception { final long newTimestamp = userFunction.extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE); output.collect(element.replace(element.getValue(), newTimestamp)); } @Override public void onProcessingTime(long timestamp) throws Exception { // register next timer Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); // emit watermark output.emitWatermark(newWatermark); } long now = getProcessingTimeService().getCurrentProcessingTime(); getProcessingTimeService().registerTimer(now + watermarkInterval, this); } /** * Override the base implementation to completely ignore watermarks propagated from * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit * watermarks from here). 
*/ @Override public void processWatermark(Watermark mark) throws Exception { // if we receive a Long.MAX_VALUE watermark we forward it since it is used // to signal the end of input and to not block watermark progress downstream if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) { currentWatermark = Long.MAX_VALUE; output.emitWatermark(mark); } } @Override public void close() throws Exception { super.close(); // emit a final watermark Watermark newWatermark = userFunction.getCurrentWatermark(); if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) { currentWatermark = newWatermark.getTimestamp(); // emit watermark output.emitWatermark(newWatermark); } } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
public class SystemProcessingTimeService extends ProcessingTimeService { private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class); private static final int STATUS_ALIVE = 0; private static final int STATUS_QUIESCED = 1; private static final int STATUS_SHUTDOWN = 2; // ------------------------------------------------------------------------ /** The containing task that owns this time service provider. */ private final AsyncExceptionHandler task; /** The lock that timers acquire upon triggering. */ private final Object checkpointLock; /** The executor service that schedules and calls the triggers of this task. */ private final ScheduledThreadPoolExecutor timerService; private final AtomicInteger status; public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) { this(failureHandler, checkpointLock, null); } public SystemProcessingTimeService( AsyncExceptionHandler task, Object checkpointLock, ThreadFactory threadFactory) { this.task = checkNotNull(task); this.checkpointLock = checkNotNull(checkpointLock); this.status = new AtomicInteger(STATUS_ALIVE); if (threadFactory == null) { this.timerService = new ScheduledThreadPoolExecutor(1); } else { this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory); } // tasks should be removed if the future is canceled this.timerService.setRemoveOnCancelPolicy(true); // make sure shutdown removes all pending tasks this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); } @Override public long getCurrentProcessingTime() { return System.currentTimeMillis(); } @Override public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) { // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark // T says we won't see elements in the future with a timestamp smaller or equal to T. 
// With processing time, we therefore need to delay firing the timer by one ms. long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1; // we directly try to register the timer and only react to the status on exception // that way we save unnecessary volatile accesses for each timer try { return timerService.schedule( new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { final int status = this.status.get(); if (status == STATUS_QUIESCED) { return new NeverCompleteFuture(delay); } else if (status == STATUS_SHUTDOWN) { throw new IllegalStateException("Timer service is shut down"); } else { // something else happened, so propagate the exception throw e; } } } //...... }
registerTimer 接收的 ProcessingTimeCallback（這裏爲 TimestampsAndPeriodicWatermarksOperator）會在定時器觸發時被回調其 onProcessingTime 方法
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
/**
 * Excerpt of the window operator: only the element-processing path and the
 * window-emission helper are shown; other members are elided by the article.
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {

    //...... (members elided in this excerpt)

    /**
     * Assigns the element to its windows, adds it to each window's state, consults
     * the trigger, and fires and/or purges the window according to the trigger
     * result. An element that ended up in no window and is late is either sent to
     * the side output or counted as dropped.
     *
     * @param element the incoming stream record
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        // Stays true only if the element was added to none of the assigned windows.
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            //...... (merging-window branch elided in this excerpt)
        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                // Scope the window state to this window, then add the value.
                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                // Point the trigger context at the current key and window before
                // asking the trigger what to do.
                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        // Nothing to emit. Note that this also skips the purge check
                        // and the cleanup-timer registration below for this window.
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // Side-output the input event if:
        //  - the element was not handled by any window,
        //  - the late-arriving tag has been set, and
        //  - the element is late (per the original comment: the window assigner is
        //    event time and current timestamp + allowed lateness is no less than
        //    the element timestamp; isElementLate is not shown in this excerpt).
        // Without a late-data tag, the dropped-records counter is incremented instead.
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }

    /**
     * Emits the contents of the given window using the {@link InternalWindowFunction}.
     * The collector's timestamp is set to the window's max timestamp first, so
     * emitted results carry that timestamp.
     *
     * @param window the window being emitted
     * @param contents the window's accumulated state
     */
    @SuppressWarnings("unchecked")
    private void emitWindowContents(W window, ACC contents) throws Exception {
        timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
        processContext.window = window;
        userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
    }

    //...... (members elided in this excerpt)
}
WindowOperator 的 processElement 方法會調用 triggerContext.onElement（裏頭使用的是 trigger.onElement 方法，這裏的 trigger 爲 EventTimeTrigger）獲取 TriggerResult；如果需要 fire，則會觸發 emitWindowContents，如果需要 purge 則會清空 windowState；emitWindowContents 則是調用 userFunction.process 執行用戶定義的窗口操作
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@PublicEvolving public class EventTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private EventTimeTrigger() {} @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { // only register a timer if the watermark is not yet past the end of the merged window // this is in line with the logic in onElement(). If the watermark is past the end of // the window onElement() will fire and setting a timer here would fire the window twice. long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } @Override public String toString() { return "EventTimeTrigger()"; } public static EventTimeTrigger create() { return new EventTimeTrigger(); } }
可以解決類似 top-k elements 的問題
窗口可以根據自己設定的時間範圍，藉助 trigger 判斷是否可以關閉窗口，然後開始對該窗口數據執行相關操作
對於 consecutive windowed operations 來講，上游的 watermark 會 forward 給下游的 operations。
在 trigger 返回 TriggerResult.FIRE 的情況下，WindowOperator 會觸發 emitWindowContents；對於 EventTimeTrigger 來講，其 onElement 方法的判斷邏輯跟 watermark 相關：如果 window.maxTimestamp() <= ctx.getCurrentWatermark()，則會返回 TriggerResult.FIRE