本文主要研究一下flink DataStream的connect操作
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public public class DataStream<T> { //...... public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) { return new ConnectedStreams<>(environment, this, dataStream); } @PublicEvolving public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) { return new BroadcastConnectedStream<>( environment, this, Preconditions.checkNotNull(broadcastStream), broadcastStream.getBroadcastStateDescriptor()); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@Public public class ConnectedStreams<IN1, IN2> { protected final StreamExecutionEnvironment environment; protected final DataStream<IN1> inputStream1; protected final DataStream<IN2> inputStream2; protected ConnectedStreams(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) { this.environment = requireNonNull(env); this.inputStream1 = requireNonNull(input1); this.inputStream2 = requireNonNull(input2); } public StreamExecutionEnvironment getExecutionEnvironment() { return environment; } public DataStream<IN1> getFirstInput() { return inputStream1; } public DataStream<IN2> getSecondInput() { return inputStream2; } public TypeInformation<IN1> getType1() { return inputStream1.getType(); } public TypeInformation<IN2> getType2() { return inputStream2.getType(); } public ConnectedStreams<IN1, IN2> keyBy(int keyPosition1, int keyPosition2) { return new ConnectedStreams<>(this.environment, inputStream1.keyBy(keyPosition1), inputStream2.keyBy(keyPosition2)); } public ConnectedStreams<IN1, IN2> keyBy(int[] keyPositions1, int[] keyPositions2) { return new ConnectedStreams<>(environment, inputStream1.keyBy(keyPositions1), inputStream2.keyBy(keyPositions2)); } public ConnectedStreams<IN1, IN2> keyBy(String field1, String field2) { return new ConnectedStreams<>(environment, inputStream1.keyBy(field1), inputStream2.keyBy(field2)); } public ConnectedStreams<IN1, IN2> keyBy(String[] fields1, String[] fields2) { return new ConnectedStreams<>(environment, inputStream1.keyBy(fields1), inputStream2.keyBy(fields2)); } public ConnectedStreams<IN1, IN2> keyBy(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) { return new ConnectedStreams<>(environment, inputStream1.keyBy(keySelector1), inputStream2.keyBy(keySelector2)); } public <KEY> ConnectedStreams<IN1, IN2> keyBy( KeySelector<IN1, KEY> keySelector1, KeySelector<IN2, KEY> keySelector2, TypeInformation<KEY> keyType) { return new ConnectedStreams<>( environment, 
inputStream1.keyBy(keySelector1, keyType), inputStream2.keyBy(keySelector2, keyType)); } public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) { TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( coMapper, CoMapFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX, getType1(), getType2(), Utils.getCallLocationName(), true); return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper))); } public <R> SingleOutputStreamOperator<R> flatMap( CoFlatMapFunction<IN1, IN2, R> coFlatMapper) { TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( coFlatMapper, CoFlatMapFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX, getType1(), getType2(), Utils.getCallLocationName(), true); return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper))); } @PublicEvolving public <R> SingleOutputStreamOperator<R> process( CoProcessFunction<IN1, IN2, R> coProcessFunction) { TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType( coProcessFunction, CoProcessFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX, getType1(), getType2(), Utils.getCallLocationName(), true); return process(coProcessFunction, outTypeInfo); } @Internal public <R> SingleOutputStreamOperator<R> process( CoProcessFunction<IN1, IN2, R> coProcessFunction, TypeInformation<R> outputType) { TwoInputStreamOperator<IN1, IN2, R> operator; if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) { operator = new KeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction)); } else { operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction)); } return transform("Co-Process", outputType, operator); } @PublicEvolving public <R> SingleOutputStreamOperator<R> transform(String functionName, TypeInformation<R> outTypeInfo, TwoInputStreamOperator<IN1, IN2, R> operator) { // read the output type of the input Transforms to coax out errors about 
MissingTypeInfo inputStream1.getType(); inputStream2.getType(); TwoInputTransformation<IN1, IN2, R> transform = new TwoInputTransformation<>( inputStream1.getTransformation(), inputStream2.getTransformation(), functionName, operator, outTypeInfo, environment.getParallelism()); if (inputStream1 instanceof KeyedStream && inputStream2 instanceof KeyedStream) { KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1; KeyedStream<IN2, ?> keyedInput2 = (KeyedStream<IN2, ?>) inputStream2; TypeInformation<?> keyType1 = keyedInput1.getKeyType(); TypeInformation<?> keyType2 = keyedInput2.getKeyType(); if (!(keyType1.canEqual(keyType2) && keyType1.equals(keyType2))) { throw new UnsupportedOperationException("Key types if input KeyedStreams " + "don't match: " + keyType1 + " and " + keyType2 + "."); } transform.setStateKeySelectors(keyedInput1.getKeySelector(), keyedInput2.getKeySelector()); transform.setStateKeyType(keyType1); } @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, transform); getExecutionEnvironment().addOperator(transform); return returnStream; } }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
/**
 * A serializable function with one mapping method per input type, both producing the
 * same output type. It is the function type accepted by {@code ConnectedStreams.map}.
 *
 * @param <IN1> type accepted by {@code map1}
 * @param <IN2> type accepted by {@code map2}
 * @param <OUT> type returned by both mapping methods
 */
