A look at the intervalJoin operation on Flink's KeyedStream

This article walks through the intervalJoin operation on Flink's KeyedStream.

Example

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // Emit each matched pair as "left,right".
            out.collect(left + "," + right);
        }
    });
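
To make the meaning of between(Time.milliseconds(-2), Time.milliseconds(1)) concrete, here is a minimal plain-Java sketch of the matching rule for elements that share one key. It does not use Flink at all; the class name, the method names, and the timestamps are made up for illustration. A left element with timestamp t joins every right element whose timestamp falls in [t - 2, t + 1], both ends inclusive.

```java
import java.util.ArrayList;
import java.util.List;

/** Brute-force illustration of the interval-join condition; not Flink code. */
class IntervalJoinSemantics {

    /** True when rightTs lies in [leftTs + lower, leftTs + upper], both ends inclusive. */
    static boolean matches(long leftTs, long rightTs, long lower, long upper) {
        return rightTs >= leftTs + lower && rightTs <= leftTs + upper;
    }

    /** Joins two timestamp lists that are assumed to belong to the same key. */
    static List<String> join(long[] orange, long[] green, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (long o : orange) {
            for (long g : green) {
                if (matches(o, g, lower, upper)) {
                    out.add(o + "," + g);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {2, 3, 5};    // hypothetical event timestamps
        long[] green = {0, 1, 4, 6};  // hypothetical event timestamps
        // Each line is one joined pair: orangeTimestamp,greenTimestamp
        join(orange, green, -2, 1).forEach(System.out::println);
    }
}
```

With these inputs the orange element at timestamp 2 pairs with the green elements at 0 and 1, but not with the one at 4, because 4 is later than 2 + 1.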

KeyedStream.intervalJoin

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@Public
public class KeyedStream<T, KEY> extends DataStream<T> {
	//......

	@PublicEvolving
	public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
		return new IntervalJoin<>(this, otherStream);
	}

	//......
}
  • KeyedStream's intervalJoin method creates and returns an IntervalJoin

IntervalJoin

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@PublicEvolving
	public static class IntervalJoin<T1, T2, KEY> {

		private final KeyedStream<T1, KEY> streamOne;
		private final KeyedStream<T2, KEY> streamTwo;

		IntervalJoin(
				KeyedStream<T1, KEY> streamOne,
				KeyedStream<T2, KEY> streamTwo
		) {
			this.streamOne = checkNotNull(streamOne);
			this.streamTwo = checkNotNull(streamTwo);
		}

		@PublicEvolving
		public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {

			TimeCharacteristic timeCharacteristic =
				streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();

			if (timeCharacteristic != TimeCharacteristic.EventTime) {
				throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
			}

			checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
			checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

			return new IntervalJoined<>(
				streamOne,
				streamTwo,
				lowerBound.toMilliseconds(),
				upperBound.toMilliseconds(),
				true,
				true
			);
		}
	}
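  • IntervalJoin provides the between operation, which sets the lowerBound and upperBound of the interval. Note that between throws UnsupportedTimeCharacteristicException immediately when the time characteristic is anything other than TimeCharacteristic.EventTime; otherwise it creates and returns an IntervalJoined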

IntervalJoined

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

@PublicEvolving
	public static class IntervalJoined<IN1, IN2, KEY> {

		private final KeyedStream<IN1, KEY> left;
		private final KeyedStream<IN2, KEY> right;

		private final long lowerBound;
		private final long upperBound;

		private final KeySelector<IN1, KEY> keySelector1;
		private final KeySelector<IN2, KEY> keySelector2;

		private boolean lowerBoundInclusive;
		private boolean upperBoundInclusive;

		public IntervalJoined(
				KeyedStream<IN1, KEY> left,
				KeyedStream<IN2, KEY> right,
				long lowerBound,
				long upperBound,
				boolean lowerBoundInclusive,
				boolean upperBoundInclusive) {

			this.left = checkNotNull(left);
			this.right = checkNotNull(right);

			this.lowerBound = lowerBound;
			this.upperBound = upperBound;

			this.lowerBoundInclusive = lowerBoundInclusive;
			this.upperBoundInclusive = upperBoundInclusive;

			this.keySelector1 = left.getKeySelector();
			this.keySelector2 = right.getKeySelector();
		}

		@PublicEvolving
		public IntervalJoined<IN1, IN2, KEY> upperBoundExclusive() {
			this.upperBoundInclusive = false;
			return this;
		}

		@PublicEvolving
		public IntervalJoined<IN1, IN2, KEY> lowerBoundExclusive() {
			this.lowerBoundInclusive = false;
			return this;
		}

		@PublicEvolving
		public <OUT> SingleOutputStreamOperator<OUT> process(ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction) {
			Preconditions.checkNotNull(processJoinFunction);

			final TypeInformation<OUT> outputType = TypeExtractor.getBinaryOperatorReturnType(
				processJoinFunction,
				ProcessJoinFunction.class,
				0,
				1,
				2,
				TypeExtractor.NO_INDEX,
				left.getType(),
				right.getType(),
				Utils.getCallLocationName(),
				true
			);

			return process(processJoinFunction, outputType);
		}

		@PublicEvolving
		public <OUT> SingleOutputStreamOperator<OUT> process(
				ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
				TypeInformation<OUT> outputType) {
			Preconditions.checkNotNull(processJoinFunction);
			Preconditions.checkNotNull(outputType);

			final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);

			final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
				new IntervalJoinOperator<>(
					lowerBound,
					upperBound,
					lowerBoundInclusive,
					upperBoundInclusive,
					left.getType().createSerializer(left.getExecutionConfig()),
					right.getType().createSerializer(right.getExecutionConfig()),
					cleanedUdf
				);

			return left
				.connect(right)
				.keyBy(keySelector1, keySelector2)
				.transform("Interval Join", outputType, operator);
		}
	}
  • IntervalJoined treats lowerBound and upperBound as inclusive by default, and provides lowerBoundExclusive and upperBoundExclusive to make either bound exclusive individually. It also provides the process operation, which takes a ProcessJoinFunction; process creates an IntervalJoinOperator and then executes left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator), returning a SingleOutputStreamOperator (in this example left is orangeStream and right is greenStream)
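
The inclusive/exclusive flags are not checked during matching. The IntervalJoinOperator constructor shown later in this article folds them into the bounds by shifting an exclusive bound one millisecond inward. The following plain-Java sketch mirrors just that arithmetic; the class and method names are hypothetical and nothing here depends on Flink.

```java
/** Mirrors how an exclusive bound is turned into an equivalent inclusive one. */
class BoundNormalization {

    /** An exclusive lower bound L is equivalent to the inclusive bound L + 1 ms. */
    static long effectiveLower(long lowerBound, boolean inclusive) {
        return inclusive ? lowerBound : lowerBound + 1L;
    }

    /** An exclusive upper bound U is equivalent to the inclusive bound U - 1 ms. */
    static long effectiveUpper(long upperBound, boolean inclusive) {
        return inclusive ? upperBound : upperBound - 1L;
    }

    public static void main(String[] args) {
        // between(-2 ms, 1 ms) with both bounds made exclusive
        // behaves like the inclusive interval [-1 ms, 0 ms].
        System.out.println(effectiveLower(-2, false) + ".." + effectiveUpper(1, false));
    }
}
```

The shift is safe because the bounds are whole milliseconds stored as long values, so there is no timestamp strictly between L and L + 1.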

ProcessJoinFunction

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/co/ProcessJoinFunction.java

@PublicEvolving
public abstract class ProcessJoinFunction<IN1, IN2, OUT> extends AbstractRichFunction {

	private static final long serialVersionUID = -2444626938039012398L;

	public abstract void processElement(IN1 left, IN2 right, Context ctx, Collector<OUT> out) throws Exception;

	public abstract class Context {

		public abstract long getLeftTimestamp();

		public abstract long getRightTimestamp();

		public abstract long getTimestamp();

		public abstract <X> void output(OutputTag<X> outputTag, X value);
	}
}
  • ProcessJoinFunction extends AbstractRichFunction. It declares the abstract method processElement and its own Context class, which in turn declares four abstract methods: getLeftTimestamp, getRightTimestamp, getTimestamp, and output
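
As a rough illustration of what the three timestamp getters return, the sketch below models the Context as a plain Java class. SimpleContext is a hypothetical stand-in, not the Flink class; the max rule is taken from the collect method of IntervalJoinOperator quoted later in this article, which sets the result timestamp to Math.max(leftTimestamp, rightTimestamp).

```java
/** Plain-Java stand-in for the timestamp part of ProcessJoinFunction.Context. */
class JoinContextSketch {

    static final class SimpleContext {
        private long leftTimestamp;
        private long rightTimestamp;
        private long resultTimestamp;

        /** Called once per matched pair, before the user function runs. */
        void updateTimestamps(long left, long right) {
            this.leftTimestamp = left;
            this.rightTimestamp = right;
            // The joined record carries the later of the two timestamps.
            this.resultTimestamp = Math.max(left, right);
        }

        long getLeftTimestamp() { return leftTimestamp; }

        long getRightTimestamp() { return rightTimestamp; }

        long getTimestamp() { return resultTimestamp; }
    }

    public static void main(String[] args) {
        SimpleContext ctx = new SimpleContext();
        ctx.updateTimestamps(5L, 3L);
        System.out.println(ctx.getTimestamp()); // 5
    }
}
```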

IntervalJoinOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java

@Internal
public class IntervalJoinOperator<K, T1, T2, OUT>
		extends AbstractUdfStreamOperator<OUT, ProcessJoinFunction<T1, T2, OUT>>
		implements TwoInputStreamOperator<T1, T2, OUT>, Triggerable<K, String> {

	private static final long serialVersionUID = -5380774605111543454L;

	private static final Logger logger = LoggerFactory.getLogger(IntervalJoinOperator.class);

	private static final String LEFT_BUFFER = "LEFT_BUFFER";
	private static final String RIGHT_BUFFER = "RIGHT_BUFFER";
	private static final String CLEANUP_TIMER_NAME = "CLEANUP_TIMER";
	private static final String CLEANUP_NAMESPACE_LEFT = "CLEANUP_LEFT";
	private static final String CLEANUP_NAMESPACE_RIGHT = "CLEANUP_RIGHT";

	private final long lowerBound;
	private final long upperBound;

	private final TypeSerializer<T1> leftTypeSerializer;
	private final TypeSerializer<T2> rightTypeSerializer;

	private transient MapState<Long, List<BufferEntry<T1>>> leftBuffer;
	private transient MapState<Long, List<BufferEntry<T2>>> rightBuffer;

	private transient TimestampedCollector<OUT> collector;
	private transient ContextImpl context;

	private transient InternalTimerService<String> internalTimerService;

	public IntervalJoinOperator(
			long lowerBound,
			long upperBound,
			boolean lowerBoundInclusive,
			boolean upperBoundInclusive,
			TypeSerializer<T1> leftTypeSerializer,
			TypeSerializer<T2> rightTypeSerializer,
			ProcessJoinFunction<T1, T2, OUT> udf) {

		super(Preconditions.checkNotNull(udf));

		Preconditions.checkArgument(lowerBound <= upperBound,
			"lowerBound <= upperBound must be fulfilled");

		// Move buffer by +1 / -1 depending on inclusiveness in order not needing
		// to check for inclusiveness later on
		this.lowerBound = (lowerBoundInclusive) ? lowerBound : lowerBound + 1L;
		this.upperBound = (upperBoundInclusive) ? upperBound : upperBound - 1L;

		this.leftTypeSerializer = Preconditions.checkNotNull(leftTypeSerializer);
		this.rightTypeSerializer = Preconditions.checkNotNull(rightTypeSerializer);
	}

	@Override
	public void open() throws Exception {
		super.open();

		collector = new TimestampedCollector<>(output);
		context = new ContextImpl(userFunction);
		internalTimerService =
			getInternalTimerService(CLEANUP_TIMER_NAME, StringSerializer.INSTANCE, this);
	}

	@Override
	public void initializeState(StateInitializationContext context) throws Exception {
		super.initializeState(context);

		this.leftBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
			LEFT_BUFFER,
			LongSerializer.INSTANCE,
			new ListSerializer<>(new BufferEntrySerializer<>(leftTypeSerializer))
		));

		this.rightBuffer = context.getKeyedStateStore().getMapState(new MapStateDescriptor<>(
			RIGHT_BUFFER,
			LongSerializer.INSTANCE,
			new ListSerializer<>(new BufferEntrySerializer<>(rightTypeSerializer))
		));
	}

	@Override
	public void processElement1(StreamRecord<T1> record) throws Exception {
		processElement(record, leftBuffer, rightBuffer, lowerBound, upperBound, true);
	}

	@Override
	public void processElement2(StreamRecord<T2> record) throws Exception {
		processElement(record, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
	}

	@SuppressWarnings("unchecked")
	private <THIS, OTHER> void processElement(
			final StreamRecord<THIS> record,
			final MapState<Long, List<IntervalJoinOperator.BufferEntry<THIS>>> ourBuffer,
			final MapState<Long, List<IntervalJoinOperator.BufferEntry<OTHER>>> otherBuffer,
			final long relativeLowerBound,
			final long relativeUpperBound,
			final boolean isLeft) throws Exception {

		final THIS ourValue = record.getValue();
		final long ourTimestamp = record.getTimestamp();

		if (ourTimestamp == Long.MIN_VALUE) {
			throw new FlinkException("Long.MIN_VALUE timestamp: Elements used in " +
					"interval stream joins need to have timestamps meaningful timestamps.");
		}

		if (isLate(ourTimestamp)) {
			return;
		}

		addToBuffer(ourBuffer, ourValue, ourTimestamp);

		for (Map.Entry<Long, List<BufferEntry<OTHER>>> bucket: otherBuffer.entries()) {
			final long timestamp  = bucket.getKey();

			if (timestamp < ourTimestamp + relativeLowerBound ||
					timestamp > ourTimestamp + relativeUpperBound) {
				continue;
			}

			for (BufferEntry<OTHER> entry: bucket.getValue()) {
				if (isLeft) {
					collect((T1) ourValue, (T2) entry.element, ourTimestamp, timestamp);
				} else {
					collect((T1) entry.element, (T2) ourValue, timestamp, ourTimestamp);
				}
			}
		}

		long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + relativeUpperBound : ourTimestamp;
		if (isLeft) {
			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, cleanupTime);
		} else {
			internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, cleanupTime);
		}
	}

	private boolean isLate(long timestamp) {
		long currentWatermark = internalTimerService.currentWatermark();
		return currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
	}

	private void collect(T1 left, T2 right, long leftTimestamp, long rightTimestamp) throws Exception {
		final long resultTimestamp = Math.max(leftTimestamp, rightTimestamp);

		collector.setAbsoluteTimestamp(resultTimestamp);
		context.updateTimestamps(leftTimestamp, rightTimestamp, resultTimestamp);

		userFunction.processElement(left, right, context, collector);
	}

	@Override
	public void onEventTime(InternalTimer<K, String> timer) throws Exception {

		long timerTimestamp = timer.getTimestamp();
		String namespace = timer.getNamespace();

		logger.trace("onEventTime @ {}", timerTimestamp);

		switch (namespace) {
			case CLEANUP_NAMESPACE_LEFT: {
				long timestamp = (upperBound <= 0L) ? timerTimestamp : timerTimestamp - upperBound;
				logger.trace("Removing from left buffer @ {}", timestamp);
				leftBuffer.remove(timestamp);
				break;
			}
			case CLEANUP_NAMESPACE_RIGHT: {
				long timestamp = (lowerBound <= 0L) ? timerTimestamp + lowerBound : timerTimestamp;
				logger.trace("Removing from right buffer @ {}", timestamp);
				rightBuffer.remove(timestamp);
				break;
			}
			default:
				throw new RuntimeException("Invalid namespace " + namespace);
		}
	}

	@Override
	public void onProcessingTime(InternalTimer<K, String> timer) throws Exception {
		// do nothing.
	}

	//......
}
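  • IntervalJoinOperator extends the abstract class AbstractUdfStreamOperator and implements the TwoInputStreamOperator and Triggerable interfaces
  • IntervalJoinOperator overrides the open and initializeState methods of AbstractUdfStreamOperator (declared by StreamOperator). In open it obtains the InternalTimerService, passing this as the Triggerable argument, i.e. its own implementation of the Triggerable interface; in initializeState it creates two MapState instances, leftBuffer and rightBuffer
  • IntervalJoinOperator implements the processElement1 and processElement2 methods declared by TwoInputStreamOperator (the interface's other methods are implemented in AbstractStreamOperator, the parent class of AbstractUdfStreamOperator). Both methods delegate to the private processElement method and differ only in the relativeLowerBound, relativeUpperBound, and isLeft arguments and in the order in which leftBuffer and rightBuffer are passed
  • The processElement method implements the time-matching logic of intervalJoin. It reads currentWatermark from internalTimerService and checks whether the element is late; a late element is dropped and the method returns. Otherwise the element's value is added to ourBuffer (leftBuffer for processElement1, rightBuffer for processElement2). The method then iterates over every entry in otherBuffer and checks whether its timestamp qualifies (i.e. ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound); entries that do not qualify are skipped, and for those that do it calls collect (which runs userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction). Finally it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that cleans up the element
  • IntervalJoinOperator implements the onEventTime and onProcessingTime methods declared by Triggerable. onProcessingTime does nothing, while onEventTime removes elements from leftBuffer or rightBuffer according to the timer's timestamp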
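
The behaviour described in the bullets above can be approximated with a small plain-Java model of the state held for a single key. This is a sketch under stated assumptions, not the Flink implementation: the class IntervalJoinSketch and all of its methods are hypothetical, String stands in for the element types, a TreeMap replaces MapState (only to get a deterministic iteration order), and advanceWatermark stands in for the event-time timers by dropping every bucket whose cleanup time has been reached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of one key's worth of interval-join state; not Flink code. */
class IntervalJoinSketch {

    private final long lowerBound;
    private final long upperBound;

    // Timestamp -> elements seen at that timestamp.
    private final TreeMap<Long, List<String>> leftBuffer = new TreeMap<>();
    private final TreeMap<Long, List<String>> rightBuffer = new TreeMap<>();

    private final List<String> output = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    IntervalJoinSketch(long lowerBound, long upperBound) {
        if (lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound <= upperBound must be fulfilled");
        }
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(String value, long timestamp) {
        process(value, timestamp, leftBuffer, rightBuffer, lowerBound, upperBound, true);
    }

    void processRight(String value, long timestamp) {
        // Seen from the right side the window is mirrored: [-upperBound, -lowerBound].
        process(value, timestamp, rightBuffer, leftBuffer, -upperBound, -lowerBound, false);
    }

    private void process(String value, long ts,
                         Map<Long, List<String>> ourBuffer,
                         Map<Long, List<String>> otherBuffer,
                         long relativeLower, long relativeUpper, boolean isLeft) {
        // Elements strictly behind the watermark are late and dropped silently.
        if (watermark != Long.MIN_VALUE && ts < watermark) {
            return;
        }
        ourBuffer.computeIfAbsent(ts, k -> new ArrayList<>()).add(value);

        for (Map.Entry<Long, List<String>> bucket : otherBuffer.entrySet()) {
            long otherTs = bucket.getKey();
            if (otherTs < ts + relativeLower || otherTs > ts + relativeUpper) {
                continue;
            }
            for (String other : bucket.getValue()) {
                // Always emit in (left, right) order, whichever side arrived last.
                output.add(isLeft ? value + "," + other : other + "," + value);
            }
        }
    }

    /** Stands in for the cleanup timers: drops buckets whose cleanup time has been reached. */
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        long leftRetention = Math.max(upperBound, 0L);
        long rightRetention = Math.max(-lowerBound, 0L);
        leftBuffer.keySet().removeIf(ts -> ts + leftRetention <= newWatermark);
        rightBuffer.keySet().removeIf(ts -> ts + rightRetention <= newWatermark);
    }

    List<String> output() { return output; }

    int bufferedLeft() { return leftBuffer.size(); }

    int bufferedRight() { return rightBuffer.size(); }

    public static void main(String[] args) {
        IntervalJoinSketch join = new IntervalJoinSketch(-2, 1);
        join.processRight("g0", 0);
        join.processLeft("o2", 2);   // window [0, 3] contains g0
        join.processRight("g3", 3);  // mirrored window [2, 5] contains o2
        join.advanceWatermark(5);    // all three buffered elements are cleaned up
        join.processLeft("o4", 4);   // late (4 < 5), ignored
        join.output().forEach(System.out::println);
    }
}
```

Because each arriving element is matched only against what the other side has already buffered, every qualifying pair is emitted exactly once, by whichever of its two elements arrives second.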

Summary

  • Flink's intervalJoin operation requires a KeyedStream and only works with TimeCharacteristic.EventTime. KeyedStream's intervalJoin creates and returns an IntervalJoin; IntervalJoin provides the between operation, which sets the lowerBound and upperBound of the interval and creates and returns an IntervalJoined
  • IntervalJoined provides the process operation, which takes a ProcessJoinFunction; process creates an IntervalJoinOperator and then executes left.connect(right).keyBy(keySelector1, keySelector2).transform("Interval Join", outputType, operator), returning a SingleOutputStreamOperator
  • IntervalJoinOperator extends the abstract class AbstractUdfStreamOperator and implements the TwoInputStreamOperator and Triggerable interfaces. It overrides the open and initializeState methods of AbstractUdfStreamOperator (declared by StreamOperator): in open it obtains the InternalTimerService, passing this as the Triggerable argument, and in initializeState it creates the two MapState instances leftBuffer and rightBuffer. It implements processElement1 and processElement2 from TwoInputStreamOperator; both delegate to the private processElement method and differ only in the relativeLowerBound, relativeUpperBound, and isLeft arguments and in the order in which leftBuffer and rightBuffer are passed
  • The processElement method of IntervalJoinOperator implements the time-matching logic of intervalJoin. It first checks whether the element is late and returns immediately if so; otherwise it adds the element to its own buffer and then iterates over every entry in otherBuffer, checking whether its timestamp qualifies (i.e. ourTimestamp + relativeLowerBound <= timestamp <= ourTimestamp + relativeUpperBound). Entries that do not qualify are skipped; for those that do it calls collect (which runs userFunction.processElement, i.e. the processElement method of the user-defined ProcessJoinFunction). Finally it computes cleanupTime and calls internalTimerService.registerEventTimeTimer to register a timer that cleans up the element
  • IntervalJoinOperator implements the onEventTime and onProcessingTime methods declared by Triggerable. onProcessingTime does nothing, while onEventTime removes elements from leftBuffer or rightBuffer according to the timer's timestamp
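
The timer arithmetic behind the cleanup is easy to misread, so here is a small plain-Java sketch that reproduces only the two formulas from the source above: the cleanupTime computed in processElement and the buffer key derived in onEventTime. The class and method names are hypothetical. The point is that, for any combination of bounds, the key removed when the timer fires is the timestamp of the element that registered it.

```java
/** Reproduces the cleanup-timer formulas of the operator; not Flink code. */
class CleanupTimerMath {

    /** Timer timestamp registered for an element, as computed in processElement. */
    static long cleanupTime(long elementTs, long relativeUpperBound) {
        return relativeUpperBound > 0L ? elementTs + relativeUpperBound : elementTs;
    }

    /** Left-buffer key removed when a CLEANUP_LEFT timer fires. */
    static long leftKeyToRemove(long timerTs, long upperBound) {
        return upperBound <= 0L ? timerTs : timerTs - upperBound;
    }

    /** Right-buffer key removed when a CLEANUP_RIGHT timer fires. */
    static long rightKeyToRemove(long timerTs, long lowerBound) {
        return lowerBound <= 0L ? timerTs + lowerBound : timerTs;
    }

    public static void main(String[] args) {
        long lowerBound = -2L;
        long upperBound = 1L;

        // A left element at t=10 uses relativeUpperBound = upperBound.
        long leftTimer = cleanupTime(10L, upperBound);    // 11
        // A right element at t=10 uses relativeUpperBound = -lowerBound.
        long rightTimer = cleanupTime(10L, -lowerBound);  // 12

        System.out.println(leftKeyToRemove(leftTimer, upperBound));   // 10
        System.out.println(rightKeyToRemove(rightTimer, lowerBound)); // 10
    }
}
```

In other words, with between(-2 ms, 1 ms) a left element is kept until the watermark reaches its timestamp plus 1 ms, and a right element until the watermark reaches its timestamp plus 2 ms.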
