本文主要研究一下flink DataStream的window coGroup操作
// Usage sketch: co-group dataStream with otherStream, matching the key passed to where(...)
// against the key passed to equalTo(...), inside tumbling event-time windows of 3 seconds.
// The CoGroupFunction body is elided ({...}) in this snippet.
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public public class DataStream<T> { //...... public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) { return new CoGroupedStreams<>(this, otherStream); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Public public class CoGroupedStreams<T1, T2> { private final DataStream<T1> input1; private final DataStream<T2> input2; public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) { this.input1 = requireNonNull(input1); this.input2 = requireNonNull(input2); } public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) { Preconditions.checkNotNull(keySelector); final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); return where(keySelector, keyType); } public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) { Preconditions.checkNotNull(keySelector); Preconditions.checkNotNull(keyType); return new Where<>(input1.clean(keySelector), keyType); } //....... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Public public class Where<KEY> { private final KeySelector<T1, KEY> keySelector1; private final TypeInformation<KEY> keyType; Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) { this.keySelector1 = keySelector1; this.keyType = keyType; } public EqualTo equalTo(KeySelector<T2, KEY> keySelector) { Preconditions.checkNotNull(keySelector); final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); return equalTo(keySelector, otherKey); } public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) { Preconditions.checkNotNull(keySelector); Preconditions.checkNotNull(keyType); if (!keyType.equals(this.keyType)) { throw new IllegalArgumentException("The keys for the two inputs are not equal: " + "first key = " + this.keyType + " , second key = " + keyType); } return new EqualTo(input2.clean(keySelector)); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
/**
 * Intermediate step of a co-group in which the keys of both inputs are fixed;
 * {@code window(...)} supplies the window assigner.
 */
@Public
public class EqualTo {

    /** Key extractor for the second input. */
    private final KeySelector<T2, KEY> keySelector2;

    EqualTo(KeySelector<T2, KEY> keySelector2) {
        this.keySelector2 = requireNonNull(keySelector2);
    }

    /**
     * Specifies the window assigner for the co-group.
     *
     * @param assigner the window assigner applied to the tagged union of both inputs
     * @param <W> window type
     * @return a {@link WithWindow} carrying both inputs, both key selectors, the key
     *     type and the assigner; trigger, evictor and allowed lateness start as {@code null}
     */
    @PublicEvolving
    public <W extends Window> WithWindow<T1, T2, KEY, W> window(
            WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {

        final WithWindow<T1, T2, KEY, W> windowed = new WithWindow<>(
                input1,
                input2,
                keySelector1,
                keySelector2,
                keyType,
                assigner,
                null,   // trigger
                null,   // evictor
                null);  // allowed lateness

        return windowed;
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Public public static class WithWindow<T1, T2, KEY, W extends Window> { private final DataStream<T1> input1; private final DataStream<T2> input2; private final KeySelector<T1, KEY> keySelector1; private final KeySelector<T2, KEY> keySelector2; private final TypeInformation<KEY> keyType; private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner; private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger; private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor; private final Time allowedLateness; private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream; protected WithWindow(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2, TypeInformation<KEY> keyType, WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner, Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor, Time allowedLateness) { this.input1 = input1; this.input2 = input2; this.keySelector1 = keySelector1; this.keySelector2 = keySelector2; this.keyType = keyType; this.windowAssigner = windowAssigner; this.trigger = trigger; this.evictor = evictor; this.allowedLateness = allowedLateness; } @PublicEvolving public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, windowAssigner, newTrigger, evictor, allowedLateness); } @PublicEvolving public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? 
super W> newEvictor) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, windowAssigner, trigger, newEvictor, allowedLateness); } @PublicEvolving public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, windowAssigner, trigger, evictor, newLateness); } public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) { TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes( function, input1.getType(), input2.getType(), "CoGroup", false); return apply(function, resultType); } @PublicEvolving @Deprecated public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function) { return (SingleOutputStreamOperator<T>) apply(function); } public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) { //clean the closure function = input1.getExecutionEnvironment().clean(function); UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType()); UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2); DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1 .map(new Input1Tagger<T1, T2>()) .setParallelism(input1.getParallelism()) .returns(unionType); DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2 .map(new Input2Tagger<T1, T2>()) .setParallelism(input2.getParallelism()) .returns(unionType); DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2); // we explicitly create the keyed stream to manually pass the key type information in windowedStream = new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType) .window(windowAssigner); if (trigger != null) { windowedStream.trigger(trigger); } if (evictor != null) { windowedStream.evictor(evictor); } if (allowedLateness != null) { windowedStream.allowedLateness(allowedLateness); } return windowedStream.apply(new 
CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType); } @PublicEvolving @Deprecated public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) { return (SingleOutputStreamOperator<T>) apply(function, resultType); } @VisibleForTesting Time getAllowedLateness() { return allowedLateness; } @VisibleForTesting WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() { return windowedStream; } }
with操作已被標記為廢棄(@Deprecated)
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java
/**
 * User function of a co-group: receives the elements of the first input and the
 * elements of the second input as two separate iterables and emits results
 * through a collector.
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 * @param <O> type of the emitted results
 */
@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

    /**
     * Processes one pair of groups.
     *
     * @param first elements from the first input
     * @param second elements from the second input
     * @param out collector for the results
     * @throws Exception implementations may throw any exception
     */
    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
