本文主要研究一下flink的Global Windowhtml
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.javajava
@PublicEvolving public class GlobalWindow extends Window { private static final GlobalWindow INSTANCE = new GlobalWindow(); private GlobalWindow() { } public static GlobalWindow get() { return INSTANCE; } @Override public long maxTimestamp() { return Long.MAX_VALUE; } @Override public boolean equals(Object o) { return this == o || !(o == null || getClass() != o.getClass()); } @Override public int hashCode() { return 0; } @Override public String toString() { return "GlobalWindow"; } /** * A {@link TypeSerializer} for {@link GlobalWindow}. */ public static class Serializer extends TypeSerializerSingleton<GlobalWindow> { private static final long serialVersionUID = 1L; @Override public boolean isImmutableType() { return true; } @Override public GlobalWindow createInstance() { return GlobalWindow.INSTANCE; } @Override public GlobalWindow copy(GlobalWindow from) { return from; } @Override public GlobalWindow copy(GlobalWindow from, GlobalWindow reuse) { return from; } @Override public int getLength() { return 0; } @Override public void serialize(GlobalWindow record, DataOutputView target) throws IOException { target.writeByte(0); } @Override public GlobalWindow deserialize(DataInputView source) throws IOException { source.readByte(); return GlobalWindow.INSTANCE; } @Override public GlobalWindow deserialize(GlobalWindow reuse, DataInputView source) throws IOException { source.readByte(); return GlobalWindow.INSTANCE; } @Override public void copy(DataInputView source, DataOutputView target) throws IOException { source.readByte(); target.writeByte(0); } @Override public boolean canEqual(Object obj) { return obj instanceof Serializer; } } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.javaapache
@PublicEvolving public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> { private static final long serialVersionUID = 1L; private GlobalWindows() {} @Override public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) { return Collections.singletonList(GlobalWindow.get()); } @Override public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) { return new NeverTrigger(); } @Override public String toString() { return "GlobalWindows()"; } /** * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns * all elements to the same {@link GlobalWindow}. * * @return The global window policy. */ public static GlobalWindows create() { return new GlobalWindows(); } /** * A trigger that never fires, as default Trigger for GlobalWindows. */ @Internal public static class NeverTrigger extends Trigger<Object, GlobalWindow> { private static final long serialVersionUID = 1L; @Override public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {} @Override public void onMerge(GlobalWindow window, OnMergeContext ctx) { } } @Override public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) { return new GlobalWindow.Serializer(); } @Override public boolean isEventTime() { return false; } }