A Look at Flink's Triggers

This article takes a look at Flink's Triggers.

Trigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/Trigger.java

@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

	private static final long serialVersionUID = -4104633972991191369L;

	public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;

	public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;

	public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;

	public boolean canMerge() {
		return false;
	}

	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		throw new UnsupportedOperationException("This trigger does not support merging.");
	}

	public abstract void clear(W window, TriggerContext ctx) throws Exception;

	// ------------------------------------------------------------------------

	public interface TriggerContext {

		long getCurrentProcessingTime();

		MetricGroup getMetricGroup();

		long getCurrentWatermark();

		void registerProcessingTimeTimer(long time);

		void registerEventTimeTimer(long time);

		void deleteProcessingTimeTimer(long time);

		void deleteEventTimeTimer(long time);

		<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

		@Deprecated
		<S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);

		@Deprecated
		<S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
	}

	public interface OnMergeContext extends TriggerContext {
		<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
	}
}
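  • Trigger takes two type parameters: the element type and the window type. It defines the methods onElement, onProcessingTime, onEventTime, canMerge, onMerge and clear; of these, onElement, onProcessingTime and onEventTime must return a TriggerResult
  • onElement is called for every element that is added to a window; onProcessingTime is called when a registered processing-time timer fires; onEventTime is called when a registered event-time timer fires
  • canMerge indicates whether the trigger supports merging of trigger state and returns false by default; onMerge is called when several windows are merged into one; clear removes the state the trigger keeps for the window through the TriggerContext
  • Trigger also defines the TriggerContext and OnMergeContext interfaces. TriggerContext declares the methods for registering and deleting event-time and processing-time timers, along with getCurrentProcessingTime, getMetricGroup, getCurrentWatermark, getPartitionedState and two deprecated getKeyValueState overloads
  • OnMergeContext extends TriggerContext and adds the mergePartitionedState method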

TriggerResult

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java

public enum TriggerResult {

	CONTINUE(false, false),

	FIRE_AND_PURGE(true, true),

	FIRE(true, false),

	PURGE(false, true);

	// ------------------------------------------------------------------------

	private final boolean fire;
	private final boolean purge;

	TriggerResult(boolean fire, boolean purge) {
		this.purge = purge;
		this.fire = fire;
	}

	public boolean isFire() {
		return fire;
	}

	public boolean isPurge() {
		return purge;
	}
}
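  • TriggerResult is the enum of actions that a trigger returns from onElement, onProcessingTime and onEventTime. It has two properties, fire and purge, and four constants: CONTINUE, FIRE_AND_PURGE, FIRE and PURGE
  • fire indicates whether the window computation should be triggered; purge indicates whether the window's contents should be cleared
  • CONTINUE means no action is taken on the window; FIRE_AND_PURGE triggers the window computation and then clears the window's contents; FIRE triggers the window computation but keeps the window's contents; PURGE clears the window's contents without triggering the computation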
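
To make the two flags concrete, here is a minimal standalone sketch of how a window buffer could react to each action. It does not use Flink: the Action enum only mirrors the shape of TriggerResult, and WindowBufferDemo with its contents and emitted fields are hypothetical names made up for illustration. The "computation" is assumed to be a plain sum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical window buffer, not a Flink class: it reacts to an Action
// the way the notes above describe fire and purge.
final class WindowBufferDemo {
    final List<Integer> contents = new ArrayList<>();
    final List<Integer> emitted = new ArrayList<>();

    void apply(Action action) {
        if (action.fire) {
            // The "window computation" is assumed to be a sum of the buffer.
            int sum = 0;
            for (int v : contents) {
                sum += v;
            }
            emitted.add(sum);
        }
        if (action.purge) {
            // Purging drops the buffered elements.
            contents.clear();
        }
    }

    public static void main(String[] args) {
        WindowBufferDemo w = new WindowBufferDemo();
        w.contents.add(1);
        w.contents.add(2);
        w.apply(Action.FIRE);           // emits 3 and keeps the elements
        w.contents.add(4);
        w.apply(Action.FIRE_AND_PURGE); // emits 7 and clears the elements
        System.out.println(w.emitted + " remaining=" + w.contents); // [3, 7] remaining=[]
    }
}

// Simplified stand-in that mirrors the four constants of TriggerResult.
enum Action {
    CONTINUE(false, false),
    FIRE_AND_PURGE(true, true),
    FIRE(true, false),
    PURGE(false, true);

    final boolean fire;
    final boolean purge;

    Action(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }
}
```

Because FIRE keeps the buffer, a later fire sees the earlier elements again, which is why the second result is 7 rather than 4.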

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private EventTimeTrigger() {}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			ctx.registerEventTimeTimer(window.maxTimestamp());
			return TriggerResult.CONTINUE;
		}
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
		return time == window.maxTimestamp() ?
			TriggerResult.FIRE :
			TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.deleteEventTimeTimer(window.maxTimestamp());
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(TimeWindow window,
			OnMergeContext ctx) {
		// only register a timer if the watermark is not yet past the end of the merged window
		// this is in line with the logic in onElement(). If the watermark is past the end of
		// the window onElement() will fire and setting a timer here would fire the window twice.
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
			ctx.registerEventTimeTimer(windowMaxTimestamp);
		}
	}

	@Override
	public String toString() {
		return "EventTimeTrigger()";
	}

	public static EventTimeTrigger create() {
		return new EventTimeTrigger();
	}
}
  • EventTimeTrigger extends Trigger with Object as the element type and TimeWindow as the window type; SlidingEventTimeWindows, TumblingEventTimeWindows, EventTimeSessionWindows and DynamicEventTimeSessionWindows all use EventTimeTrigger by default
  • onElement returns TriggerResult.FIRE when window.maxTimestamp() is less than or equal to ctx.getCurrentWatermark(); otherwise it calls ctx.registerEventTimeTimer(window.maxTimestamp()) and returns TriggerResult.CONTINUE. onEventTime returns TriggerResult.FIRE when time equals window.maxTimestamp() and TriggerResult.CONTINUE otherwise. onProcessingTime always returns TriggerResult.CONTINUE
  • canMerge returns true; onMerge calls ctx.registerEventTimeTimer(windowMaxTimestamp) when window.maxTimestamp() is greater than ctx.getCurrentWatermark(); clear calls ctx.deleteEventTimeTimer(window.maxTimestamp())
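
The decision logic above can be traced with a small standalone model. This is only a sketch under stated assumptions: EventTimeTriggerModel, its watermark field and its eventTimeTimers set are hypothetical stand-ins for the watermark and timer service that Flink provides through TriggerContext, and the results are plain strings rather than TriggerResult values.

```java
import java.util.TreeSet;

// Standalone model of the EventTimeTrigger decisions; no Flink classes are used.
final class EventTimeTriggerModel {
    long watermark = Long.MIN_VALUE;
    final TreeSet<Long> eventTimeTimers = new TreeSet<>();

    // Mirrors onElement: fire at once if the watermark already passed the
    // window, otherwise register a timer for the window's max timestamp.
    String onElement(long windowMaxTimestamp) {
        if (windowMaxTimestamp <= watermark) {
            return "FIRE";
        }
        eventTimeTimers.add(windowMaxTimestamp);
        return "CONTINUE";
    }

    // Mirrors onEventTime: only the timer at the window's max timestamp fires.
    String onEventTime(long time, long windowMaxTimestamp) {
        return time == windowMaxTimestamp ? "FIRE" : "CONTINUE";
    }

    public static void main(String[] args) {
        EventTimeTriggerModel m = new EventTimeTriggerModel();
        long maxTs = 999L; // a window [0, 1000) has maxTimestamp 999
        System.out.println(m.onElement(maxTs));         // timer registered
        m.watermark = 1000L;                            // watermark passes the window
        System.out.println(m.onEventTime(999L, maxTs)); // the timer fires the window
        System.out.println(m.onElement(maxTs));         // a late element fires again
    }
}
```

The last call shows the late-element path: once the watermark has passed the window, every further element fires the window immediately instead of registering a timer.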

ProcessingTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java

@PublicEvolving
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private ProcessingTimeTrigger() {}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
		ctx.registerProcessingTimeTimer(window.maxTimestamp());
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.FIRE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.deleteProcessingTimeTimer(window.maxTimestamp());
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(TimeWindow window,
			OnMergeContext ctx) {
		// only register a timer if the time is not yet past the end of the merged window
		// this is in line with the logic in onElement(). If the time is past the end of
		// the window onElement() will fire and setting a timer here would fire the window twice.
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
			ctx.registerProcessingTimeTimer(windowMaxTimestamp);
		}
	}

	@Override
	public String toString() {
		return "ProcessingTimeTrigger()";
	}

	public static ProcessingTimeTrigger create() {
		return new ProcessingTimeTrigger();
	}

}
  • ProcessingTimeTrigger extends Trigger with Object as the element type and TimeWindow as the window type; SlidingProcessingTimeWindows, TumblingProcessingTimeWindows, ProcessingTimeSessionWindows and DynamicProcessingTimeSessionWindows all use ProcessingTimeTrigger by default
  • onElement calls ctx.registerProcessingTimeTimer(window.maxTimestamp()) and returns TriggerResult.CONTINUE; onEventTime returns TriggerResult.CONTINUE; onProcessingTime returns TriggerResult.FIRE
  • canMerge returns true; onMerge calls ctx.registerProcessingTimeTimer(windowMaxTimestamp) when window.maxTimestamp() is greater than ctx.getCurrentProcessingTime(); clear calls ctx.deleteProcessingTimeTimer(window.maxTimestamp())
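
The guard in onMerge is easy to overlook, so here is a minimal standalone sketch of it. ProcessingTimeMergeModel and processingTimeTimers are hypothetical names, and the current processing time is passed in as a parameter instead of being read from a TriggerContext.

```java
import java.util.TreeSet;

// Standalone model of the onMerge guard in ProcessingTimeTrigger; no Flink classes.
final class ProcessingTimeMergeModel {
    final TreeSet<Long> processingTimeTimers = new TreeSet<>();

    // Registers a timer for the merged window only if its max timestamp is
    // still in the future relative to the current processing time.
    // Returns true when a timer was registered.
    boolean onMerge(long mergedWindowMaxTimestamp, long currentProcessingTime) {
        if (mergedWindowMaxTimestamp > currentProcessingTime) {
            processingTimeTimers.add(mergedWindowMaxTimestamp);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ProcessingTimeMergeModel m = new ProcessingTimeMergeModel();
        long now = 1500L;
        System.out.println(m.onMerge(1999L, now)); // true: the window ends in the future
        System.out.println(m.onMerge(999L, now));  // false: the window has already ended
        System.out.println(m.processingTimeTimers); // [1999]
    }
}
```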

NeverTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java

@Internal
	public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
		private static final long serialVersionUID = 1L;

		@Override
		public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

		@Override
		public void onMerge(GlobalWindow window, OnMergeContext ctx) {
		}
	}
  • NeverTrigger's onElement, onEventTime and onProcessingTime all return TriggerResult.CONTINUE, so it never fires; GlobalWindows uses NeverTrigger by default

CountTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java

@PublicEvolving
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
	private static final long serialVersionUID = 1L;

	private final long maxCount;

	private final ReducingStateDescriptor<Long> stateDesc =
			new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

	private CountTrigger(long maxCount) {
		this.maxCount = maxCount;
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
		count.add(1L);
		if (count.get() >= maxCount) {
			count.clear();
			return TriggerResult.FIRE;
		}
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		ctx.getPartitionedState(stateDesc).clear();
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		ctx.mergePartitionedState(stateDesc);
	}

	@Override
	public String toString() {
		return "CountTrigger(" +  maxCount + ")";
	}

	public static <W extends Window> CountTrigger<W> of(long maxCount) {
		return new CountTrigger<>(maxCount);
	}

	private static class Sum implements ReduceFunction<Long> {
		private static final long serialVersionUID = 1L;

		@Override
		public Long reduce(Long value1, Long value2) throws Exception {
			return value1 + value2;
		}

	}
}
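  • CountTrigger extends Trigger with Object as the element type. It defines maxCount and a ReducingStateDescriptor; the descriptor keeps the element count for the window, using the class's own Sum function. In onElement the count is incremented, and once it is greater than or equal to maxCount the count is cleared and TriggerResult.FIRE is returned; otherwise TriggerResult.CONTINUE is returned. onEventTime and onProcessingTime both return TriggerResult.CONTINUE; canMerge returns true; onMerge calls ctx.mergePartitionedState(stateDesc); clear calls ctx.getPartitionedState(stateDesc).clear()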
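
The count-and-reset behaviour can be checked with a small standalone model. It is a sketch only: CountTriggerModel and firingPositions are hypothetical names, and a plain long field stands in for the ReducingState<Long> that Flink keeps for each key and window.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of the CountTrigger counting logic; no Flink classes are used.
final class CountTriggerModel {
    private final long maxCount;
    private long count; // stands in for the ReducingState<Long> named "count"

    CountTriggerModel(long maxCount) {
        this.maxCount = maxCount;
    }

    // Returns true where the real trigger would return FIRE.
    boolean onElement() {
        count += 1;
        if (count >= maxCount) {
            count = 0; // the counter is cleared on every fire
            return true;
        }
        return false;
    }

    // Returns the 1-based positions of the elements that cause a fire.
    static List<Integer> firingPositions(long maxCount, int elements) {
        CountTriggerModel trigger = new CountTriggerModel(maxCount);
        List<Integer> fired = new ArrayList<>();
        for (int i = 1; i <= elements; i++) {
            if (trigger.onElement()) {
                fired.add(i);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(firingPositions(3, 7)); // [3, 6]
    }
}
```

Note that the trigger returns FIRE, not FIRE_AND_PURGE: only the counter is reset, while the window's elements stay in place unless something else purges them.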

PurgingTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java

@PublicEvolving
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
	private static final long serialVersionUID = 1L;

	private Trigger<T, W> nestedTrigger;

	private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
		this.nestedTrigger = nestedTrigger;
	}

	@Override
	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		nestedTrigger.clear(window, ctx);
	}

	@Override
	public boolean canMerge() {
		return nestedTrigger.canMerge();
	}

	@Override
	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		nestedTrigger.onMerge(window, ctx);
	}

	@Override
	public String toString() {
		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
	}

	public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
		return new PurgingTrigger<>(nestedTrigger);
	}

	@VisibleForTesting
	public Trigger<T, W> getNestedTrigger() {
		return nestedTrigger;
	}
}
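  • PurgingTrigger is a wrapper trigger around a nestedTrigger. Its onElement, onEventTime and onProcessingTime call the nested trigger and, when triggerResult.isFire() is true, return TriggerResult.FIRE_AND_PURGE instead; any other result is passed through unchanged. canMerge, onMerge and clear are all delegated to nestedTrigger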
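
The result mapping that the wrapper applies can be written out as a tiny standalone function. This is a sketch, not the Flink class: PurgingModel, its nested Result enum and the purging method are hypothetical names that only mirror TriggerResult and the ternary expression in the source above.

```java
// Standalone model of the result mapping done by PurgingTrigger; no Flink classes.
final class PurgingModel {
    // Simplified stand-in for TriggerResult.
    enum Result {
        CONTINUE, FIRE, PURGE, FIRE_AND_PURGE;

        boolean isFire() {
            return this == FIRE || this == FIRE_AND_PURGE;
        }
    }

    // Any firing result becomes FIRE_AND_PURGE; everything else passes through.
    static Result purging(Result nestedResult) {
        return nestedResult.isFire() ? Result.FIRE_AND_PURGE : nestedResult;
    }

    public static void main(String[] args) {
        for (Result r : Result.values()) {
            System.out.println(r + " -> " + purging(r));
        }
    }
}
```

Wrapping a trigger that returns FIRE therefore makes each firing discard the window's contents, so the next firing only sees the elements that arrived after it.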

Summary

  • Trigger takes two type parameters: the element type and the window type. It defines the methods onElement, onProcessingTime, onEventTime, canMerge, onMerge and clear; of these, onElement, onProcessingTime and onEventTime must return a TriggerResult. TriggerResult is the enum of actions a trigger returns from those callbacks. It has two properties, fire (whether to trigger the window computation) and purge (whether to clear the window's contents), and four constants: CONTINUE, FIRE_AND_PURGE, FIRE and PURGE
  • SlidingEventTimeWindows, TumblingEventTimeWindows, EventTimeSessionWindows and DynamicEventTimeSessionWindows all use EventTimeTrigger by default; SlidingProcessingTimeWindows, TumblingProcessingTimeWindows, ProcessingTimeSessionWindows and DynamicProcessingTimeSessionWindows all use ProcessingTimeTrigger by default; GlobalWindows uses NeverTrigger by default
  • CountTrigger is mainly used for count-based windows. It keeps the element count through a ReducingStateDescriptor; in onElement, once the count is greater than or equal to maxCount, it clears the count and returns TriggerResult.FIRE, otherwise it returns TriggerResult.CONTINUE. PurgingTrigger is a wrapper trigger around a nestedTrigger: its onElement, onEventTime and onProcessingTime return TriggerResult.FIRE_AND_PURGE whenever the nested result has isFire() true, and canMerge, onMerge and clear are all delegated to nestedTrigger
