A Look at Flink's Session Window

This article takes a look at Flink's Session Window.

MergingWindowAssigner

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/MergingWindowAssigner.java

@PublicEvolving
public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
    private static final long serialVersionUID = 1L;

    /**
     * Determines which windows (if any) should be merged.
     *
     * @param windows The window candidates.
     * @param callback A callback that can be invoked to signal which windows should be merged.
     */
    public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

    /**
     * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
     * windows should be merged.
     */
    public interface MergeCallback<W> {

        /**
         * Specifies that the given windows should be merged into the result window.
         *
         * @param toBeMerged The list of windows that should be merged into one window.
         * @param mergeResult The resulting merged window.
         */
        void merge(Collection<W> toBeMerged, W mergeResult);
    }
}
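  • MergingWindowAssigner extends WindowAssigner and defines the abstract method mergeWindows, which takes a MergeCallback parameter; the MergeCallback interface defines a merge method that receives the windows to be merged and the resulting merged window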
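To make the callback contract concrete, here is a plain-Java sketch that does not depend on Flink. Interval, MergeCallback and OverlapMerger are hypothetical stand-ins for TimeWindow, MergingWindowAssigner.MergeCallback and a session assigner. The merge rule (sort by start, sweep once, and call merge only for groups of two or more overlapping windows) is a simplified model assumed to behave like TimeWindow.mergeWindows, not Flink's own code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Flink's TimeWindow: start inclusive, end exclusive.
final class Interval {
    final long start;
    final long end;

    Interval(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Assumed to match TimeWindow.intersects: touching windows count as overlapping.
    boolean intersects(Interval other) {
        return start <= other.end && end >= other.start;
    }

    // Smallest interval covering both, modeled on TimeWindow.cover.
    Interval cover(Interval other) {
        return new Interval(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Interval)) return false;
        Interval i = (Interval) o;
        return start == i.start && end == i.end;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(start) * 31 + Long.hashCode(end);
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

// Same shape as MergingWindowAssigner.MergeCallback.
interface MergeCallback<W> {
    void merge(Collection<W> toBeMerged, W mergeResult);
}

final class OverlapMerger {
    // Sort by start, sweep once, and report every group of two or more overlapping windows.
    static void mergeWindows(Collection<Interval> windows, MergeCallback<Interval> callback) {
        List<Interval> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Interval w) -> w.start));

        Interval cover = null;
        Set<Interval> group = new LinkedHashSet<>();
        for (Interval candidate : sorted) {
            if (cover != null && cover.intersects(candidate)) {
                cover = cover.cover(candidate);
                group.add(candidate);
            } else {
                if (group.size() > 1) callback.merge(group, cover); // a single window needs no merge
                cover = candidate;
                group = new LinkedHashSet<>();
                group.add(candidate);
            }
        }
        if (group.size() > 1) callback.merge(group, cover);
    }

    public static void main(String[] args) {
        mergeWindows(
            Arrays.asList(new Interval(10000, 15000), new Interval(1000, 6000), new Interval(2000, 7000)),
            (toBeMerged, result) -> System.out.println("merge " + toBeMerged + " -> " + result));
        // prints: merge [[1000, 6000), [2000, 7000)] -> [1000, 7000)
    }
}
```

The assigner only decides which windows belong together; the caller supplies the callback and decides what to do with each reported merge. In the example, [10000, 15000) overlaps nothing, so no callback is issued for it.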

EventTimeSessionWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java

public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    protected long sessionTimeout;

    protected EventTimeSessionWindows(long sessionTimeout) {
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size");
        }

        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "EventTimeSessionWindows(" + sessionTimeout + ")";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param size The session timeout, i.e. the time gap between sessions
     * @return The policy.
     */
    public static EventTimeSessionWindows withGap(Time size) {
        return new EventTimeSessionWindows(size.toMilliseconds());
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    @PublicEvolving
    public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    /**
     * Merge overlapping {@link TimeWindow}s.
     */
    public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }

}
  • EventTimeSessionWindows extends MergingWindowAssigner and takes sessionTimeout as its constructor parameter (a value <= 0 is rejected with an IllegalArgumentException); the TimeWindow returned by assignWindows starts at timestamp and ends at timestamp + sessionTimeout
  • getDefaultTrigger returns EventTimeTrigger; getWindowSerializer returns TimeWindow.Serializer(); isEventTime returns true; mergeWindows delegates to TimeWindow.mergeWindows
  • EventTimeSessionWindows defines two static factory methods, withGap and withDynamicGap: withGap creates an EventTimeSessionWindows, while withDynamicGap creates a DynamicEventTimeSessionWindows
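The combined effect of assignWindows and merging can be traced by hand. The plain-Java sketch below does not use Flink: the hypothetical helper sessionize takes timestamps already in ascending order, gives each event the window [timestamp, timestamp + gap) as EventTimeSessionWindows does, and folds it into the current session when the two windows overlap. It assumes the same inclusive overlap test as TimeWindow.intersects, so an event arriving exactly gap after the session's last window still joins it.

```java
import java.util.ArrayList;
import java.util.List;

final class EventTimeSessionSketch {
    // Returns sessions as {start, end} pairs; timestamps must be sorted ascending.
    static List<long[]> sessionize(long[] timestamps, long gap) {
        if (gap <= 0) {
            throw new IllegalArgumentException("gap must satisfy 0 < gap");
        }
        List<long[]> sessions = new ArrayList<>();
        long[] current = null;
        for (long ts : timestamps) {
            long start = ts;       // window start = element timestamp
            long end = ts + gap;   // window end = timestamp + sessionTimeout
            if (current != null && start <= current[1]) {
                // Overlapping or touching window: extend the session (covering window).
                current[1] = Math.max(current[1], end);
            } else {
                current = new long[] {start, end};
                sessions.add(current);
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        for (long[] s : sessionize(new long[] {1000, 2000, 10000}, 5000)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints [1000, 7000) then [10000, 15000)
    }
}
```

Flink itself does not require ordered input; it keeps merging windows as late elements arrive. Out-of-order handling is deliberately left out of this sketch.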

ProcessingTimeSessionWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java

public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    protected long sessionTimeout;

    protected ProcessingTimeSessionWindows(long sessionTimeout) {
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("ProcessingTimeSessionWindows parameters must satisfy 0 < size");
        }

        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        long currentProcessingTime = context.getCurrentProcessingTime();
        return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "ProcessingTimeSessionWindows(" + sessionTimeout + ")";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param size The session timeout, i.e. the time gap between sessions
     * @return The policy.
     */
    public static ProcessingTimeSessionWindows withGap(Time size) {
        return new ProcessingTimeSessionWindows(size.toMilliseconds());
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    @PublicEvolving
    public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }

    /**
     * Merge overlapping {@link TimeWindow}s.
     */
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }

}
  • ProcessingTimeSessionWindows extends MergingWindowAssigner and takes sessionTimeout as its constructor parameter; the TimeWindow returned by assignWindows starts at currentProcessingTime (the value of context.getCurrentProcessingTime()) and ends at currentProcessingTime + sessionTimeout, so the element's timestamp argument is ignored
  • getDefaultTrigger returns ProcessingTimeTrigger; getWindowSerializer returns TimeWindow.Serializer(); isEventTime returns false; mergeWindows delegates to TimeWindow.mergeWindows
  • ProcessingTimeSessionWindows defines two static factory methods, withGap and withDynamicGap: withGap creates a ProcessingTimeSessionWindows, while withDynamicGap creates a DynamicProcessingTimeSessionWindows
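The only change in assignWindows is where the window start comes from. In this plain-Java sketch, a LongSupplier stands in for context.getCurrentProcessingTime(), an assumption made so that the clock can be controlled in a test; ProcessingTimeAssignSketch is a hypothetical name. The element timestamp is accepted but unused, as in ProcessingTimeSessionWindows.

```java
import java.util.function.LongSupplier;

final class ProcessingTimeAssignSketch {
    private final long sessionTimeout;
    private final LongSupplier clock; // stand-in for WindowAssignerContext#getCurrentProcessingTime

    ProcessingTimeAssignSketch(long sessionTimeout, LongSupplier clock) {
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("sessionTimeout must satisfy 0 < size");
        }
        this.sessionTimeout = sessionTimeout;
        this.clock = clock;
    }

    // Returns {start, end} = {now, now + sessionTimeout}; elementTimestamp is deliberately ignored.
    long[] assignWindow(long elementTimestamp) {
        long now = clock.getAsLong();
        return new long[] {now, now + sessionTimeout};
    }

    public static void main(String[] args) {
        ProcessingTimeAssignSketch assigner =
            new ProcessingTimeAssignSketch(3000, System::currentTimeMillis);
        long[] w = assigner.assignWindow(0L);
        System.out.println("[" + w[0] + ", " + w[1] + ")");
    }
}
```

Two elements carrying the same timestamp can land in different windows if they arrive at different wall-clock times, so processing-time sessions depend on arrival timing rather than on the data alone.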

SessionWindowTimeGapExtractor

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java

@PublicEvolving
public interface SessionWindowTimeGapExtractor<T> extends Serializable {
    /**
     * Extracts the session time gap.
     * @param element The input element.
     * @return The session time gap in milliseconds.
     */
    long extract(T element);
}
  • The SessionWindowTimeGapExtractor interface defines the extract method, which derives the session gap (the sessionTimeout, in milliseconds) from an element
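An extractor is usually a small lambda. The sketch below models the interface in plain Java: GapExtractor mirrors the shape of SessionWindowTimeGapExtractor, and ClickEvent with its premium flag is a hypothetical element type chosen for illustration, as are the 30-minute and 10-minute gaps. With Flink on the classpath, a lambda of the same shape is what would be passed to EventTimeSessionWindows.withDynamicGap(...).

```java
import java.io.Serializable;

// Same shape as SessionWindowTimeGapExtractor<T>: one method returning the gap in milliseconds.
interface GapExtractor<T> extends Serializable {
    long extract(T element);
}

// Hypothetical element type used only for this example.
final class ClickEvent {
    final String userId;
    final boolean premium;

    ClickEvent(String userId, boolean premium) {
        this.userId = userId;
        this.premium = premium;
    }
}

final class GapExtractors {
    // Illustrative policy: premium users get a 30-minute gap, everyone else 10 minutes.
    static final GapExtractor<ClickEvent> BY_TIER =
        e -> e.premium ? 30 * 60_000L : 10 * 60_000L;

    public static void main(String[] args) {
        System.out.println(BY_TIER.extract(new ClickEvent("alice", true)));
        System.out.println(BY_TIER.extract(new ClickEvent("bob", false)));
    }
}
```

Because the interface extends Serializable, the lambda and anything it captures must be serializable, since Flink ships user functions to the workers.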

DynamicEventTimeSessionWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java

@PublicEvolving
public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    private static final long serialVersionUID = 1L;

    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;

    protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
    }

    @Override
    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
    }

    @SuppressWarnings("unchecked")
    @Override
    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return (Trigger<T, TimeWindow>) EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "DynamicEventTimeSessionWindows()";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

    /**
     * Merge overlapping {@link TimeWindow}s.
     */
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }

}
  • DynamicEventTimeSessionWindows also extends MergingWindowAssigner; unlike EventTimeSessionWindows, its constructor parameter is a SessionWindowTimeGapExtractor
  • assignWindows first uses sessionWindowTimeGapExtractor to extract sessionTimeout from the element (throwing an IllegalArgumentException if it is not positive), then returns TimeWindow(timestamp, timestamp + sessionTimeout); getDefaultTrigger returns EventTimeTrigger; isEventTime returns true; mergeWindows delegates to TimeWindow.mergeWindows
  • DynamicEventTimeSessionWindows defines a static factory method, withDynamicGap, for creating DynamicEventTimeSessionWindows instances
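Because each element brings its own gap, a session's end is the largest window end seen so far, not necessarily the last element's timestamp plus its own gap: merging keeps the covering window. The plain-Java sketch below illustrates this for in-order timestamps. TimedElement, assign and sessionize are hypothetical names, a ToLongFunction stands in for SessionWindowTimeGapExtractor, and the covering rule is assumed to match TimeWindow.cover.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical element: an event-time timestamp plus a per-element gap in milliseconds.
final class TimedElement {
    final long timestamp;
    final long gapMillis;

    TimedElement(long timestamp, long gapMillis) {
        this.timestamp = timestamp;
        this.gapMillis = gapMillis;
    }
}

final class DynamicEventTimeSketch {
    // Same shape as assignWindows: [timestamp, timestamp + extracted gap), returned as {start, end}.
    static long[] assign(TimedElement e, ToLongFunction<TimedElement> extractor) {
        long gap = extractor.applyAsLong(e);
        if (gap <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return new long[] {e.timestamp, e.timestamp + gap};
    }

    // Folds in-order elements into sessions; overlapping windows are covered (the larger end wins).
    static List<long[]> sessionize(List<TimedElement> elements, ToLongFunction<TimedElement> extractor) {
        List<long[]> sessions = new ArrayList<>();
        long[] current = null;
        for (TimedElement e : elements) {
            long[] w = assign(e, extractor);
            if (current != null && w[0] <= current[1]) {
                current[1] = Math.max(current[1], w[1]);
            } else {
                current = w;
                sessions.add(current);
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        List<TimedElement> input = Arrays.asList(
            new TimedElement(0, 10000), new TimedElement(3000, 2000), new TimedElement(12000, 1000));
        for (long[] s : sessionize(input, e -> e.gapMillis)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

Here the second element's own window is [3000, 5000), yet the session still ends at 10000 because the first element's window covers it.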

DynamicProcessingTimeSessionWindows

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java

@PublicEvolving
public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
    private static final long serialVersionUID = 1L;

    protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;

    protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
    }

    @Override
    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
        long currentProcessingTime = context.getCurrentProcessingTime();
        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
    }

    @SuppressWarnings("unchecked")
    @Override
    public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return (Trigger<T, TimeWindow>) ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "DynamicProcessingTimeSessionWindows()";
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the element timestamp.
     *
     * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
     * @return The policy.
     */
    public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
        return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }

    /**
     * Merge overlapping {@link TimeWindow}s.
     */
    public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
        TimeWindow.mergeWindows(windows, c);
    }

}
  • DynamicProcessingTimeSessionWindows also extends MergingWindowAssigner; unlike ProcessingTimeSessionWindows, its constructor parameter is a SessionWindowTimeGapExtractor
  • assignWindows first uses sessionWindowTimeGapExtractor to extract sessionTimeout from the element (throwing an IllegalArgumentException if it is not positive), then returns TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout), where currentProcessingTime is the value of context.getCurrentProcessingTime(); getDefaultTrigger returns ProcessingTimeTrigger; isEventTime returns false; mergeWindows delegates to TimeWindow.mergeWindows
  • DynamicProcessingTimeSessionWindows defines a static factory method, withDynamicGap, for creating DynamicProcessingTimeSessionWindows instances
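DynamicProcessingTimeSessionWindows combines both ideas: it reads the processing-time clock and asks the extractor for the gap. In this plain-Java sketch, a LongSupplier stands in for context.getCurrentProcessingTime() and a ToLongFunction for SessionWindowTimeGapExtractor; both substitutions, and the class name DynamicProcessingTimeSketch, are assumptions for illustration.

```java
import java.util.function.LongSupplier;
import java.util.function.ToLongFunction;

final class DynamicProcessingTimeSketch<T> {
    private final LongSupplier clock;             // stand-in for context.getCurrentProcessingTime()
    private final ToLongFunction<T> gapExtractor; // stand-in for SessionWindowTimeGapExtractor<T>

    DynamicProcessingTimeSketch(LongSupplier clock, ToLongFunction<T> gapExtractor) {
        this.clock = clock;
        this.gapExtractor = gapExtractor;
    }

    // Returns {start, end} = {now, now + gap}; the element's event timestamp plays no role.
    long[] assignWindow(T element) {
        long now = clock.getAsLong();
        long gap = gapExtractor.applyAsLong(element);
        if (gap <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return new long[] {now, now + gap};
    }

    public static void main(String[] args) {
        // Illustrative policy: one second of gap per character of the element.
        DynamicProcessingTimeSketch<String> assigner =
            new DynamicProcessingTimeSketch<>(System::currentTimeMillis, s -> s.length() * 1000L);
        long[] w = assigner.assignWindow("abc");
        System.out.println("[" + w[0] + ", " + w[1] + ")");
    }
}
```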

Summary

  • Flink's session windows come in four variants, EventTimeSessionWindows, DynamicEventTimeSessionWindows, ProcessingTimeSessionWindows and DynamicProcessingTimeSessionWindows, all of which extend MergingWindowAssigner; MergingWindowAssigner defines the abstract method mergeWindows
  • EventTimeSessionWindows and ProcessingTimeSessionWindows both take sessionTimeout as their constructor parameter. They differ in three ways: in assignWindows, ProcessingTimeSessionWindows uses context.getCurrentProcessingTime() instead of the method's timestamp parameter to compute the TimeWindow; getDefaultTrigger returns EventTimeTrigger for the former and ProcessingTimeTrigger for the latter; isEventTime returns true for the former and false for the latter
  • DynamicEventTimeSessionWindows and DynamicProcessingTimeSessionWindows differ from their non-dynamic counterparts in that their constructor parameter is a SessionWindowTimeGapExtractor, whose extract method derives sessionTimeout from each element; for the non-dynamic session windows, sessionTimeout is fixed once it is passed to the constructor
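The four variants differ along two independent axes: where the window start comes from (element timestamp or processing time) and where the gap comes from (a fixed value or a per-element extractor). The plain-Java model below captures that grid in one class. SessionAssignerModel and its four factory methods are hypothetical, and defaultTrigger returns only the trigger class name as a string; it is a sketch of the structure, not how Flink organizes its classes.

```java
import java.util.function.ToLongFunction;

final class SessionAssignerModel<T> {
    final boolean eventTime;                // mirrors isEventTime()
    private final ToLongFunction<T> gapSource; // fixed gap or per-element extractor

    private SessionAssignerModel(boolean eventTime, ToLongFunction<T> gapSource) {
        this.eventTime = eventTime;
        this.gapSource = gapSource;
    }

    // Fixed gaps are validated up front, like the non-dynamic constructors.
    private static long requirePositive(long gapMillis) {
        if (gapMillis <= 0) throw new IllegalArgumentException("session gap must satisfy 0 < gap");
        return gapMillis;
    }

    static <T> SessionAssignerModel<T> eventTimeWithGap(long gapMillis) {
        long gap = requirePositive(gapMillis);
        return new SessionAssignerModel<T>(true, e -> gap);
    }

    static <T> SessionAssignerModel<T> eventTimeWithDynamicGap(ToLongFunction<T> extractor) {
        return new SessionAssignerModel<T>(true, extractor);
    }

    static <T> SessionAssignerModel<T> processingTimeWithGap(long gapMillis) {
        long gap = requirePositive(gapMillis);
        return new SessionAssignerModel<T>(false, e -> gap);
    }

    static <T> SessionAssignerModel<T> processingTimeWithDynamicGap(ToLongFunction<T> extractor) {
        return new SessionAssignerModel<T>(false, extractor);
    }

    // Window start: element timestamp for event time, current processing time otherwise.
    long[] assign(T element, long elementTimestamp, long currentProcessingTime) {
        long start = eventTime ? elementTimestamp : currentProcessingTime;
        long gap = gapSource.applyAsLong(element); // dynamic gaps are checked per element
        if (gap <= 0) throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        return new long[] {start, start + gap};
    }

    String defaultTrigger() {
        return eventTime ? "EventTimeTrigger" : "ProcessingTimeTrigger";
    }

    public static void main(String[] args) {
        SessionAssignerModel<String> et = eventTimeWithGap(5000);
        long[] w = et.assign("x", 1000, 99000);
        System.out.println(et.defaultTrigger() + " [" + w[0] + ", " + w[1] + ")");
    }
}
```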
