本文主要研究一下flink DataStream的iterate操做java
IterativeStream<Long> iteration = initialStream.iterate(); DataStream<Long> iterationBody = iteration.map (/*do something*/); DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Long value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Long value) throws Exception { return value <= 0; } });
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.javaapache
@Public public class DataStream<T> { //...... @PublicEvolving public IterativeStream<T> iterate() { return new IterativeStream<>(this, 0); } @PublicEvolving public IterativeStream<T> iterate(long maxWaitTimeMillis) { return new IterativeStream<>(this, maxWaitTimeMillis); } //...... }
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/IterativeStream.javaapi
@PublicEvolving public class IterativeStream<T> extends SingleOutputStreamOperator<T> { // We store these so that we can create a co-iteration if we need to private DataStream<T> originalInput; private long maxWaitTime; protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) { super(dataStream.getExecutionEnvironment(), new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime)); this.originalInput = dataStream; this.maxWaitTime = maxWaitTime; setBufferTimeout(dataStream.environment.getBufferTimeout()); } @SuppressWarnings({ "unchecked", "rawtypes" }) public DataStream<T> closeWith(DataStream<T> feedbackStream) { Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors(); if (!predecessors.contains(this.transformation)) { throw new UnsupportedOperationException( "Cannot close an iteration with a feedback DataStream that does not originate from said iteration."); } ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation()); return feedbackStream; } public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) { return withFeedbackType(TypeInformation.of(feedbackTypeClass)); } public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeHint<F> feedbackTypeHint) { return withFeedbackType(TypeInformation.of(feedbackTypeHint)); } public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) { return new ConnectedIterativeStreams<>(originalInput, feedbackType, maxWaitTime); } @Public public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> { private CoFeedbackTransformation<F> coFeedbackTransformation; public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) { super(input.getExecutionEnvironment(), input, new DataStream<>(input.getExecutionEnvironment(), new CoFeedbackTransformation<>(input.getParallelism(), feedbackType, waitTime))); this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation(); } public DataStream<F> closeWith(DataStream<F> feedbackStream) { Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors(); if (!predecessors.contains(this.coFeedbackTransformation)) { throw new UnsupportedOperationException( "Cannot close an iteration with a feedback DataStream that does not originate from said iteration."); } coFeedbackTransformation.addFeedbackEdge(feedbackStream.getTransformation()); return feedbackStream; } private UnsupportedOperationException groupingException = new UnsupportedOperationException("Cannot change the input partitioning of an" + "iteration head directly. Apply the partitioning on the input and" + "feedback streams instead."); @Override public ConnectedStreams<I, F> keyBy(int[] keyPositions1, int[] keyPositions2) { throw groupingException; } @Override public ConnectedStreams<I, F> keyBy(String field1, String field2) { throw groupingException; } @Override public ConnectedStreams<I, F> keyBy(String[] fields1, String[] fields2) { throw groupingException; } @Override public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) { throw groupingException; } @Override public <KEY> ConnectedStreams<I, F> keyBy(KeySelector<I, KEY> keySelector1, KeySelector<F, KEY> keySelector2, TypeInformation<KEY> keyType) { throw groupingException; } } }
能夠理解爲迴流,或者相似遞歸的操做,filter控制的是遞歸的條件,經過filter的elements會從新進入IterativeStream的頭部繼續參與後面的運算操做
);withFeedbackType方法建立了ConnectedIterativeStreamsflink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/transformations/FeedbackTransformation.javaide
@Internal public class FeedbackTransformation<T> extends StreamTransformation<T> { private final StreamTransformation<T> input; private final List<StreamTransformation<T>> feedbackEdges; private final Long waitTime; public FeedbackTransformation(StreamTransformation<T> input, Long waitTime) { super("Feedback", input.getOutputType(), input.getParallelism()); this.input = input; this.waitTime = waitTime; this.feedbackEdges = Lists.newArrayList(); } public StreamTransformation<T> getInput() { return input; } public void addFeedbackEdge(StreamTransformation<T> transform) { if (transform.getParallelism() != this.getParallelism()) { throw new UnsupportedOperationException( "Parallelism of the feedback stream must match the parallelism of the original" + " stream. Parallelism of original stream: " + this.getParallelism() + "; parallelism of feedback stream: " + transform.getParallelism() + ". Parallelism can be modified using DataStream#setParallelism() method"); } feedbackEdges.add(transform); } public List<StreamTransformation<T>> getFeedbackEdges() { return feedbackEdges; } public Long getWaitTime() { return waitTime; } @Override public final void setChainingStrategy(ChainingStrategy strategy) { throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation."); } @Override public Collection<StreamTransformation<?>> getTransitivePredecessors() { List<StreamTransformation<?>> result = Lists.newArrayList(); result.add(this); result.addAll(input.getTransitivePredecessors()); return result; } }