[case52] A look at aggregation operations on Flink's KeyedStream

This article takes a look at the aggregation operations of Flink's KeyedStream.

Example

    @Test
    public void testMax() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        WordCount[] data = new WordCount[]{new WordCount(1, "Hello", 1),
                new WordCount(1, "World", 3), new WordCount(2, "Hello", 1)};
        env.fromElements(data)
                .keyBy("word")
                .max("frequency")
                .addSink(new SinkFunction<WordCount>() {
                    @Override
                    public void invoke(WordCount value, Context context) throws Exception {
                        LOGGER.info("value:{}",value);
                    }
                });
        env.execute("testMax");
    }
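  • Here the stream is first keyed by the word field, and then KeyedStream's max method aggregates the frequency field. Note that max is a rolling aggregation: it emits an updated WordCount for every incoming element, and only the frequency field is guaranteed to hold the maximum seen so far for the key; the other fields keep the values of the first element seen for that key. To get the complete element that holds the maximum, use maxBy instead.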
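To make the rolling behavior concrete, here is a minimal plain-Java sketch (no Flink dependency) of what the keyed max("frequency") does to the example data. It assumes a stand-in WordCount class whose first field is named id, since the test's own WordCount class is not shown. rollingReduce and maxFrequency are hypothetical helpers: rollingReduce keeps one value per key and emits the reduced value for every input, and maxFrequency follows the non-byAggregate branch of ComparableAggregator.reduce discussed in the ComparableAggregator section.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Stand-in for the test's WordCount; the name "id" for the first field is an assumption.
final class WordCount {
    final int id;
    final String word;
    final int frequency;

    WordCount(int id, String word, int frequency) {
        this.id = id;
        this.word = word;
        this.frequency = frequency;
    }

    @Override
    public String toString() {
        return "WordCount(" + id + "," + word + "," + frequency + ")";
    }
}

final class RollingMaxSketch {
    // Keyed rolling reduce: one stored value per key, and one output per input element.
    static <T, K> List<T> rollingReduce(List<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>();
        List<T> out = new ArrayList<>();
        for (T value : input) {
            K key = keyOf.apply(value);
            T stored = state.get(key);
            // The first element for a key is stored and emitted unchanged.
            T reduced = (stored == null) ? value : reducer.apply(stored, value);
            state.put(key, reduced);
            out.add(reduced);
        }
        return out;
    }

    // max("frequency"): keep the stored record and only raise its frequency field.
    static WordCount maxFrequency(WordCount stored, WordCount incoming) {
        if (stored.frequency > incoming.frequency) {
            return stored;
        }
        return new WordCount(stored.id, stored.word, incoming.frequency);
    }

    public static void main(String[] args) {
        List<WordCount> data = Arrays.asList(
                new WordCount(1, "Hello", 1),
                new WordCount(1, "World", 3),
                new WordCount(2, "Hello", 1));
        for (WordCount result : rollingReduce(data, w -> w.word, RollingMaxSketch::maxFrequency)) {
            System.out.println(result);
        }
    }
}
```

Run as is, it prints WordCount(1,Hello,1), WordCount(1,World,3) and WordCount(1,Hello,1): the second Hello record also produces an output, and that output still carries id 1 from the first Hello record.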

KeyedStream.aggregate

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/KeyedStream.java

    public SingleOutputStreamOperator<T> sum(int positionToSum) {
        return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> sum(String field) {
        return aggregate(new SumAggregator<>(field, getType(), getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(int positionToMax) {
        return aggregate(new ComparableAggregator<>(positionToMax, getType(), AggregationFunction.AggregationType.MAX,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> max(String field) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAX,
                false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(int positionToMin) {
        return aggregate(new ComparableAggregator<>(positionToMin, getType(), AggregationFunction.AggregationType.MIN,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> min(String field) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MIN,
                false, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(String positionToMaxBy) {
        return this.maxBy(positionToMaxBy, true);
    }

    public SingleOutputStreamOperator<T> maxBy(int positionToMaxBy, boolean first) {
        return aggregate(new ComparableAggregator<>(positionToMaxBy, getType(), AggregationFunction.AggregationType.MAXBY, first,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> maxBy(String field, boolean first) {
        return aggregate(new ComparableAggregator<>(field, getType(), AggregationFunction.AggregationType.MAXBY,
                first, getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(String positionToMinBy) {
        return this.minBy(positionToMinBy, true);
    }

    public SingleOutputStreamOperator<T> minBy(int positionToMinBy, boolean first) {
        return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationFunction.AggregationType.MINBY, first,
                getExecutionConfig()));
    }

    public SingleOutputStreamOperator<T> minBy(String field, boolean first) {
        return aggregate(new ComparableAggregator(field, getType(), AggregationFunction.AggregationType.MINBY,
                first, getExecutionConfig()));
    }

    protected SingleOutputStreamOperator<T> aggregate(AggregationFunction<T> aggregate) {
        StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(
                clean(aggregate), getType().createSerializer(getExecutionConfig()));
        return transform("Keyed Aggregation", getType(), operator);
    }
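  • KeyedStream's aggregate method is protected, and sum, max, min, maxBy and minBy all delegate to it. sum passes a SumAggregator, while max, min, maxBy and minBy pass a ComparableAggregator whose AggregationType is MAX, MIN, MAXBY and MINBY respectively. aggregate then wraps the function in a StreamGroupedReduce operator named "Keyed Aggregation".
  • sum, max and min each have two overloads: one takes the field position (int) and one takes the field name (String). maxBy and minBy additionally have variants that take a first (boolean) flag; their one-argument variants pass first = true.
  • The first flag of maxBy and minBy decides which element is returned when several elements have equal values in the compared field: true keeps the one encountered first, false takes the later one.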
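The effect of first can be sketched in plain Java as below. Score, maxBy and fold are hypothetical names chosen for this illustration; maxBy mirrors the MAXBY branch of ComparableAggregator.reduce for a single key, and fold applies it element by element the way a keyed rolling reduce would, keeping only the last result.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical record used only for this illustration.
final class Score {
    final String name;
    final int points;

    Score(String name, int points) {
        this.name = name;
        this.points = points;
    }
}

final class MaxByTieSketch {
    // Same shape as the MAXBY branch: a strictly larger value wins; on a tie, `first` decides.
    static Score maxBy(Score stored, Score incoming, boolean first) {
        int c = Integer.compare(stored.points, incoming.points);
        if (c == 0) {
            return first ? stored : incoming;
        }
        return c > 0 ? stored : incoming;
    }

    // Applies maxBy element by element for a single key and returns the last result.
    static Score fold(List<Score> scores, boolean first) {
        Score acc = scores.get(0);
        for (int i = 1; i < scores.size(); i++) {
            acc = maxBy(acc, scores.get(i), first);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Score> scores = Arrays.asList(new Score("a", 5), new Score("b", 7), new Score("c", 7));
        System.out.println(fold(scores, true).name);  // b
        System.out.println(fold(scores, false).name); // c
    }
}
```

With two records tied at 7 points, first = true keeps b (the earlier one), while first = false ends on c (the later one).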

ComparableAggregator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/ComparableAggregator.java

@Internal
public class ComparableAggregator<T> extends AggregationFunction<T> {

    private static final long serialVersionUID = 1L;

    private Comparator comparator;
    private boolean byAggregate;
    private boolean first;
    private final FieldAccessor<T, Object> fieldAccessor;

    private ComparableAggregator(AggregationType aggregationType, FieldAccessor<T, Object> fieldAccessor, boolean first) {
        this.comparator = Comparator.getForAggregation(aggregationType);
        this.byAggregate = (aggregationType == AggregationType.MAXBY) || (aggregationType == AggregationType.MINBY);
        this.first = first;
        this.fieldAccessor = fieldAccessor;
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            ExecutionConfig config) {
        this(positionToAggregate, typeInfo, aggregationType, false, config);
    }

    public ComparableAggregator(int positionToAggregate,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, positionToAggregate, config), first);
    }

    public ComparableAggregator(String field,
            TypeInformation<T> typeInfo,
            AggregationType aggregationType,
            boolean first,
            ExecutionConfig config) {
        this(aggregationType, FieldAccessorFactory.getAccessor(typeInfo, field, config), first);
    }

    @SuppressWarnings("unchecked")
    @Override
    public T reduce(T value1, T value2) throws Exception {
        Comparable<Object> o1 = (Comparable<Object>) fieldAccessor.get(value1);
        Object o2 = fieldAccessor.get(value2);

        int c = comparator.isExtremal(o1, o2);

        if (byAggregate) {
            // if they are the same we choose based on whether we want to first or last
            // element with the min/max.
            if (c == 0) {
                return first ? value1 : value2;
            }

            return c == 1 ? value1 : value2;

        } else {
            if (c == 0) {
                value1 = fieldAccessor.set(value1, o2);
            }
            return value1;
        }
    }
}
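  • ComparableAggregator extends AggregationFunction, which in turn implements the ReduceFunction interface. Its reduce method first uses the Comparator to compare the aggregated field of the two objects (when called from StreamGroupedReduce, value1 is the value stored so far for the key and value2 is the incoming element), and then branches on byAggregate, which is true only for MAXBY and MINBY.
  • If byAggregate is true and the comparison result is 0 (the field values are equal), first decides which element is returned: value1, the one encountered first, if first is true, otherwise value2. For a non-zero result the more extremal element is returned as a whole: value1 if the result is 1, otherwise value2, i.e. the larger element for maxBy and the smaller one for minBy.
  • If byAggregate is false (max/min), a result of 0 means value1 is not strictly more extremal than value2 (value1 <= value2 for max, value1 >= value2 for min), so value2's field value is written into value1 through the FieldAccessor (which uses reflection for POJO fields). value1 is returned in every case, so its other fields are never replaced.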
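The practical difference between the two branches shows up when the non-aggregated fields differ. The plain-Java sketch below uses a hypothetical Reading type and follows the two branches described above: max only copies the compared field into the stored record, while maxBy (with first = true) returns a whole element. It is an illustration of the logic, not Flink code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sensor reading used only for this illustration.
final class Reading {
    final String sensor;
    final long ts;
    final int value;

    Reading(String sensor, long ts, int value) {
        this.sensor = sensor;
        this.ts = ts;
        this.value = value;
    }

    @Override
    public String toString() {
        return "Reading(" + sensor + "," + ts + "," + value + ")";
    }
}

final class MaxVsMaxBySketch {
    // Non-byAggregate branch (max): the stored record is kept; only its value field may change.
    static Reading max(Reading stored, Reading incoming) {
        // MaxComparator-style result: 1 only if stored.value is strictly greater.
        int c = stored.value > incoming.value ? 1 : 0;
        if (c == 0) {
            return new Reading(stored.sensor, stored.ts, incoming.value);
        }
        return stored;
    }

    // byAggregate branch (maxBy with first = true): a whole element is returned.
    static Reading maxBy(Reading stored, Reading incoming) {
        int c = Integer.compare(stored.value, incoming.value);
        if (c == 0) {
            return stored;
        }
        return c > 0 ? stored : incoming;
    }

    public static void main(String[] args) {
        List<Reading> readings = Arrays.asList(
                new Reading("s1", 100, 20),
                new Reading("s1", 200, 35),
                new Reading("s1", 300, 30));
        Reading byMax = readings.get(0);
        Reading byMaxBy = readings.get(0);
        for (int i = 1; i < readings.size(); i++) {
            byMax = max(byMax, readings.get(i));
            byMaxBy = maxBy(byMaxBy, readings.get(i));
        }
        System.out.println("max:   " + byMax);   // Reading(s1,100,35)
        System.out.println("maxBy: " + byMaxBy); // Reading(s1,200,35)
    }
}
```

max ends with Reading(s1,100,35), which combines the first record's timestamp with the largest value and never appeared as an input, whereas maxBy returns the actual input record Reading(s1,200,35).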

AggregationFunction

@Internal
public abstract class AggregationFunction<T> implements ReduceFunction<T> {
    private static final long serialVersionUID = 1L;

    /**
     * Aggregation types that can be used on a windowed stream or keyed stream.
     */
    public enum AggregationType {
        SUM, MIN, MAX, MINBY, MAXBY,
    }
}
  • AggregationFunction is declared to implement ReduceFunction and defines five AggregationType values: SUM, MIN, MAX, MINBY and MAXBY.

Comparator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/aggregation/Comparator.java

@Internal
public abstract class Comparator implements Serializable {

    private static final long serialVersionUID = 1L;

    public abstract <R> int isExtremal(Comparable<R> o1, R o2);

    public static Comparator getForAggregation(AggregationType type) {
        switch (type) {
        case MAX:
            return new MaxComparator();
        case MIN:
            return new MinComparator();
        case MINBY:
            return new MinByComparator();
        case MAXBY:
            return new MaxByComparator();
        default:
            throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    private static class MaxComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) > 0 ? 1 : 0;
        }

    }

    private static class MaxByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c > 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }

    }

    private static class MinByComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            int c = o1.compareTo(o2);
            if (c < 0) {
                return 1;
            }
            if (c == 0) {
                return 0;
            } else {
                return -1;
            }
        }

    }

    private static class MinComparator extends Comparator {

        private static final long serialVersionUID = 1L;

        @Override
        public <R> int isExtremal(Comparable<R> o1, R o2) {
            return o1.compareTo(o2) < 0 ? 1 : 0;
        }

    }
}
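  • Comparator implements the Serializable interface, declares the abstract method isExtremal, and provides the factory method getForAggregation, which creates a different Comparator for each AggregationType. SUM has no Comparator and falls into the default branch, which throws an IllegalArgumentException.
  • Comparator defines four subclasses, MaxComparator, MinComparator, MinByComparator and MaxByComparator, each of which implements isExtremal.
  • MaxComparator relies directly on the compareTo method of the Comparable interface but only ever returns 0 or 1: it returns 1 when compareTo is greater than 0 (o1 > o2) and 0 otherwise. MaxByComparator also calls compareTo but has three possible results: 1 when o1 > o2, 0 when they are equal, and -1 when o1 < o2. MinComparator and MinByComparator mirror these with the comparison reversed.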
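The four isExtremal variants can be condensed into one switch for a quick side-by-side comparison. This is a sketch rather than Flink's code: the Kind enum and the isExtremal helper here are hypothetical, and the MaxBy/MinBy cases use Integer.signum, which yields the same 1/0/-1 results as the if/else chains above (negated for MinBy).

```java
// Hypothetical condensed version of the four Comparator subclasses.
final class ComparatorSketch {
    // Illustrative stand-in for the four AggregationTypes that have a Comparator.
    enum Kind { MAX, MIN, MAX_BY, MIN_BY }

    static <R extends Comparable<R>> int isExtremal(Kind kind, R o1, R o2) {
        int c = o1.compareTo(o2);
        switch (kind) {
            case MAX:
                return c > 0 ? 1 : 0;          // like MaxComparator: 1 only when o1 > o2
            case MIN:
                return c < 0 ? 1 : 0;          // like MinComparator: 1 only when o1 < o2
            case MAX_BY:
                return Integer.signum(c);      // like MaxByComparator: 1, 0 or -1
            case MIN_BY:
                return -Integer.signum(c);     // like MinByComparator: 1 when o1 < o2
            default:
                throw new IllegalArgumentException("Unsupported aggregation type.");
        }
    }

    public static void main(String[] args) {
        // Pairs (o1, o2) covering greater, equal and less.
        int[][] pairs = {{5, 3}, {3, 3}, {3, 5}};
        for (Kind kind : Kind.values()) {
            StringBuilder line = new StringBuilder(kind.name());
            for (int[] p : pairs) {
                line.append(' ').append(isExtremal(kind, p[0], p[1]));
            }
            System.out.println(line);
        }
    }
}
```

The printed rows are MAX 1 0 0, MIN 0 0 1, MAX_BY 1 0 -1 and MIN_BY -1 0 1. Only the By variants distinguish "equal" from "less extremal", which is what lets reduce apply the first flag to ties.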

Summary

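  • KeyedStream's aggregation operations are mainly sum, max, min, maxBy and minBy. They all call the protected aggregate method internally; sum passes a SumAggregator, while the others pass a ComparableAggregator configured with AggregationType MAX, MIN, MAXBY or MINBY.
  • ComparableAggregator extends AggregationFunction, which implements the ReduceFunction interface. Its reduce method first compares the two objects with a Comparator and then branches on byAggregate. For byAggregate operations (maxBy/minBy), a result of 0 means first decides whether the earlier or the later element is returned, and a non-zero result returns the more extremal element (the larger for maxBy, the smaller for minBy). Otherwise (max/min), a result of 0 causes the later element's field value to be written into value1 via reflection, and value1 is always returned.
  • Comparator defines four subclasses, MaxComparator, MinComparator, MinByComparator and MaxByComparator, all of which implement isExtremal. MaxComparator returns 1 for greater and 0 for less than or equal, while MaxByComparator is finer-grained, returning 1 for greater, 0 for equal and -1 for less. This difference carries over to ComparableAggregator's reduce method: maxBy and minBy take an extra first (boolean) parameter used specifically to choose which element to return when the comparison result is 0, whereas for non-byAggregate operations reduce always returns value1, updating value1's field via reflection whenever value1 is not strictly more extremal (less than or equal for max, greater than or equal for min).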
