本文主要研究一下flink KeyedStream的intervalJoin操作
// Usage sketch: "..." and <KeySelector> are placeholders to be filled in by the reader.
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

// Pair every orange element with each green element of the same key whose timestamp
// lies in [orange.ts - 2ms, orange.ts + 1ms]; between() treats both bounds as inclusive.
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // Emit the joined pair as "left,right".
            out.collect(left + "," + right);
        }
    });
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
@Public public class KeyedStream<T, KEY> extends DataStream<T> { //...... @PublicEvolving public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) { return new IntervalJoin<>(this, otherStream); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
/**
 * First stage of an interval join: holds the two keyed input streams until the
 * time bounds are supplied through {@link #between(Time, Time)}.
 *
 * @param <T1> element type of the first stream
 * @param <T2> element type of the second stream
 * @param <KEY> key type shared by both streams
 */
@PublicEvolving
public static class IntervalJoin<T1, T2, KEY> {

    private final KeyedStream<T1, KEY> streamOne;
    private final KeyedStream<T2, KEY> streamTwo;

    /** Stores both inputs; neither may be {@code null}. */
    IntervalJoin(KeyedStream<T1, KEY> streamOne, KeyedStream<T2, KEY> streamTwo) {
        this.streamOne = checkNotNull(streamOne);
        this.streamTwo = checkNotNull(streamTwo);
    }

    /**
     * Fixes the lower and upper time bound of the join. Both bounds start out inclusive.
     *
     * @param lowerBound lower bound of the interval, must not be {@code null}
     * @param upperBound upper bound of the interval, must not be {@code null}
     * @return the configured {@link IntervalJoined}
     * @throws UnsupportedTimeCharacteristicException if the environment of the first stream
     *         is not running with the event-time characteristic
     */
    @PublicEvolving
    public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
        // The time characteristic is validated before the arguments are inspected.
        if (streamOne.getExecutionEnvironment().getStreamTimeCharacteristic() != TimeCharacteristic.EventTime) {
            throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
        }

        checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
        checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

        final long lowerMillis = lowerBound.toMilliseconds();
        final long upperMillis = upperBound.toMilliseconds();

        // Both ends of the interval are inclusive unless changed later on the returned object.
        return new IntervalJoined<>(streamOne, streamTwo, lowerMillis, upperMillis, true, true);
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
/**
 * Second stage of an interval join: carries both inputs, the bounds in milliseconds and
 * the inclusiveness flags, and finally wires up the {@link IntervalJoinOperator}.
 *
 * @param <IN1> element type of the left stream
 * @param <IN2> element type of the right stream
 * @param <KEY> key type shared by both streams
 */
@PublicEvolving
public static class IntervalJoined<IN1, IN2, KEY> {

    private final KeyedStream<IN1, KEY> left;
    private final KeyedStream<IN2, KEY> right;

    private final long lowerBound;
    private final long upperBound;

    private final KeySelector<IN1, KEY> keySelector1;
    private final KeySelector<IN2, KEY> keySelector2;

    private boolean lowerBoundInclusive;
    private boolean upperBoundInclusive;

    /**
     * @param left left input, must not be {@code null}
     * @param right right input, must not be {@code null}
     * @param lowerBound lower bound in milliseconds
     * @param upperBound upper bound in milliseconds
     * @param lowerBoundInclusive whether the lower bound itself belongs to the interval
     * @param upperBoundInclusive whether the upper bound itself belongs to the interval
     */
    public IntervalJoined(
            KeyedStream<IN1, KEY> left,
            KeyedStream<IN2, KEY> right,
            long lowerBound,
            long upperBound,
            boolean lowerBoundInclusive,
            boolean upperBoundInclusive) {

        this.left = checkNotNull(left);
        this.right = checkNotNull(right);

        // The key selectors are taken from the inputs so the connected stream can be re-keyed.
        this.keySelector1 = left.getKeySelector();
        this.keySelector2 = right.getKeySelector();

        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.lowerBoundInclusive = lowerBoundInclusive;
        this.upperBoundInclusive = upperBoundInclusive;
    }

    /** Excludes the upper bound from the interval and returns this object for chaining. */
    @PublicEvolving
    public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
        this.upperBoundInclusive = false;
        return this;
    }

    /** Excludes the lower bound from the interval and returns this object for chaining. */
    @PublicEvolving
    public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
        this.lowerBoundInclusive = false;
        return this;
    }

    /**
     * Completes the join with a user function; the output type is extracted from the function.
     *
     * @param processJoinFunction user function invoked for every joined pair, must not be {@code null}
     * @param <OUT> output element type
     * @return the resulting stream
     */
    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
        Preconditions.checkNotNull(processJoinFunction);

