本文主要研究一下flink的Triggers
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@PublicEvolving public abstract class Trigger<T, W extends Window> implements Serializable { private static final long serialVersionUID = -4104633972991191369L; public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; public boolean canMerge() { return false; } public void onMerge(W window, OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } public abstract void clear(W window, TriggerContext ctx) throws Exception; // ------------------------------------------------------------------------ public interface TriggerContext { long getCurrentProcessingTime(); MetricGroup getMetricGroup(); long getCurrentWatermark(); void registerProcessingTimeTimer(long time); void registerEventTimeTimer(long time); void deleteProcessingTimeTimer(long time); void deleteEventTimeTimer(long time); <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor); @Deprecated <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState); @Deprecated <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState); } public interface OnMergeContext extends TriggerContext { <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor); } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java
/**
 * Action returned by a trigger callback, expressed as two independent flags:
 * "fire" (evaluate the window) and "purge" (discard the window's data).
 *
 * <p>The constant order is kept as in the original declaration so ordinals do not change.
 */
public enum TriggerResult {

    /** Neither fire nor purge. */
    CONTINUE(false, false),

    /** Fire and purge. */
    FIRE_AND_PURGE(true, true),

    /** Fire without purging. */
    FIRE(true, false),

    /** Purge without firing. */
    PURGE(false, true);

    // ------------------------------------------------------------------------

    private final boolean fire;
    private final boolean purge;

    TriggerResult(boolean fire, boolean purge) {
        this.purge = purge;
        this.fire = fire;
    }

    /**
     * @return {@code true} if this result carries the "fire" flag
     */
    public boolean isFire() {
        return fire;
    }

    /**
     * @return {@code true} if this result carries the "purge" flag
     */
    public boolean isPurge() {
        return purge;
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@PublicEvolving public class EventTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private EventTimeTrigger() {} @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { // only register a timer if the watermark is not yet past the end of the merged window // this is in line with the logic in onElement(). If the watermark is past the end of // the window onElement() will fire and setting a timer here would fire the window twice. long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } @Override public String toString() { return "EventTimeTrigger()"; } public static EventTimeTrigger create() { return new EventTimeTrigger(); } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@PublicEvolving public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> { private static final long serialVersionUID = 1L; private ProcessingTimeTrigger() {} @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { // only register a timer if the time is not yet past the end of the merged window // this is in line with the logic in onElement(). If the time is past the end of // the window onElement() will fire and setting a timer here would fire the window twice. long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } @Override public String toString() { return "ProcessingTimeTrigger()"; } public static ProcessingTimeTrigger create() { return new ProcessingTimeTrigger(); } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@Internal public static class NeverTrigger extends Trigger<Object, GlobalWindow> { private static final long serialVersionUID = 1L; @Override public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {} @Override public void onMerge(GlobalWindow window, OnMergeContext ctx) { } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@PublicEvolving public class CountTrigger<W extends Window> extends Trigger<Object, W> { private static final long serialVersionUID = 1L; private final long maxCount; private final ReducingStateDescriptor<Long> stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE); private CountTrigger(long maxCount) { this.maxCount = maxCount; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { ReducingState<Long> count = ctx.getPartitionedState(stateDesc); count.add(1L); if (count.get() >= maxCount) { count.clear(); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(W window, TriggerContext ctx) throws Exception { ctx.getPartitionedState(stateDesc).clear(); } @Override public boolean canMerge() { return true; } @Override public void onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); } @Override public String toString() { return "CountTrigger(" + maxCount + ")"; } public static <W extends Window> CountTrigger<W> of(long maxCount) { return new CountTrigger<>(maxCount); } private static class Sum implements ReduceFunction<Long> { private static final long serialVersionUID = 1L; @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } } }
CountTrigger定義了maxCount及ReducingStateDescriptor(它使用的是自身定義的Sum函數),在onElement方法裏頭,當計數大於等於maxCount時,則會清空計數,然後返回TriggerResult.FIRE,否則返回TriggerResult.CONTINUE;onEventTime、onProcessingTime均返回TriggerResult.CONTINUE;canMerge返回true;onMerge執行的是ctx.mergePartitionedState(stateDesc);clear執行的是ctx.getPartitionedState(stateDesc).clear()。
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@PublicEvolving public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> { private static final long serialVersionUID = 1L; private Trigger<T, W> nestedTrigger; private PurgingTrigger(Trigger<T, W> nestedTrigger) { this.nestedTrigger = nestedTrigger; } @Override public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception { TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx); return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx); return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx); return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } @Override public void clear(W window, TriggerContext ctx) throws Exception { nestedTrigger.clear(window, ctx); } @Override public boolean canMerge() { return nestedTrigger.canMerge(); } @Override public void onMerge(W window, OnMergeContext ctx) throws Exception { nestedTrigger.onMerge(window, ctx); } @Override public String toString() { return "PurgingTrigger(" + nestedTrigger.toString() + ")"; } public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) { return new PurgingTrigger<>(nestedTrigger); } @VisibleForTesting public Trigger<T, W> getNestedTrigger() { return nestedTrigger; } }
TriggerResult有fire、purge兩個屬性(fire表示是否要觸發window的computation操作;而purge表示是否要清理window的窗口數據),並定義了CONTINUE、FIRE_AND_PURGE、FIRE、PURGE四個枚舉值