A look at the window coGroup operation on Flink DataStream

This article looks at how the window coGroup operation on Flink's DataStream is implemented.

Example

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
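  • This shows the basic usage of the window coGroup operation on DataStream; the where and equalTo overloads quoted later in this article take a KeySelector, so where(0) and equalTo(1) here are best read as placeholders for key selectors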

DataStream.coGroup

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
        return new CoGroupedStreams<>(this, otherStream);
    }

    //......
}
  • DataStream's coGroup operation creates a CoGroupedStreams

CoGroupedStreams

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Public
public class CoGroupedStreams<T1, T2> {

    private final DataStream<T1> input1;

    private final DataStream<T2> input2;

    public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
        this.input1 = requireNonNull(input1);
        this.input2 = requireNonNull(input2);
    }

    public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
        Preconditions.checkNotNull(keySelector);
        final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
        return where(keySelector, keyType);
    }

    public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType)  {
        Preconditions.checkNotNull(keySelector);
        Preconditions.checkNotNull(keyType);
        return new Where<>(input1.clean(keySelector), keyType);
    }

    //.......
}
  • CoGroupedStreams provides the where operation, which specifies the keySelector for input1; it creates and returns a Where object

Where

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Public
    public class Where<KEY> {

        private final KeySelector<T1, KEY> keySelector1;
        private final TypeInformation<KEY> keyType;

        Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
            this.keySelector1 = keySelector1;
            this.keyType = keyType;
        }

        public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
            Preconditions.checkNotNull(keySelector);
            final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
            return equalTo(keySelector, otherKey);
        }

        public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
            Preconditions.checkNotNull(keySelector);
            Preconditions.checkNotNull(keyType);

            if (!keyType.equals(this.keyType)) {
                throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
                        "first key = " + this.keyType + " , second key = " + keyType);
            }

            return new EqualTo(input2.clean(keySelector));
        }

        //......
    }
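  • The Where object provides the equalTo operation, which specifies the keySelector for input2 and checks that its key type equals the key type given to where; it creates and returns an EqualTo object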

EqualTo

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Public
        public class EqualTo {

            private final KeySelector<T2, KEY> keySelector2;

            EqualTo(KeySelector<T2, KEY> keySelector2) {
                this.keySelector2 = requireNonNull(keySelector2);
            }

            @PublicEvolving
            public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
                return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
            }
        }
  • The EqualTo object provides the window operation, which creates and returns a WithWindow object
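The where, equalTo and window calls above form a staged builder: each stage exposes only the next legal call. The following is a minimal plain-Java sketch of that staging, not Flink's code. The names StagedBuilderSketch, CoGrouped and rejects are hypothetical, java.util.function.Function stands in for KeySelector, and Class stands in for TypeInformation. In the quoted source the stages are inner classes, so EqualTo.window can read input1, keySelector1 and keyType from the enclosing instances; the sketch passes them explicitly instead.

```java
import java.util.function.Function;

/** Plain-Java sketch of the where -> equalTo staging, including the key-type check in equalTo. */
public class StagedBuilderSketch {

    /** First stage (hypothetical stand-in for CoGroupedStreams): only where(...) is available. */
    static final class CoGrouped<T1, T2> {
        <K> Where<T1, T2, K> where(Function<T1, K> keySelector1, Class<K> keyType) {
            return new Where<>(keySelector1, keyType);
        }
    }

    /** Second stage: remembers the first key selector and its key type. */
    static final class Where<T1, T2, K> {
        private final Function<T1, K> keySelector1;
        private final Class<K> keyType;

        Where(Function<T1, K> keySelector1, Class<K> keyType) {
            this.keySelector1 = keySelector1;
            this.keyType = keyType;
        }

        /** Rejects a second key whose type differs from the first one. */
        EqualTo<T1, T2, K> equalTo(Function<T2, K> keySelector2, Class<?> otherKeyType) {
            if (!otherKeyType.equals(keyType)) {
                throw new IllegalArgumentException(
                        "The keys for the two inputs are not equal: first key = "
                                + keyType + " , second key = " + otherKeyType);
            }
            return new EqualTo<>(keySelector1, keySelector2);
        }
    }

    /** Third stage: both key selectors are known; a window(...) call would hang off this stage. */
    static final class EqualTo<T1, T2, K> {
        final Function<T1, K> keySelector1;
        final Function<T2, K> keySelector2;

        EqualTo(Function<T1, K> keySelector1, Function<T2, K> keySelector2) {
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
        }
    }

    /** Returns true if equalTo rejects the given second key type (the first key type is String). */
    static boolean rejects(Class<?> secondKeyType) {
        Where<String, Integer, String> where =
                new CoGrouped<String, Integer>().where(s -> s, String.class);
        try {
            where.equalTo(n -> String.valueOf(n), secondKeyType);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(String.class)); // false
        System.out.println(rejects(Long.class));   // true
    }
}
```

Because each stage returns a different type, a call such as window before equalTo simply does not compile, which is the point of splitting the builder this way.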

WithWindow

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Public
    public static class WithWindow<T1, T2, KEY, W extends Window> {
        private final DataStream<T1> input1;
        private final DataStream<T2> input2;

        private final KeySelector<T1, KEY> keySelector1;
        private final KeySelector<T2, KEY> keySelector2;

        private final TypeInformation<KEY> keyType;

        private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner;

        private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger;

        private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;

        private final Time allowedLateness;

        private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;

        protected WithWindow(DataStream<T1> input1,
                DataStream<T2> input2,
                KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2,
                TypeInformation<KEY> keyType,
                WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
                Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
                Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
                Time allowedLateness) {
            this.input1 = input1;
            this.input2 = input2;

            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
            this.keyType = keyType;

            this.windowAssigner = windowAssigner;
            this.trigger = trigger;
            this.evictor = evictor;

            this.allowedLateness = allowedLateness;
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, newTrigger, evictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, newEvictor, allowedLateness);
        }

        @PublicEvolving
        public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
            return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
                    windowAssigner, trigger, evictor, newLateness);
        }

        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {

            TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
                function,
                input1.getType(),
                input2.getType(),
                "CoGroup",
                false);

            return apply(function, resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function) {
            return (SingleOutputStreamOperator<T>) apply(function);
        }

        public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            //clean the closure
            function = input1.getExecutionEnvironment().clean(function);

            UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType());
            UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2);

            DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
                    .map(new Input1Tagger<T1, T2>())
                    .setParallelism(input1.getParallelism())
                    .returns(unionType);
            DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
                    .map(new Input2Tagger<T1, T2>())
                    .setParallelism(input2.getParallelism())
                    .returns(unionType);

            DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);

            // we explicitly create the keyed stream to manually pass the key type information in
            windowedStream =
                    new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
                    .window(windowAssigner);

            if (trigger != null) {
                windowedStream.trigger(trigger);
            }
            if (evictor != null) {
                windowedStream.evictor(evictor);
            }
            if (allowedLateness != null) {
                windowedStream.allowedLateness(allowedLateness);
            }

            return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
        }

        @PublicEvolving
        @Deprecated
        public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
            return (SingleOutputStreamOperator<T>) apply(function, resultType);
        }

        @VisibleForTesting
        Time getAllowedLateness() {
            return allowedLateness;
        }

        @VisibleForTesting
        WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
            return windowedStream;
        }
    }
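  • WithWindow holds the windowAssigner and lets you set the trigger, evictor and allowedLateness (each setter returns a new WithWindow); it provides the apply operation (the with operation is marked deprecated)
  • The apply operation takes a CoGroupFunction. Internally it first builds a UnionKeySelector from the two keySelectors, then maps the two input streams with Input1Tagger and Input2Tagger into streams of TaggedUnion objects, then calls taggedInput1.union(taggedInput2) to get unionStream. It then turns unionStream into a KeyedStream using the UnionKeySelector, applies window to the KeyedStream, and passes along the trigger, evictor and allowedLateness when they are set. Finally it wraps the user-defined CoGroupFunction in a CoGroupWindowFunction and calls windowedStream.apply
  • So the WindowedStream built inside apply has TaggedUnion as its element type; the KeyedStream it is based on uses UnionKeySelector as its KeySelector; that KeyedStream sits on a DataStream of TaggedUnion produced by taggedInput1.union(taggedInput2); and taggedInput1 and taggedInput2 come from mapping the original input streams with the MapFunctions Input1Tagger and Input2Tagger respectively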
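The steps inside apply can be simulated in plain Java for a single window. This is a simplified sketch with no Flink dependency, not the real operator: CoGroupPipelineSketch, Tagged, CoGroup, coGroupOneWindow and demo are hypothetical names, lists stand in for streams, all elements are assumed to fall into one window, and the user function returns a value instead of writing to a Collector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java simulation of the tag, union, key and split steps for one window. */
public class CoGroupPipelineSketch {

    /** Simplified stand-in for TaggedUnion: exactly one side is non-null. */
    static final class Tagged<A, B> {
        final A one;
        final B two;

        Tagged(A one, B two) {
            this.one = one;
            this.two = two;
        }

        boolean isOne() {
            return one != null;
        }
    }

    /** Simplified stand-in for CoGroupFunction that returns its result directly. */
    interface CoGroup<A, B, R> {
        R coGroup(List<A> first, List<B> second);
    }

    static <A, B, K, R> Map<K, R> coGroupOneWindow(
            List<A> input1, List<B> input2,
            Function<A, K> key1, Function<B, K> key2,
            CoGroup<A, B, R> fn) {

        // Tag each element with its origin and union both inputs into one list.
        List<Tagged<A, B>> union = new ArrayList<>();
        for (A a : input1) {
            union.add(new Tagged<>(a, null));
        }
        for (B b : input2) {
            union.add(new Tagged<>(null, b));
        }

        // Key the union; the selector is chosen by the tag, as in UnionKeySelector.
        Map<K, List<Tagged<A, B>>> byKey = new LinkedHashMap<>();
        for (Tagged<A, B> t : union) {
            K k = t.isOne() ? key1.apply(t.one) : key2.apply(t.two);
            byKey.computeIfAbsent(k, ignored -> new ArrayList<>()).add(t);
        }

        // Per key, split back into two lists and call the user function once.
        Map<K, R> results = new LinkedHashMap<>();
        for (Map.Entry<K, List<Tagged<A, B>>> e : byKey.entrySet()) {
            List<A> ones = new ArrayList<>();
            List<B> twos = new ArrayList<>();
            for (Tagged<A, B> t : e.getValue()) {
                if (t.isOne()) {
                    ones.add(t.one);
                } else {
                    twos.add(t.two);
                }
            }
            results.put(e.getKey(), fn.coGroup(ones, twos));
        }
        return results;
    }

    /** Small fixed example: strings keyed by first character, integers keyed by a threshold. */
    static Map<String, String> demo() {
        List<String> input1 = Arrays.asList("a1", "b1", "a2");
        List<Integer> input2 = Arrays.asList(10, 20);
        return coGroupOneWindow(
                input1, input2,
                s -> s.substring(0, 1),
                n -> n < 15 ? "a" : "c",
                (first, second) -> first + "|" + second);
    }

    public static void main(String[] args) {
        // prints {a=[a1, a2]|[10], b=[b1]|[], c=[]|[20]}
        System.out.println(demo());
    }
}
```

Note that keys present on only one side ("b" and "c" here) still reach the user function, with an empty list for the other side.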

CoGroupFunction

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java

@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
  • CoGroupFunction extends Function and defines the coGroup method, which receives two Iterables of elements (one per input) plus a Collector for the output

Input1Tagger and Input2Tagger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {
        private static final long serialVersionUID = 1L;

        @Override
        public TaggedUnion<T1, T2> map(T1 value) throws Exception {
            return TaggedUnion.one(value);
        }
    }

    private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {
        private static final long serialVersionUID = 1L;

        @Override
        public TaggedUnion<T1, T2> map(T2 value) throws Exception {
            return TaggedUnion.two(value);
        }
    }
  • Input1Tagger and Input2Tagger implement MapFunction; their map methods return a TaggedUnion

TaggedUnion

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

@Internal
    public static class TaggedUnion<T1, T2> {
        private final T1 one;
        private final T2 two;

        private TaggedUnion(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        public boolean isOne() {
            return one != null;
        }

        public boolean isTwo() {
            return two != null;
        }

        public T1 getOne() {
            return one;
        }

        public T2 getTwo() {
            return two;
        }

        public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
            return new TaggedUnion<>(one, null);
        }

        public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
            return new TaggedUnion<>(null, two);
        }
    }
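  • TaggedUnion has two fields, one and two, and offers two static factory methods, one and two; since the constructor is private, every TaggedUnion has either one set to null or two set to null, and both can never hold a value at the same time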
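One consequence of this design is that the tag is implicit: isOne() and isTwo() are plain null checks. The sketch below copies the shape of the class quoted above (the wrapper class TaggedUnionSketch is a hypothetical name) and shows that a null payload cannot be told apart from "the other side". Whether a null element can actually reach this point in a real job is not something the quoted source settles, so treat this as an observation about the class alone.

```java
/** Minimal copy of the TaggedUnion shape, showing that the tag is encoded by non-null-ness. */
public class TaggedUnionSketch {

    static final class TaggedUnion<T1, T2> {
        private final T1 one;
        private final T2 two;

        private TaggedUnion(T1 one, T2 two) {
            this.one = one;
            this.two = two;
        }

        boolean isOne() {
            return one != null;
        }

        boolean isTwo() {
            return two != null;
        }

        static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
            return new TaggedUnion<>(one, null);
        }

        static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
            return new TaggedUnion<>(null, two);
        }
    }

    public static void main(String[] args) {
        TaggedUnion<String, Integer> left = TaggedUnion.one("x");
        TaggedUnion<String, Integer> right = TaggedUnion.two(7);
        TaggedUnion<String, Integer> nullLeft = TaggedUnion.one(null);

        System.out.println(left.isOne() + " " + left.isTwo());         // true false
        System.out.println(right.isOne() + " " + right.isTwo());       // false true
        System.out.println(nullLeft.isOne() + " " + nullLeft.isTwo()); // false false
    }
}
```

With code that branches on isOne() and falls through to the second side otherwise, such a null-payload value would be handled as if it came from the second input.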

UnionKeySelector

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> {
        private static final long serialVersionUID = 1L;

        private final KeySelector<T1, KEY> keySelector1;
        private final KeySelector<T2, KEY> keySelector2;

        public UnionKeySelector(KeySelector<T1, KEY> keySelector1,
                KeySelector<T2, KEY> keySelector2) {
            this.keySelector1 = keySelector1;
            this.keySelector2 = keySelector2;
        }

        @Override
        public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{
            if (value.isOne()) {
                return keySelector1.getKey(value.getOne());
            } else {
                return keySelector2.getKey(value.getTwo());
            }
        }
    }
  • UnionKeySelector holds two KeySelector fields; its getKey inspects the TaggedUnion: if it is a one, it uses keySelector1.getKey(value.getOne()), otherwise it uses keySelector2.getKey(value.getTwo())

DataStream.union

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {

    //......

    @SafeVarargs
    public final DataStream<T> union(DataStream<T>... streams) {
        List<StreamTransformation<T>> unionedTransforms = new ArrayList<>();
        unionedTransforms.add(this.transformation);

        for (DataStream<T> newStream : streams) {
            if (!getType().equals(newStream.getType())) {
                throw new IllegalArgumentException("Cannot union streams of different types: "
                        + getType() + " and " + newStream.getType());
            }

            unionedTransforms.add(newStream.getTransformation());
        }
        return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
    }

    //......
}
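  • DataStream's union operation creates a new DataStream from a UnionTransformation; note that union requires all streams to have the same element type, which is why WithWindow's apply first maps the two input streams with Input1Tagger and Input2Tagger into TaggedUnion objects, unifying the element type of the two streams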

CoGroupWindowFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java

private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>
            extends WrappingFunction<CoGroupFunction<T1, T2, T>>
            implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {

        private static final long serialVersionUID = 1L;

        public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {
            super(userFunction);
        }

        @Override
        public void apply(KEY key,
                W window,
                Iterable<TaggedUnion<T1, T2>> values,
                Collector<T> out) throws Exception {

            List<T1> oneValues = new ArrayList<>();
            List<T2> twoValues = new ArrayList<>();

            for (TaggedUnion<T1, T2> val: values) {
                if (val.isOne()) {
                    oneValues.add(val.getOne());
                } else {
                    twoValues.add(val.getTwo());
                }
            }
            wrappedFunction.coGroup(oneValues, twoValues, out);
        }
    }
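  • CoGroupWindowFunction extends WrappingFunction (which extends AbstractRichFunction and overrides the parent's open, close and setRuntimeContext methods in order to manage wrappedFunction) and implements the WindowFunction interface; its apply method takes the Iterable of TaggedUnion values apart into oneValues and twoValues, then calls the coGroup method of the user-defined CoGroupFunction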

Summary

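  • DataStream provides the coGroup method for the window coGroup operation; it returns a CoGroupedStreams. CoGroupedStreams mainly provides where to build a Where object; Where mainly provides equalTo to build an EqualTo object; EqualTo provides window to build a WithWindow object; WithWindow holds the windowAssigner, lets you set the trigger, evictor and allowedLateness, and provides the apply operation
  • The apply operation of CoGroupedStreams' WithWindow takes a CoGroupFunction. Internally it first builds a UnionKeySelector from the two keySelectors, then maps the two input streams with Input1Tagger and Input2Tagger into streams of TaggedUnion objects, then calls taggedInput1.union(taggedInput2) to get unionStream. It then turns unionStream into a KeyedStream using the UnionKeySelector, applies window to the KeyedStream, and passes along the trigger, evictor and allowedLateness when they are set. Finally it wraps the user-defined CoGroupFunction in a CoGroupWindowFunction and calls windowedStream.apply
  • The apply operation of CoGroupedStreams' WithWindow relies on DataStream's union operation to merge the two streams and then converts the result into a KeyedStream. The two key classes here are TaggedUnion and UnionKeySelector. TaggedUnion has two fields, one and two, and two static factory methods, one and two, so a TaggedUnion has either one set to null or two set to null and never both holding a value. UnionKeySelector holds two KeySelector fields; its getKey inspects the TaggedUnion and uses keySelector1.getKey(value.getOne()) for a one and keySelector2.getKey(value.getTwo()) otherwise (TaggedUnion unifies the element type of the two streams so that union can be applied)
  • CoGroupWindowFunction extends WrappingFunction (which extends AbstractRichFunction and overrides the parent's open, close and setRuntimeContext methods in order to manage wrappedFunction) and implements the WindowFunction interface; its apply method takes the Iterable of TaggedUnion values apart into oneValues and twoValues, then calls the coGroup method of the user-defined CoGroupFunction
  • CoGroupFunction extends Function and defines the coGroup method, which receives two Iterables of elements. The apply method of JoinedStreams' WithWindow wraps a JoinFunction or FlatJoinFunction as a CoGroupFunction (JoinFunction via JoinCoGroupFunction, FlatJoinFunction via FlatJoinCoGroupFunction) and then calls the apply method of CoGroupedStreams' WithWindow. JoinCoGroupFunction and FlatJoinCoGroupFunction extend WrappingFunction and implement coGroup from the CoGroupFunction interface: they iterate over the first collection and, for each of its elements, iterate over the second collection, calling the join method of the JoinFunction or FlatJoinFunction on each pair (nothing is emitted when either collection is empty, so the result is an inner join; with coGroup, users can write their own CoGroupFunction to implement an outer join)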
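The inner-join behaviour and the outer-join alternative mentioned in the last bullet can be sketched without Flink. The interfaces below are redeclared locally so the example compiles on its own: CoGroupFunction has the same method shape as the one quoted earlier, while Collector is a reduced stand-in with a single method. OuterJoinSketch, INNER, LEFT_OUTER and run are hypothetical names, and the "null" placeholder for a missing right side is just one possible convention.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Inner join versus left outer join, both written as coGroup-style functions. */
public class OuterJoinSketch {

    /** Reduced stand-in for a collector: only the one method used here. */
    interface Collector<O> {
        void collect(O record);
    }

    /** Same method shape as CoGroupFunction, redeclared so the sketch is self-contained. */
    interface CoGroupFunction<IN1, IN2, O> {
        void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
    }

    /** Inner join: nested loop over both sides; an empty side yields no output. */
    static final CoGroupFunction<String, Integer, String> INNER = (first, second, out) -> {
        for (String l : first) {
            for (Integer r : second) {
                out.collect(l + "," + r);
            }
        }
    };

    /** Left outer join: left elements without a match are emitted with "null" on the right. */
    static final CoGroupFunction<String, Integer, String> LEFT_OUTER = (first, second, out) -> {
        boolean rightEmpty = !second.iterator().hasNext();
        for (String l : first) {
            if (rightEmpty) {
                out.collect(l + ",null");
            } else {
                for (Integer r : second) {
                    out.collect(l + "," + r);
                }
            }
        }
    };

    /** Runs a function on the two groups of one key and one window, collecting output in a list. */
    static List<String> run(CoGroupFunction<String, Integer, String> fn,
                            List<String> first, List<Integer> second) {
        List<String> result = new ArrayList<>();
        try {
            fn.coGroup(first, second, result::add);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("a1", "a2");
        List<Integer> noMatch = Collections.emptyList();
        List<Integer> oneMatch = Arrays.asList(10);

        System.out.println(run(INNER, left, noMatch).size());      // 0
        System.out.println(run(LEFT_OUTER, left, noMatch).size()); // 2
        System.out.println(run(LEFT_OUTER, left, oneMatch));       // [a1,10, a2,10]
    }
}
```

The two functions differ only in how they treat an empty second group, which is exactly the case the join wrappers skip.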