        final TypeInformation<OUT> extractedType = TypeExtractor.getBinaryOperatorReturnType(
            processJoinFunction,
            ProcessJoinFunction.class,
            0,
            1,
            2,
            TypeExtractor.NO_INDEX,
            left.getType(),
            right.getType(),
            Utils.getCallLocationName(),
            true);

        return process(processJoinFunction, extractedType);
    }

    /**
     * Completes the join with a user function and an explicitly supplied output type.
     *
     * @param processJoinFunction user function invoked for every joined pair, must not be {@code null}
     * @param outputType type information of the produced elements, must not be {@code null}
     * @param <OUT> output element type
     * @return the resulting stream, produced by a transformation named "Interval Join"
     */
    @PublicEvolving
    public <OUT> SingleOutputStreamOperator<OUT> process(
            ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
            TypeInformation<OUT> outputType) {

        Preconditions.checkNotNull(processJoinFunction);
        Preconditions.checkNotNull(outputType);

        final ProcessJoinFunction<IN1, IN2, OUT> cleanedFunction =
            left.getExecutionEnvironment().clean(processJoinFunction);
        final IntervalJoinOperator<KEY, IN1, IN2, OUT> joinOperator = buildOperator(cleanedFunction);

        // Connect both inputs, key them with their original selectors and attach the operator.
        return left
            .connect(right)
            .keyBy(keySelector1, keySelector2)
            .transform("Interval Join", outputType, joinOperator);
    }

    /** Creates the join operator from the configured bounds and the serializers of both inputs. */
    private <OUT> IntervalJoinOperator<KEY, IN1, IN2, OUT> buildOperator(ProcessJoinFunction<IN1, IN2, OUT> udf) {
        return new IntervalJoinOperator<>(
            lowerBound,
            upperBound,
            lowerBoundInclusive,
            upperBoundInclusive,
            left.getType().createSerializer(left.getExecutionConfig()),
            right.getType().createSerializer(right.getExecutionConfig()),
            udf);
    }
}
本實例left爲orangeStream,right爲greenStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java
/**
 * User function of an interval join; {@link #processElement} receives one joined pair at a time.
 *
 * @param <IN1> element type of the left input
 * @param <IN2> element type of the right input
 * @param <OUT> type of the emitted results
 */
