本文主要研究一下flink KeyedStream的aggregation操作
/**
 * Runs a keyed rolling max over "frequency", grouped by "word", and logs every value
 * emitted by the aggregation.
 */
@Test
public void testMax() throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Two of the three records share the key "Hello" once keyed by "word".
    WordCount[] data = {
        new WordCount(1, "Hello", 1),
        new WordCount(1, "World", 3),
        new WordCount(2, "Hello", 1)
    };

    // Sink that only logs each element produced by max().
    SinkFunction<WordCount> loggingSink = new SinkFunction<WordCount>() {
        @Override
        public void invoke(WordCount value, Context context) throws Exception {
            LOGGER.info("value:{}", value);
        }
    };

    env.fromElements(data)
        .keyBy("word")
        .max("frequency")
        .addSink(loggingSink);

    env.execute("testMax");
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java
/**
 * Applies a rolling sum, per key, of the field at the given position.
 *
 * @param positionToSum position of the field to sum
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> sum(int positionToSum) {
    return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
}

/**
 * Applies a rolling sum, per key, of the field selected by the given field expression.
 *
 * @param field field expression selecting the field to sum
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> sum(String field) {
    return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
}

/**
 * Keeps, per key, the maximum of the field at the given position.
 *
 * <p>Only that field is updated by {@link ComparableAggregator}; the element's other fields
 * are not taken from the element that holds the maximum (use {@link #maxBy(int)} for that).
 *
 * @param positionToMax position of the field to maximize
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> max(int positionToMax) {
    return aggregate(new ComparableAggregator<>(positionToMax, getType(),
            AggregationFunction.AggregationType.MAX, getExecutionConfig()));
}

/**
 * Keeps, per key, the maximum of the field selected by the given field expression.
 * Only that field is updated; see {@link #maxBy(String)} to select whole elements.
 *
 * @param field field expression selecting the field to maximize
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> max(String field) {
    return aggregate(new ComparableAggregator<>(field, getType(),
            AggregationFunction.AggregationType.MAX, false, getExecutionConfig()));
}

/**
 * Keeps, per key, the minimum of the field at the given position.
 * Only that field is updated; see {@link #minBy(int)} to select whole elements.
 *
 * @param positionToMin position of the field to minimize
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> min(int positionToMin) {
    return aggregate(new ComparableAggregator<>(positionToMin, getType(),
            AggregationFunction.AggregationType.MIN, getExecutionConfig()));
}

/**
 * Keeps, per key, the minimum of the field selected by the given field expression.
 * Only that field is updated; see {@link #minBy(String)} to select whole elements.
 *
 * @param field field expression selecting the field to minimize
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> min(String field) {
    return aggregate(new ComparableAggregator<>(field, getType(),
            AggregationFunction.AggregationType.MIN, false, getExecutionConfig()));
}

/**
 * Emits, per key, the element with the maximum value at the given position; on ties the
 * first element is kept (same as {@code maxBy(positionToMaxBy, true)}).
 *
 * @param positionToMaxBy position of the compared field
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
    return this.maxBy(positionToMaxBy, true);
}

/**
 * Emits, per key, the element with the maximum value of the given field; on ties the first
 * element is kept (same as {@code maxBy(positionToMaxBy, true)}).
 *
 * @param positionToMaxBy field expression selecting the compared field
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
    return this.maxBy(positionToMaxBy, true);
}

/**
 * Emits, per key, the element with the maximum value at the given position.
 *
 * @param positionToMaxBy position of the compared field
 * @param first on a tie, whether to keep the first element ({@code true}) or the later one
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
    return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(),
            AggregationFunction.AggregationType.MAXBY, first, getExecutionConfig()));
}

/**
 * Emits, per key, the element with the maximum value of the given field.
 *
 * @param field field expression selecting the compared field
 * @param first on a tie, whether to keep the first element ({@code true}) or the later one
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
    return aggregate(new ComparableAggregator<>(field, getType(),
            AggregationFunction.AggregationType.MAXBY, first, getExecutionConfig()));
}

/**
 * Emits, per key, the element with the minimum value at the given position; on ties the
 * first element is kept (same as {@code minBy(positionToMinBy, true)}).
 *
 * @param positionToMinBy position of the compared field
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
    return this.minBy(positionToMinBy, true);
}

/**
 * Emits, per key, the element with the minimum value of the given field; on ties the first
 * element is kept (same as {@code minBy(positionToMinBy, true)}).
 *
 * @param positionToMinBy field expression selecting the compared field
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
    return this.minBy(positionToMinBy, true);
}

/**
 * Emits, per key, the element with the minimum value at the given position.
 *
 * @param positionToMinBy position of the compared field
 * @param first on a tie, whether to keep the first element ({@code true}) or the later one
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
    // Diamond instead of an explicit <T>, matching the sibling overloads.
    return aggregate(new ComparableAggregator<>(positionToMinBy, getType(),
            AggregationFunction.AggregationType.MINBY, first, getExecutionConfig()));
}

/**
 * Emits, per key, the element with the minimum value of the given field.
 *
 * @param field field expression selecting the compared field
 * @param first on a tie, whether to keep the first element ({@code true}) or the later one
 * @return the aggregated stream
 */
public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
    // Previously a raw ComparableAggregator, which caused an unchecked conversion to
    // AggregationFunction<T>; the diamond infers T from getType().
    return aggregate(new ComparableAggregator<>(field, getType(),
            AggregationFunction.AggregationType.MINBY, first, getExecutionConfig()));
}

/**
 * Wraps the (closure-cleaned) aggregation in a {@link StreamGroupedReduce} operator and adds
 * it to this stream as a "Keyed Aggregation" transformation producing the same type.
 *
 * @param aggregate the aggregation to apply per key
 * @return the aggregated stream
 */
protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
    StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
            clean(aggregate), getType().createSerializer(getExecutionConfig()));
    return transform("Keyed Aggregation", getType(), operator);
}
maxBy、minBy的重載方法（以及ComparableAggregator的構造器）多了一個first(boolean)參數，該參數用於指定在碰到多個compare值相等時，是否取第一個元素返回；不帶該參數的maxBy/minBy默認傳入true。
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java
@Internal public class ComparableAggregator<T> extends AggregationFunction<T> { private static final long serialVersionUID = 1L; private Comparator comparator; private boolean byAggregate; private boolean first; private final FieldAccessor<T, Object> fieldAccessor; private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) { this.comparator = Comparator.getForAggregation(aggregationType); this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY); this.first = first; this.fieldAccessor = fieldAccessor; } public ComparableAggregator(int positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType, ExecutionConfig config) { this(positionToAggregate, typeInfo, aggregationType, false, config); } public ComparableAggregator(int positionToAggregate, TypeInformation<T> typeInfo, AggregationType aggregationType, boolean first, ExecutionConfig config) { this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first); } public ComparableAggregator(String field, TypeInformation<T> typeInfo, AggregationType aggregationType, boolean first, ExecutionConfig config) { this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first); } @SuppressWarnings("unchecked") @Override public T reduce(T value1, T value2) throws Exception { Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1); Object o2 = fieldAccessor.get(value2); int c = comparator.isExtremal(o1, o2); if (byAggregate) { // if they are the same we choose based on whether we want to first or last // element with the min/max. if (c == 0) { return first ? value1 : value2; } return c == 1 ? value1 : value2; } else { if (c == 0) { value1 = fieldAccessor.set(value1, o2); } return value1; } } }
對於非byAggregate操作（max/min），若比較值爲0(以max爲例，即比較字段的值value1小於等於value2的情況；min則對應大於等於的情況
),則使用反射方法將value2的比較字段的值更新到value1,最後都是返回value1
/**
 * Base class of the built-in aggregations: each one is a {@link ReduceFunction} over the
 * element type.
 *
 * @param <T> type of the elements being aggregated
 */
@Internal
public abstract class AggregationFunction<T> implements ReduceFunction<T> {

    private static final long serialVersionUID = 1L;

    /** Aggregation types that can be used on a windowed stream or keyed stream. */
    public enum AggregationType {
        SUM,
        MIN,
        MAX,
        MINBY,
        MAXBY
    }
}
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java
/**
 * Tells whether the first of two field values is the "extremal" one for an aggregation.
 * Obtain instances via {@link #getForAggregation(AggregationType)}.
 */
@Internal
public abstract class Comparator implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Compares {@code o1} against {@code o2}.
     *
     * @return for MAX/MIN: 1 if {@code o1} is strictly more extremal, else 0;
     *     for MAXBY/MINBY: 1 if {@code o1} is more extremal, 0 on a tie, -1 otherwise
     */
    public abstract <R> int isExtremal(Comparable<R> o1, R o2);

    /**
     * Returns a fresh comparator for the given aggregation type.
     *
     * @throws IllegalArgumentException for types without a comparator (e.g. SUM)
     */
    public static Comparator getForAggregation(AggregationType type) {
        switch (type) {
            case MAXBY:
                return new MaxByComparator();
            case MINBY:
                return new MinByComparator();
            case MAX:
                return new MaxComparator();
            case MIN:
                return new MinComparator();
            default:
                throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    /** 1 when o1 > o2, otherwise 0. */
    private static class MaxComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            if (o1.compareTo(o2) > 0) {
                return 1;
            }
            return 0;
        }
    }

    /** Sign of o1.compareTo(o2): 1 larger, 0 equal, -1 smaller. */
    private static class MaxByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return Integer.signum(o1.compareTo(o2));
        }
    }

    /** Negated sign of o1.compareTo(o2): 1 smaller, 0 equal, -1 larger. */
    private static class MinByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return -Integer.signum(o1.compareTo(o2));
        }
    }

    /** 1 when o1 < o2, otherwise 0. */
    private static class MinComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            if (o1.compareTo(o2) < 0) {
                return 1;
            }
            return 0;
        }
    }
}
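MaxByComparator與MinByComparator在比較值相等時返回0，配合ComparableAggregator的first(boolean)參數，專門用於在比較值爲0的時候選擇返回哪個元素；而reduce方法對於非byAggregate操作（max/min），始終返回的是value1：在比較值爲0（以max爲例，即value1的比較字段小於等於value2的比較字段）的時候，使用反射將value2的比較字段值更新到value1，然後返回value1。因此max/min只保證比較字段是極值，value1的其他字段保持不變；如需返回持有極值的整條記錄，應使用maxBy/minBy。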