/**
 * Map function that wraps every element of the first input into a
 * {@code TaggedUnion} via {@code TaggedUnion.one}.
 */
private static class Input1Tagger<T1, T2> implements MapFunction<T1, TaggedUnion<T1, T2>> {

    private static final long serialVersionUID = 1L;

    @Override
    public TaggedUnion<T1, T2> map(T1 value) throws Exception {
        final TaggedUnion<T1, T2> tagged = TaggedUnion.one(value);
        return tagged;
    }
}

/**
 * Map function that wraps every element of the second input into a
 * {@code TaggedUnion} via {@code TaggedUnion.two}.
 */
private static class Input2Tagger<T1, T2> implements MapFunction<T2, TaggedUnion<T1, T2>> {

    private static final long serialVersionUID = 1L;

    @Override
    public TaggedUnion<T1, T2> map(T2 value) throws Exception {
        final TaggedUnion<T1, T2> tagged = TaggedUnion.two(value);
        return tagged;
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
/**
 * Holder for a value that comes from either the first or the second input.
 * Instances are created through {@link #one(Object)} or {@link #two(Object)}, which
 * fill exactly one of the two slots and leave the other {@code null}.
 *
 * <p>Which side an instance belongs to is decided purely by a null check on the
 * slots, so an instance created with a {@code null} value reports {@code false}
 * for both {@link #isOne()} and {@link #isTwo()}.
 *
 * @param <T1> value type of the first input
 * @param <T2> value type of the second input
 */
@Internal
public static class TaggedUnion<T1, T2> {

    /** Value from the first input, or {@code null}. */
    private final T1 one;

    /** Value from the second input, or {@code null}. */
    private final T2 two;

    private TaggedUnion(T1 one, T2 two) {
        this.one = one;
        this.two = two;
    }

    /** Wraps a value of the first input; the second slot is {@code null}. */
    public static <T1, T2> TaggedUnion<T1, T2> one(T1 one) {
        return new TaggedUnion<>(one, null);
    }

    /** Wraps a value of the second input; the first slot is {@code null}. */
    public static <T1, T2> TaggedUnion<T1, T2> two(T2 two) {
        return new TaggedUnion<>(null, two);
    }

    /** Returns {@code true} if the first slot is non-null. */
    public boolean isOne() {
        return !(one == null);
    }

    /** Returns {@code true} if the second slot is non-null. */
    public boolean isTwo() {
        return !(two == null);
    }

    /** Returns the content of the first slot (may be {@code null}). */
    public T1 getOne() {
        return one;
    }

    /** Returns the content of the second slot (may be {@code null}). */
    public T2 getTwo() {
        return two;
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
private static class UnionKeySelector<T1, T2, KEY> implements KeySelector<TaggedUnion<T1, T2>, KEY> { private static final long serialVersionUID = 1L; private final KeySelector<T1, KEY> keySelector1; private final KeySelector<T2, KEY> keySelector2; public UnionKeySelector(KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2) { this.keySelector1 = keySelector1; this.keySelector2 = keySelector2; } @Override public KEY getKey(TaggedUnion<T1, T2> value) throws Exception{ if (value.isOne()) { return keySelector1.getKey(value.getOne()); } else { return keySelector2.getKey(value.getTwo()); } } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public public class DataStream<T> { //...... @SafeVarargs public final DataStream<T> union(DataStream<T>... streams) { List<StreamTransformation<T>> unionedTransforms = new ArrayList<>(); unionedTransforms.add(this.transformation); for (DataStream<T> newStream : streams) { if (!getType().equals(newStream.getType())) { throw new IllegalArgumentException("Cannot union streams of different types: " + getType() + " and " + newStream.getType()); } unionedTransforms.add(newStream.getTransformation()); } return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms)); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window> extends WrappingFunction<CoGroupFunction<T1, T2, T>> implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> { private static final long serialVersionUID = 1L; public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) { super(userFunction); } @Override public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out) throws Exception { List<T1> oneValues = new ArrayList<>(); List<T2> twoValues = new ArrayList<>(); for (TaggedUnion<T1, T2> val: values) { if (val.isOne()) { oneValues.add(val.getOne()); } else { twoValues.add(val.getTwo()); } } wrappedFunction.coGroup(oneValues, twoValues, out); } }
CoGroupWindowFunction繼承了WrappingFunction(WrappingFunction繼承了AbstractRichFunction,覆蓋了父類的open、close、setRuntimeContext方法,用於管理wrappedFunction),實現了WindowFunction接口,其apply方法對TaggedUnion類型的Iterable數據進行拆解,分別拆分到oneValues及twoValues中,然後調用用戶定義的CoGroupFunction的coGroup方法。藉助TaggedUnion類統一兩個stream的element類型,然後才好執行union操作。
CoGroupWindowFunction繼承了WrappingFunction(WrappingFunction繼承了AbstractRichFunction,覆蓋了父類的open、close、setRuntimeContext方法,用於管理wrappedFunction),實現了WindowFunction接口,其apply方法對TaggedUnion類型的Iterable數據進行拆解,分別拆分到oneValues及twoValues中,然後調用用戶定義的CoGroupFunction的coGroup方法。這裏的操作對集合為空的情況不做任何操作,因而實現的就是inner join效果;用戶使用coGroup操作可以自定義CoGroupFunction實現outer join。