@PublicEvolving
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

    private static final long serialVersionUID = -2444626938039012398L;

    /**
     * Handles one joined pair of elements.
     *
     * @param left element from the left input
     * @param right element from the right input
     * @param ctx context giving access to timestamps and side outputs
     * @param out collector for the results
     * @throws Exception implementations may throw any exception
     */
    public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;

    /** Per-invocation context handed to {@link #processElement}. */
    public abstract class Context {

        /** @return presumably the timestamp of the left element of the current pair */
        public abstract long getLeftTimestamp();

        /** @return presumably the timestamp of the right element of the current pair */
        public abstract long getRightTimestamp();

        /** @return presumably the timestamp assigned to the joined result */
        public abstract long getTimestamp();

        /**
         * Emits {@code value} to the output identified by {@code outputTag}.
         *
         * @param outputTag tag identifying the target output
         * @param value record to emit
         * @param <X> type of the emitted record
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java
@Internal public class IntervalJoinOperator<K, T1, T2, OUT> extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>> implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> { private static final long serialVersionUID = -5380774605111543454L; private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class); private static final String LEFT_BUFFER = "LEFT_BUFFER"; private static final String RIGHT_BUFFER = "RIGHT_BUFFER"; private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER"; private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT"; private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT"; private final long lowerBound; private final long upperBound; private final TypeSerializer<T1> leftTypeSerializer; private final TypeSerializer<T2> rightTypeSerializer; private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer; private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer; private transient TimestampedCollector<OUT> collector; private transient ContextImpl context; private transient InternalTimerService<String> internalTimerService; public IntervalJoinOperator( long lowerBound, long upperBound, boolean lowerBoundInclusive, boolean upperBoundInclusive, TypeSerializer<T1> leftTypeSerializer, TypeSerializer<T2> rightTypeSerializer, ProcessJoinFunction<T1, T2, OUT> udf) { super(Preconditions.checkNotNull(udf)); Preconditions.checkArgument(lowerBound <= upperBound, "lowerBound <= upperBound must be fulfilled"); // Move buffer by +1 / -1 depending on inclusiveness in order not needing // to check for inclusiveness later on this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L; this.upperBound = (upperBoundInclusive) ? 
upperBound : upperBound - 1L; this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer); this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer); } @Override public void open() throws Exception { super.open(); collector = new TimestampedCollector<>(output); context = new ContextImpl(userFunction); internalTimerService = getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this); } @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( LEFT_BUFFER, LongSerializer.INSTANCE, new ListSeriawelizer<>(new BufferEntrySerializer<>(leftTypeSerializer)) )); this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>( RIGHT_BUFFER, LongSerializer.INSTANCE, new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer)) )); } @Override public void processElement1(StreamRecord<T1> record) throws Exception { processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true); } @Override public void processElement2(StreamRecord<T2> record) throws Exception { processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false); } @SuppressWarnings("unchecked") private <THIS, OTHER> void processElement( final StreamRecord<THIS> record, final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer, final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer, final long relativeLowerBound, final long relativeUpperBound, final boolean isLeft) throws Exception { final THIS ourValue = record.getValue(); final long ourTimestamp = record.getTimestamp(); if (ourTimestamp == Long.MIN_VALUE) { throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " + "interval stream joins need to have timestamps meaningful timestamps."); } if (isLate(ourTimestamp)) { return; } addToBuffer(ourBuffer, 
ourValue, ourTimestamp); for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) { final long timestamp = bucket.getKey(); if (timestamp < ourTimestamp + relativeLowerBound || timestamp > ourTimestamp + relativeUpperBound) { continue; } for (BufferEntry<OTHER> entry: bucket.getValue()) { if (isLeft) { collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp); } else { collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp); } } } long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp; if (isLeft) { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime); } else { internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime); } } private boolean isLate(long timestamp) { long currentWatermark = internalTimerService.currentWatermark(); return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark; } private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception { final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp); collector.setAbsoluteTimestamp(resultTimestamp); context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp); userFunction.processElement(left, right, context, collector); } @Override public void onEventTime(InternalTimer<K, String> timer) throws Exception { long timerTimestamp = timer.getTimestamp(); String namespace = timer.getNamespace(); logger.trace("onEventTime @ {}", timerTimestamp); switch (namespace) { case CLEANUP_NAMESPACE_LEFT: { long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound; logger.trace("Removing from left buffer @ {}", timestamp); leftBuffer.remove(timestamp); break; } case CLEANUP_NAMESPACE_RIGHT: { long timestamp = (lowerBound <= 0L) ? 
timerTimestamp + lowerBound : timerTimestamp; logger.trace("Removing from right buffer @ {}", timestamp); rightBuffer.remove(timestamp); break; } default: throw new RuntimeException("Invalid namespace " + namespace); } } @Override public void onProcessingTime(InternalTimer<K, String> timer) throws Exception { // do nothing. } //...... }
IntervalJoinOperator覆蓋了AbstractUdfStreamOperator(StreamOperator定義)的open、initializeState方法,它在open方法裏頭創建了InternalTimerService,傳遞的Triggerable參數爲this,即自身實現的Triggerable接口;在initializeState方法裏頭創建了leftBuffer和rightBuffer兩個MapState
IntervalJoinOperator實現了TwoInputStreamOperator接口定義的processElement1、processElement2方法(TwoInputStreamOperator接口定義的其他一些方法在AbstractUdfStreamOperator的父類AbstractStreamOperator中有實現);processElement1、processElement2方法內部都調用了processElement方法,只是傳遞的relativeLowerBound、relativeUpperBound、isLeft參數不同以及leftBuffer和rightBuffer的傳參順序不同
processElement方法先判斷element是否late,late的直接返回,否則把element添加到ourBuffer中(對於processElement1來說ourBuffer爲leftBuffer,對於processElement2來說ourBuffer爲rightBuffer);之後就是遍歷otherBuffer中的每個元素,挨個判斷時間是否滿足要求(即ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound),不滿足要求的直接跳過,滿足要求的就調用collect方法(collect方法裏頭執行的是userFunction.processElement,即調用用戶定義的ProcessJoinFunction的processElement方法);之後就是計算cleanupTime,調用internalTimerService.registerEventTimeTimer註冊清理該element的timer

IntervalJoinOperator覆蓋了AbstractUdfStreamOperator(StreamOperator定義)的open、initializeState方法,它在open方法裏頭創建了InternalTimerService,傳遞的Triggerable參數爲this,即自身實現的Triggerable接口;在initializeState方法裏頭創建了leftBuffer和rightBuffer兩個MapState;它實現了TwoInputStreamOperator接口定義的processElement1、processElement2方法,processElement1、processElement2方法內部都調用了processElement方法,只是傳遞的relativeLowerBound、relativeUpperBound、isLeft參數不同以及leftBuffer和rightBuffer的傳參順序不同
processElement方法會遍歷otherBuffer中的每個元素,挨個判斷時間是否滿足要求(即ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound),不滿足要求的直接跳過,滿足要求的就調用collect方法(collect方法裏頭執行的是userFunction.processElement,即調用用戶定義的ProcessJoinFunction的processElement方法);之後就是計算cleanupTime,調用internalTimerService.registerEventTimeTimer註冊清理該element的timer