本文主要研究一下 Flink DataStream 的 join 操作
// General shape of a windowed DataStream join; every <...> is a placeholder for a user-supplied argument.
stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public public class DataStream<T> { //...... /** * Creates a join operation. See {@link JoinedStreams} for an example of how the keys * and window can be specified. */ public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) { return new JoinedStreams<>(this, otherStream); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@Public public class JoinedStreams<T1, T2> { /** The first input stream. */ private final DataStream<T1> input1; /** The second input stream. */ private final DataStream<T2> input2; public JoinedStreams(DataStream<T1> input1, DataStream<T2> input2) { this.input1 = requireNonNull(input1); this.input2 = requireNonNull(input2); } public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) { requireNonNull(keySelector); final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType()); return where(keySelector, keyType); } public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) { requireNonNull(keySelector); requireNonNull(keyType); return new Where<>(input1.clean(keySelector), keyType); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@Public public class Where<KEY> { private final KeySelector<T1, KEY> keySelector1; private final TypeInformation<KEY> keyType; Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) { this.keySelector1 = keySelector1; this.keyType = keyType; } public EqualTo equalTo(KeySelector<T2, KEY> keySelector) { requireNonNull(keySelector); final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType()); return equalTo(keySelector, otherKey); } public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) { requireNonNull(keySelector); requireNonNull(keyType); if (!keyType.equals(this.keyType)) { throw new IllegalArgumentException("The keys for the two inputs are not equal: " + "first key = " + this.keyType + " , second key = " + keyType); } return new EqualTo(input2.clean(keySelector)); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
/**
 * Join builder step that owns the second input's key selector. Its only operation is
 * {@link #window}, which bundles everything collected so far into a {@code WithWindow}.
 * {@code input1}, {@code input2}, {@code keySelector1} and {@code keyType} come from the
 * enclosing builder steps.
 */
@Public
public class EqualTo {

    private final KeySelector<T2, KEY> keySelector2;

    EqualTo(KeySelector<T2, KEY> keySelector2) {
        this.keySelector2 = requireNonNull(keySelector2);
    }

    /**
     * Specifies the window on which the join operation works.
     *
     * @param assigner window assigner applied to the tagged union of both inputs
     * @return a {@code WithWindow} whose trigger, evictor and allowed lateness are not yet set
     */
    @PublicEvolving
    public <W extends Window> WithWindow<T1, T2, KEY, W> window(
            WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
        final KeySelector<T2, KEY> secondSelector = this.keySelector2;

        // trigger, evictor and allowed lateness start out as null and can be set fluently later.
        return new WithWindow<T1, T2, KEY, W>(
                input1,
                input2,
                keySelector1,
                secondSelector,
                keyType,
                assigner,
                null,
                null,
                null);
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@Public public static class WithWindow<T1, T2, KEY, W extends Window> { private final DataStream<T1> input1; private final DataStream<T2> input2; private final KeySelector<T1, KEY> keySelector1; private final KeySelector<T2, KEY> keySelector2; private final TypeInformation<KEY> keyType; private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner; private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger; private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor; private final Time allowedLateness; private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream; @PublicEvolving protected WithWindow(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2, TypeInformation<KEY> keyType, WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner, Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor, Time allowedLateness) { this.input1 = requireNonNull(input1); this.input2 = requireNonNull(input2); this.keySelector1 = requireNonNull(keySelector1); this.keySelector2 = requireNonNull(keySelector2); this.keyType = requireNonNull(keyType); this.windowAssigner = requireNonNull(windowAssigner); this.trigger = trigger; this.evictor = evictor; this.allowedLateness = allowedLateness; } @PublicEvolving public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, windowAssigner, newTrigger, evictor, allowedLateness); } @PublicEvolving public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? 
super W> newEvictor) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, windowAssigner, trigger, newEvictor, allowedLateness); } @PublicEvolving public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, windowAssigner, trigger, evictor, newLateness); } public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) { TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType( function, JoinFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX, input1.getType(), input2.getType(), "Join", false); return apply(function, resultType); } @PublicEvolving @Deprecated public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function) { return (SingleOutputStreamOperator<T>) apply(function); } public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) { //clean the closure function = input1.getExecutionEnvironment().clean(function); coGroupedWindowedStream = input1.coGroup(input2) .where(keySelector1) .equalTo(keySelector2) .window(windowAssigner) .trigger(trigger) .evictor(evictor) .allowedLateness(allowedLateness); return coGroupedWindowedStream .apply(new FlatJoinCoGroupFunction<>(function), resultType); } @PublicEvolving @Deprecated public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function, TypeInformation<T> resultType) { return (SingleOutputStreamOperator<T>) apply(function, resultType); } public <T> DataStream<T> apply(FlatJoinFunction<T1, T2, T> function) { TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType( function, FlatJoinFunction.class, 0, 1, 2, new int[]{2, 0}, input1.getType(), input2.getType(), "Join", false); return apply(function, resultType); } @PublicEvolving @Deprecated public <T> SingleOutputStreamOperator<T> with(FlatJoinFunction<T1, T2, T> function) { return (SingleOutputStreamOperator<T>) apply(function); } public <T> 
DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) { //clean the closure function = input1.getExecutionEnvironment().clean(function); coGroupedWindowedStream = input1.coGroup(input2) .where(keySelector1) .equalTo(keySelector2) .window(windowAssigner) .trigger(trigger) .evictor(evictor) .allowedLateness(allowedLateness); return coGroupedWindowedStream .apply(new JoinCoGroupFunction<>(function), resultType); } @PublicEvolving @Deprecated public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) { return (SingleOutputStreamOperator<T>) apply(function, resultType); } @VisibleForTesting Time getAllowedLateness() { return allowedLateness; } @VisibleForTesting CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() { return coGroupedWindowedStream; } }
with 操作已被標記爲廢棄(@Deprecated):它只是把 apply 的返回值強制轉換爲 SingleOutputStreamOperator,應直接使用 apply
apply 操作會先把用戶傳入的函數包裝成 CoGroupFunction(JoinFunction 使用 JoinCoGroupFunction 包裝,FlatJoinFunction 使用 FlatJoinCoGroupFunction 包裝),然後傳遞給 CoGroupedStreams 的 WithWindow 的 apply 方法
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/JoinFunction.java
/**
 * User function of a join: it is given one matching pair of elements and returns exactly one
 * result element for that pair.
 */
@Public
@FunctionalInterface
public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * Joins a single pair of elements; invoked once for every joined pair.
     *
     * @param first element taken from the first input
     * @param second element taken from the second input
     * @return the element produced for this pair
     * @throws Exception any exception thrown here fails the operation and may trigger recovery
     */
    OUT join(IN1 first, IN2 second) throws Exception;
}
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/FlatJoinFunction.java
/**
 * User function of a join that emits its results through a {@link Collector}, so a single
 * matching pair may yield zero, one or several output elements.
 */
@Public
@FunctionalInterface
public interface FlatJoinFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * Joins a single pair of elements; invoked once for every joined pair.
     *
     * @param first element taken from the first input
     * @param second element taken from the second input
     * @param out collector receiving any number of result elements for this pair
     * @throws Exception any exception thrown here fails the operation and may trigger recovery
     */
    void join(IN1 first, IN2 second, Collector<OUT> out) throws Exception;
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@Public public class CoGroupedStreams<T1, T2> { //...... @Public public static class WithWindow<T1, T2, KEY, W extends Window> { private final DataStream<T1> input1; private final DataStream<T2> input2; private final KeySelector<T1, KEY> keySelector1; private final KeySelector<T2, KEY> keySelector2; private final TypeInformation<KEY> keyType; private final WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner; private final Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger; private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor; private final Time allowedLateness; private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream; protected WithWindow(DataStream<T1> input1, DataStream<T2> input2, KeySelector<T1, KEY> keySelector1, KeySelector<T2, KEY> keySelector2, TypeInformation<KEY> keyType, WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner, Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger, Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor, Time allowedLateness) { this.input1 = input1; this.input2 = input2; this.keySelector1 = keySelector1; this.keySelector2 = keySelector2; this.keyType = keyType; this.windowAssigner = windowAssigner; this.trigger = trigger; this.evictor = evictor; this.allowedLateness = allowedLateness; } @PublicEvolving public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, windowAssigner, newTrigger, evictor, allowedLateness); } @PublicEvolving public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? 
super W> newEvictor) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, windowAssigner, trigger, newEvictor, allowedLateness); } @PublicEvolving public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) { return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, windowAssigner, trigger, evictor, newLateness); } public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) { TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes( function, input1.getType(), input2.getType(), "CoGroup", false); return apply(function, resultType); } public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) { //clean the closure function = input1.getExecutionEnvironment().clean(function); UnionTypeInfo<T1, T2> unionType = new UnionTypeInfo<>(input1.getType(), input2.getType()); UnionKeySelector<T1, T2, KEY> unionKeySelector = new UnionKeySelector<>(keySelector1, keySelector2); DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1 .map(new Input1Tagger<T1, T2>()) .setParallelism(input1.getParallelism()) .returns(unionType); DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2 .map(new Input2Tagger<T1, T2>()) .setParallelism(input2.getParallelism()) .returns(unionType); DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2); // we explicitly create the keyed stream to manually pass the key type information in windowedStream = new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType) .window(windowAssigner); if (trigger != null) { windowedStream.trigger(trigger); } if (evictor != null) { windowedStream.evictor(evictor); } if (allowedLateness != null) { windowedStream.allowedLateness(allowedLateness); } return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType); } //...... } //...... }
flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/functions/CoGroupFunction.java
/**
 * User function of a coGroup: it receives, for one key, all elements of that key from both
 * inputs and may emit any number of results.
 */
@Public
@FunctionalInterface
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {

    /**
     * Processes one pair of element groups that share the same key.
     *
     * @param first the elements of the first input for this key
     * @param second the elements of the second input for this key
     * @param out collector receiving the results
     * @throws Exception any exception thrown here cancels the program and may trigger recovery
     */
    void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}
flink-java-1.7.0-sources.jar!/org/apache/flink/api/java/operators/translation/WrappingFunction.java
/**
 * Abstract rich function that wraps another {@link Function} and passes the lifecycle calls
 * (open, close, runtime context) on to it through {@link FunctionUtils}.
 */
@Internal
public abstract class WrappingFunction<T extends Function> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /** The wrapped user function. */
    protected T wrappedFunction;

    protected WrappingFunction(T function) {
        this.wrappedFunction = function;
    }

    @Override
    public void open(Configuration config) throws Exception {
        FunctionUtils.openFunction(wrappedFunction, config);
    }

    @Override
    public void close() throws Exception {
        FunctionUtils.closeFunction(wrappedFunction);
    }

    @Override
    public void setRuntimeContext(RuntimeContext context) {
        // Register the context on this function first, then hand the same context to the wrapped one.
        super.setRuntimeContext(context);
        FunctionUtils.setFunctionRuntimeContext(wrappedFunction, context);
    }

    /** Returns the wrapped user function. */
    public T getWrappedFunction() {
        return wrappedFunction;
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
/**
 * Adapts a {@link JoinFunction} to {@link CoGroupFunction} by a nested-loop join: every element
 * of the first group is paired with every element of the second group, and each pair yields
 * exactly one output. If either group is empty, nothing is emitted.
 */
private static class JoinCoGroupFunction<T1, T2, T>
        extends WrappingFunction<JoinFunction<T1, T2, T>>
        implements CoGroupFunction<T1, T2, T> {

    private static final long serialVersionUID = 1L;

    public JoinCoGroupFunction(JoinFunction<T1, T2, T> wrappedFunction) {
        super(wrappedFunction);
    }

    @Override
    public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
        for (T1 left : first) {
            // The second group is re-iterated once per element of the first group.
            for (T2 right : second) {
                T joined = wrappedFunction.join(left, right);
                out.collect(joined);
            }
        }
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/JoinedStreams.java
/**
 * Adapts a {@link FlatJoinFunction} to {@link CoGroupFunction} by a nested-loop join: every
 * pair of (first-group, second-group) elements is handed to the wrapped function together with
 * the output collector, so each pair may produce any number of results.
 */
private static class FlatJoinCoGroupFunction<T1, T2, T>
        extends WrappingFunction<FlatJoinFunction<T1, T2, T>>
        implements CoGroupFunction<T1, T2, T> {

    private static final long serialVersionUID = 1L;

    public FlatJoinCoGroupFunction(FlatJoinFunction<T1, T2, T> wrappedFunction) {
        super(wrappedFunction);
    }

    @Override
    public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
        for (T1 left : first) {
            // The second group is re-iterated once per element of the first group.
            for (T2 right : second) {
                wrappedFunction.join(left, right, out);
            }
        }
    }
}
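小結:JoinedStreams 的 WithWindow 的 apply 方法先把用戶函數包裝成 CoGroupFunction(JoinFunction 使用 JoinCoGroupFunction 包裝,FlatJoinFunction 使用 FlatJoinCoGroupFunction 包裝),然後去調用 CoGroupedStreams 的 WithWindow 的 apply 方法。JoinCoGroupFunction 與 FlatJoinCoGroupFunction 都繼承了 WrappingFunction(它繼承了 AbstractRichFunction,並覆蓋了父類的 open、close、setRuntimeContext 方法,通過 FunctionUtils 把這些調用作用到 wrappedFunction 上,用於管理 wrappedFunction),同時實現了 CoGroupFunction 接口定義的 coGroup 方法:對同一個 key 的兩組元素做嵌套循環(nested-loop),對每一對元素調用一次 join。兩者不同的是,前者包裝 JoinFunction,每對元素恰好產生一個結果;後者包裝 FlatJoinFunction,其 join 方法多傳遞了 out(Collector)參數,因此每對元素可以輸出零個、一個或多個結果。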