@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * Maps a value of the first input type.
     * NOTE(review): presumably called for elements of the first connected stream; confirm
     * against the operator that invokes it.
     *
     * @param value the value to map
     * @return the mapped result
     * @throws Exception if the mapping fails
     */
    OUT map1(IN1 value) throws Exception;

    /**
     * Maps a value of the second input type.
     * NOTE(review): presumably called for elements of the second connected stream; confirm
     * against the operator that invokes it.
     *
     * @param value the value to map
     * @return the mapped result
     * @throws Exception if the mapping fails
     */
    OUT map2(IN2 value) throws Exception;
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
/**
 * A serializable function with one flat-map method per input type, both writing to a
 * collector of the same output type. It is the function type accepted by
 * {@code ConnectedStreams.flatMap}.
 *
 * @param <IN1> type accepted by {@code flatMap1}
 * @param <IN2> type accepted by {@code flatMap2}
 * @param <OUT> element type of the collector both methods write to
 */
@Public
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {

    /**
     * Handles a value of the first input type; results are handed to {@code out}.
     * NOTE(review): presumably called for elements of the first connected stream; confirm
     * against the operator that invokes it.
     *
     * @param value the value to process
     * @param out collector that receives the produced elements
     * @throws Exception if processing fails
     */
    void flatMap1(IN1 value, Collector<OUT> out) throws Exception;

    /**
     * Handles a value of the second input type; results are handed to {@code out}.
     * NOTE(review): presumably called for elements of the second connected stream; confirm
     * against the operator that invokes it.
     *
     * @param value the value to process
     * @param out collector that receives the produced elements
     * @throws Exception if processing fails
     */
    void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
/**
 * Abstract rich function with one element-processing method per input type and an
 * optional timer callback. It is the function type accepted by
 * {@code ConnectedStreams.process}.
 *
 * @param <IN1> type accepted by {@code processElement1}
 * @param <IN2> type accepted by {@code processElement2}
 * @param <OUT> element type of the collector the methods write to
 */
@PublicEvolving
public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = 1L;

    /**
     * Handles a value of the first input type.
     * NOTE(review): presumably called for elements of the first connected stream; confirm
     * against the operator that invokes it.
     *
     * @param value the value to process
     * @param ctx context giving access to a timestamp, a timer service and side outputs
     * @param out collector that receives the produced elements
     * @throws Exception if processing fails
     */
    public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

    /**
     * Handles a value of the second input type.
     * NOTE(review): presumably called for elements of the second connected stream; confirm
     * against the operator that invokes it.
     *
     * @param value the value to process
     * @param ctx context giving access to a timestamp, a timer service and side outputs
     * @param out collector that receives the produced elements
     * @throws Exception if processing fails
     */
    public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    /**
     * Timer callback. The default implementation is empty, so subclasses override it only
     * when they need to react to timers.
     * NOTE(review): presumably fired for timers registered through
     * {@code Context.timerService()}; confirm against the invoking operator.
     *
     * @param timestamp timestamp passed by the caller for the firing timer
     * @param ctx context that additionally exposes the timer's {@link TimeDomain}
     * @param out collector that receives the produced elements
     * @throws Exception if processing fails
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

    /**
     * Context handed to the processing methods. Declared as a non-static inner class of
     * {@code CoProcessFunction}.
     */
    public abstract class Context {

        /**
         * Returns a timestamp as a boxed {@code Long}.
         * NOTE(review): the boxed return type suggests the value may be null when no
         * timestamp is available; confirm.
         */
        public abstract Long timestamp();

        /** Returns the {@link TimerService} associated with this context. */
        public abstract TimerService timerService();

        /**
         * Emits a value to the output identified by the given tag.
         *
         * @param outputTag tag identifying the target output
         * @param value the value to emit
         * @param <X> type of the emitted value
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }

    /**
     * Context handed to {@code onTimer}; extends {@link Context} with the time domain of
     * the timer.
     */
    public abstract class OnTimerContext extends Context {
        /**
         * The {@link TimeDomain} of the firing timer.
         */
        public abstract TimeDomain timeDomain();
    }
}