A look at the iterate operation of Flink DataStream

This article takes a look at the iterate operation of Flink's DataStream.

Example

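import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;

// initialStream is an existing DataStream<Long>
IterativeStream<Long> iteration = initialStream.iterate();
// loop body: "value - 1" is only an illustrative placeholder for the real logic
DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        return value - 1;
    }
});
// elements that are still > 0 are fed back to the head of the iteration
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
// elements <= 0 leave the loop and flow downstream
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>() {
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});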
  • This example shows the basic usage of IterativeStream: iterate() creates an IterativeStream, and IterativeStream.closeWith closes the iteration by specifying the feedback stream.
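To make the feedback loop concrete, here is a single-threaded, stdlib-only simulation of the example. It is not Flink code. An ArrayDeque stands in for the iteration head and its feedback edge. Purely for illustration, the loop body is assumed to be value - 1. Real Flink runs the loop in parallel, so output order is not guaranteed there.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Single-threaded model of iterate()/closeWith(); NOT Flink code.
// The deque plays the iteration head: it holds both the initial input and fed-back elements.
final class FeedbackLoopSketch {

    static List<Long> run(List<Long> input) {
        Deque<Long> head = new ArrayDeque<>(input);
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long value = head.pollFirst();
            long body = value - 1;       // assumed loop body (illustrative map)
            if (body > 0) {
                head.addLast(body);      // feedback filter (value > 0) -> back to the head
            } else {
                output.add(body);        // output filter (value <= 0) -> leaves the loop
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(3L, 1L, 0L)));
    }
}
```

With the input [3, 1, 0], the element 3 is fed back twice (as 2, then as 1) before it leaves the loop as 0. The output therefore has one element per input, but in a different order from the input.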

DataStream.iterate

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

@Public
public class DataStream<T> {
	//......

	@PublicEvolving
	public IterativeStream<T> iterate() {
		return new IterativeStream<>(this, 0);
	}

	@PublicEvolving
	public IterativeStream<T> iterate(long maxWaitTimeMillis) {
		return new IterativeStream<>(this, maxWaitTimeMillis);
	}

	//......
}
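  • DataStream provides two iterate methods, both of which create and return an IterativeStream. The no-argument iterate() passes a maxWaitTimeMillis of 0, meaning the iteration head waits for feedback indefinitely. By default, then, a DataStream with an iteration never terminates.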
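The no-argument iterate() passes a maxWaitTimeMillis of 0, so it helps to see what a head does with that value. Below is a simplified, stdlib-only model. It assumes, based on our reading of Flink 1.7's StreamIterationHead (not shown in this article), that a positive wait time becomes a timed BlockingQueue.poll. Any other value becomes a blocking take(), which never times out. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of an iteration head reading its feedback channel.
// Assumption: wait time > 0 -> timed poll(); otherwise take(), which blocks until data arrives.
final class IterationHeadWaitSketch {

    static List<Long> drain(BlockingQueue<Long> feedback, long maxWaitTimeMillis) {
        boolean shouldWait = maxWaitTimeMillis > 0;
        List<Long> received = new ArrayList<>();
        try {
            while (true) {
                Long next = shouldWait
                        ? feedback.poll(maxWaitTimeMillis, TimeUnit.MILLISECONDS) // null on timeout
                        : feedback.take();                                       // waits forever
                if (next == null) {
                    break; // nothing arrived within maxWaitTimeMillis: the head shuts down
                }
                received.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // treat interruption as a shutdown signal
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<Long> feedback = new LinkedBlockingQueue<>(List.of(2L, 1L));
        // With a 100 ms wait, the loop ends once the queue has stayed empty for 100 ms.
        System.out.println(drain(feedback, 100));
    }
}
```

Calling drain(feedback, 0) on an empty queue would block forever, mirroring the default iterate(). A positive value, as with iterate(long maxWaitTimeMillis), lets the loop end once feedback dries up.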

IterativeStream

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/IterativeStream.java

@PublicEvolving
public class IterativeStream<T> extends SingleOutputStreamOperator<T> {

	// We store these so that we can create a co-iteration if we need to
	private DataStream<T> originalInput;
	private long maxWaitTime;

	protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
		super(dataStream.getExecutionEnvironment(),
				new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime));
		this.originalInput = dataStream;
		this.maxWaitTime = maxWaitTime;
		setBufferTimeout(dataStream.environment.getBufferTimeout());
	}

	@SuppressWarnings({ "unchecked", "rawtypes" })
	public DataStream<T> closeWith(DataStream<T> feedbackStream) {

		Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

		if (!predecessors.contains(this.transformation)) {
			throw new UnsupportedOperationException(
					"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
		}

		((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation());

		return feedbackStream;
	}

	public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
		return withFeedbackType(TypeInformation.of(feedbackTypeClass));
	}

	public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeHint<F> feedbackTypeHint) {
		return withFeedbackType(TypeInformation.of(feedbackTypeHint));
	}

	public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
		return new ConnectedIterativeStreams<>(originalInput, feedbackType, maxWaitTime);
	}

	@Public
	public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {

		private CoFeedbackTransformation<F> coFeedbackTransformation;

		public ConnectedIterativeStreams(DataStream<I> input,
				TypeInformation<F> feedbackType,
				long waitTime) {
			super(input.getExecutionEnvironment(),
					input,
					new DataStream<>(input.getExecutionEnvironment(),
							new CoFeedbackTransformation<>(input.getParallelism(),
									feedbackType,
									waitTime)));
			this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation();
		}

		public DataStream<F> closeWith(DataStream<F> feedbackStream) {

			Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors();

			if (!predecessors.contains(this.coFeedbackTransformation)) {
				throw new UnsupportedOperationException(
						"Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
			}

			coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation());

			return feedbackStream;
		}

		private UnsupportedOperationException groupingException =
				new UnsupportedOperationException("Cannot change the input partitioning of an" +
						"iteration head directly. Apply the partitioning on the input and" +
						"feedback streams instead.");

		@Override
		public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) {
			throw groupingException;
		}

		@Override
		public ConnectedStreams<I, F> keyBy(String field1, String field2) {
			throw groupingException;
		}

		@Override
		public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) {
			throw groupingException;
		}

		@Override
		public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {
			throw groupingException;
		}

		@Override
		public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) {
			throw groupingException;
		}
	}
}
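  • IterativeStream extends SingleOutputStreamOperator. Its constructor takes two arguments: the input DataStream (kept as originalInput) and maxWaitTime. From dataStream.getTransformation() and maxWaitTime it creates a FeedbackTransformation. The constructor also sets the transformation's bufferTimeout from dataStream.environment.getBufferTimeout().
  • IterativeStream mainly provides two methods. The first, closeWith, closes the iteration by defining the part that is fed back to the iteration head. You can think of it as a loop-back, or something like recursion. The filter acts as the recursion condition: elements that pass it re-enter the head of the IterativeStream and go through the subsequent operations again. Before adding the edge, closeWith checks that the feedback stream actually originates from this iteration, i.e. its transitive predecessors must include the iteration's transformation. If not, it throws UnsupportedOperationException. The second method, withFeedbackType, creates a ConnectedIterativeStreams.
  • ConnectedIterativeStreams extends ConnectedStreams and allows the type of the fed-back stream to differ from the type of originalInput. It defines its own closeWith method. It also overrides every keyBy method of ConnectedStreams to throw UnsupportedOperationException. As the exception message says, partitioning has to be applied to the input and feedback streams instead.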
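What ConnectedIterativeStreams adds is a feedback type F that differs from the input type I. The stdlib-only model below shows that shape with String input and Integer feedback. In Flink, the connected streams would typically be processed with a CoMapFunction or CoFlatMapFunction: its first method handles the original input and its second handles the feedback. Here, two plain branches stand in for those methods. The "halve even numbers" rule is a hypothetical loop body chosen only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Stdlib-only model of a co-iteration whose feedback type (Integer) differs
// from its input type (String). Not Flink code: two queues stand in for the
// original input and the feedback edge of the iteration head.
final class CoIterationSketch {

    static List<Integer> run(List<String> inputs) {
        Deque<String> input = new ArrayDeque<>(inputs);  // first input, type I = String
        Deque<Integer> feedback = new ArrayDeque<>();    // feedback edge, type F = Integer
        List<Integer> output = new ArrayList<>();
        while (!input.isEmpty() || !feedback.isEmpty()) {
            if (!input.isEmpty()) {
                // like the first co-function: a String from the input enters the loop as an Integer
                feedback.addLast(Integer.parseInt(input.pollFirst()));
            } else {
                // like the second co-function, with a hypothetical rule: halve even numbers
                int n = feedback.pollFirst();
                if (n != 0 && n % 2 == 0) {
                    feedback.addLast(n / 2); // stays in the loop (fed back via closeWith)
                } else {
                    output.add(n);           // leaves the loop
                }
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("12", "7", "40")));
    }
}
```

The input side never sees Integer values and the feedback side never sees String values. This is the separation that withFeedbackType makes possible.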

FeedbackTransformation

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/transformations/FeedbackTransformation.java

@Internal
public class FeedbackTransformation<T> extends StreamTransformation<T> {

	private final StreamTransformation<T> input;

	private final List<StreamTransformation<T>> feedbackEdges;

	private final Long waitTime;

	public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) {
		super("Feedback", input.getOutputType(), input.getParallelism());
		this.input = input;
		this.waitTime = waitTime;
		this.feedbackEdges = Lists.newArrayList();
	}

	public StreamTransformation<T> getInput() {
		return input;
	}

	public void addFeedbackEdge(StreamTransformation<T> transform) {

		if (transform.getParallelism() != this.getParallelism()) {
			throw new UnsupportedOperationException(
					"Parallelism of the feedback stream must match the parallelism of the original" +
							" stream. Parallelism of original stream: " + this.getParallelism() +
							"; parallelism of feedback stream: " + transform.getParallelism() +
							". Parallelism can be modified using DataStream#setParallelism() method");
		}

		feedbackEdges.add(transform);
	}

	public List<StreamTransformation<T>> getFeedbackEdges() {
		return feedbackEdges;
	}

	public Long getWaitTime() {
		return waitTime;
	}

	@Override
	public final void setChainingStrategy(ChainingStrategy strategy) {
		throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
	}

	@Override
	public Collection<StreamTransformation<?>> getTransitivePredecessors() {
		List<StreamTransformation<?>> result = Lists.newArrayList();
		result.add(this);
		result.addAll(input.getTransitivePredecessors());
		return result;
	}
}
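  • FeedbackTransformation extends StreamTransformation and holds, among others, the input, feedbackEdges and waitTime properties.
  • addFeedbackEdge adds a feedback edge; IterativeStream.closeWith calls it to register the StreamTransformation of the feedback stream. If the parallelism of that transformation differs from the parallelism of the original stream, addFeedbackEdge throws UnsupportedOperationException.
  • waitTime is how long the feedback operator waits for feedback elements. If none arrives within waitTime, the operator shuts down and no longer accepts new feedback elements.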
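The two checks that guard a feedback edge can be modelled without Flink. In the sketch below, Node and Head are hypothetical stand-ins for StreamTransformation and FeedbackTransformation. transitivePredecessors() follows the same "this node plus everything upstream" rule as getTransitivePredecessors(). For brevity, closeWith() folds two checks into one method: the origin check from IterativeStream.closeWith and the parallelism check from addFeedbackEdge.

```java
import java.util.ArrayList;
import java.util.List;

// Stdlib-only model of the checks made when an iteration is closed.
// Node/Head are hypothetical stand-ins, not Flink's transformation classes.
final class FeedbackEdgeSketch {

    static class Node {
        final String name;
        final int parallelism;
        final List<Node> inputs = new ArrayList<>();

        Node(String name, int parallelism, Node... upstream) {
            this.name = name;
            this.parallelism = parallelism;
            inputs.addAll(List.of(upstream));
        }

        // Same rule as getTransitivePredecessors(): this node plus everything upstream.
        List<Node> transitivePredecessors() {
            List<Node> result = new ArrayList<>();
            result.add(this);
            for (Node in : inputs) {
                result.addAll(in.transitivePredecessors());
            }
            return result;
        }
    }

    // Plays the role of FeedbackTransformation: one input plus a list of feedback edges.
    static final class Head extends Node {
        final List<Node> feedbackEdges = new ArrayList<>();

        Head(Node input) {
            super("Feedback", input.parallelism, input);
        }

        // Origin check (as in closeWith) followed by the parallelism check (as in addFeedbackEdge).
        void closeWith(Node feedback) {
            if (!feedback.transitivePredecessors().contains(this)) {
                throw new UnsupportedOperationException(
                        feedback.name + " does not originate from this iteration");
            }
            if (feedback.parallelism != parallelism) {
                throw new UnsupportedOperationException("parallelism " + feedback.parallelism
                        + " does not match " + parallelism);
            }
            feedbackEdges.add(feedback);
        }
    }

    // Returns true if closeWith() refuses the given feedback node.
    static boolean rejects(Head head, Node feedback) {
        try {
            head.closeWith(feedback);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Node source = new Node("source", 2);
        Head head = new Head(source);
        Node body = new Node("map", 2, head);
        System.out.println(rejects(head, new Node("filter", 2, body)));      // false: accepted
        System.out.println(rejects(head, new Node("unrelated", 2, source))); // true: not downstream of head
        System.out.println(rejects(head, new Node("rescaled", 4, body)));    // true: parallelism differs
    }
}
```

The head's own predecessors cover only its input, not its feedback edges. FeedbackTransformation.getTransitivePredecessors() likewise ignores feedbackEdges, which keeps the traversal from running around the cycle.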

Summary

  • DataStream provides two iterate methods, both of which create and return an IterativeStream; the no-argument iterate() uses a maxWaitTimeMillis of 0.
  • IterativeStream's constructor takes two arguments, originalInput and maxWaitTime. It creates a FeedbackTransformation from dataStream.getTransformation() and maxWaitTime, and sets the transformation's bufferTimeout from dataStream.environment.getBufferTimeout().
  • FeedbackTransformation extends StreamTransformation and holds properties such as feedbackEdges and waitTime. waitTime is how long the feedback operator waits for feedback elements. Once it elapses without new feedback, the operator shuts down and accepts no further feedback elements.
  • IterativeStream extends SingleOutputStreamOperator and mainly provides two methods. closeWith closes the iteration by defining the part that is fed back to the iteration head. withFeedbackType creates a ConnectedIterativeStreams.
  • ConnectedIterativeStreams extends ConnectedStreams and allows the fed-back type to differ from the type of originalInput. It also defines a closeWith method, and it overrides the keyBy methods of ConnectedStreams to throw UnsupportedOperationException.
