Flink (7): Window Aggregation in Flink

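1. Incremental window aggregation

Scenario:
Every time a record enters the window, one computation step runs; when the window time is up, the final result is emitted.
Commonly used aggregation operators: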

reduce(reduceFunction) 
aggregate(aggregateFunction) 
sum(),min(),max()
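
To make the idea of "one computation per arriving record" concrete, here is a minimal sketch in plain Java with no Flink dependency. The class IncrementalWindowSketch and its add/fire methods are invented for illustration and are not part of any Flink API; the point is only that the window stores a single running value rather than all of its elements.

```java
import java.util.function.BinaryOperator;

/** Plain-Java illustration (no Flink): a window that keeps one running value. */
class IncrementalWindowSketch<T> {
    private final BinaryOperator<T> reducer; // plays the role of a ReduceFunction
    private T state;                         // the only thing the window stores

    IncrementalWindowSketch(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    /** Called once per arriving element: fold it into the running state. */
    void add(T element) {
        state = (state == null) ? element : reducer.apply(state, element);
    }

    /** Called when the window fires: hand out the result and clear the state. */
    T fire() {
        T result = state;
        state = null;
        return result;
    }

    public static void main(String[] args) {
        IncrementalWindowSketch<Integer> window = new IncrementalWindowSketch<>(Integer::sum);
        for (int n : new int[] {1, 2, 3, 4}) {
            window.add(n);
        }
        System.out.println(window.fire()); // prints 10
    }
}
```

Because only the running value is kept, the memory used per window does not grow with the number of elements.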

(1) The reduce operator
Scenario: sum up the elements within a window

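import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Incremental window aggregation
 * Sums the integers read from a socket in 10-second windows using reduce
 */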
public class SocketWindowIncrAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",9999);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = dataStreamSource.map(number -> Integer.valueOf(number));

        AllWindowedStream<Integer,TimeWindow> windowedStream = integerSingleOutputStreamOperator
                .timeWindowAll(Time.seconds(10));

        windowedStream.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer last, Integer current) throws Exception {
                System.out.println("process last:"+last+" current:"+current);
                return last + current;
            }
        }).print().setParallelism(1);
        env.execute(SocketWindowIncrAgg.class.getSimpleName());
    }
}

(2) The aggregate operator
Requirement: compute the average of the data in each window

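import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Incremental window aggregation
 * Computes the average with the aggregate operator
 */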
public class SocketWindowAvgAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",9999);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = dataStreamSource.map(number -> Integer.valueOf(number));

        AllWindowedStream<Integer,TimeWindow> windowedStream = integerSingleOutputStreamOperator
                .timeWindowAll(Time.seconds(10));

        windowedStream.aggregate(new MyAggregate()).print().setParallelism(1);
        env.execute(SocketWindowAvgAgg.class.getSimpleName());
    }

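    /**
     * IN:  type of the input data
     * ACC: custom intermediate state (the accumulator),
     *      Tuple2<Integer,Integer>
     *           f0: number of elements seen so far
     *           f1: running sum of the elements
     * OUT: type of the output data
     */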
    private static class MyAggregate implements AggregateFunction<Integer, Tuple2<Integer,Integer>,Double> {
        /**
         * Creates the initial (empty) accumulator.
         * @return an accumulator with count 0 and sum 0
         */
        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return new Tuple2<>(0,0);
        }

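        /**
         * Called once for every incoming element.
         * @param element the new element
         * @param accumulator the current accumulator
         * @return the updated accumulator
         */
        @Override
        public Tuple2<Integer, Integer> add(Integer element, Tuple2<Integer, Integer> accumulator) {
            // increment the count by 1
            // add the element to the running sum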
            accumulator.f0+=1;
            accumulator.f1+=element;
            System.out.println("input:"+element+"|accumulator:"+accumulator);
            return new Tuple2<>(accumulator.f0,accumulator.f1);
        }

        /**
         * Computes the final result.
         * @param accumulator the accumulator of the window
         * @return the average (sum / count)
         */
        @Override
        public Double getResult(Tuple2<Integer, Integer> accumulator) {
            System.out.println("call getResult:"+accumulator);
            return (double) accumulator.f1/accumulator.f0;
        }

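        /**
         * Merges two partial accumulators.
         * @param a1 the first accumulator
         * @param b1 the second accumulator
         * @return an accumulator holding both counts and both sums
         */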
        @Override
        public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a1, Tuple2<Integer, Integer> b1) {
            System.out.println("call merge:"+a1+"  "+b1);
            return Tuple2.of(a1.f0+b1.f0,a1.f1+b1.f1);
        }
    }
}
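
Why keep a (count, sum) pair instead of the average itself? Two averages cannot be combined without knowing how many elements each covers, whereas two (count, sum) pairs can simply be added. The following plain-Java sketch (no Flink; AverageAccumulatorSketch is an invented name) mirrors the add / merge / getResult steps of the class above under that assumption.

```java
/** Plain-Java illustration (no Flink) of the count/sum accumulator used for an average. */
class AverageAccumulatorSketch {
    long count; // number of elements seen so far
    long sum;   // running total of those elements

    /** Same role as add() above: fold one element into the accumulator. */
    AverageAccumulatorSketch add(int element) {
        count += 1;
        sum += element;
        return this;
    }

    /** Same role as merge() above: combine two partial accumulators. */
    static AverageAccumulatorSketch merge(AverageAccumulatorSketch a, AverageAccumulatorSketch b) {
        AverageAccumulatorSketch merged = new AverageAccumulatorSketch();
        merged.count = a.count + b.count;
        merged.sum = a.sum + b.sum;
        return merged;
    }

    /** Same role as getResult() above: derive the average only at the end. */
    double result() {
        return (double) sum / count;
    }

    public static void main(String[] args) {
        AverageAccumulatorSketch left = new AverageAccumulatorSketch().add(1).add(2);
        AverageAccumulatorSketch right = new AverageAccumulatorSketch().add(6);
        System.out.println(merge(left, right).result()); // prints 3.0
    }
}
```

Averaging the two partial averages (1.5 and 6.0) would give 3.75, which is wrong; merging the accumulators first gives the correct 3.0.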

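2. Full window aggregation

The aggregation starts only once all the data belonging to the window has arrived [this makes it possible to, for example, sort the data within the window].
Commonly used operators: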

apply(windowFunction) 
process(processWindowFunction) 
ProcessWindowFunction provides more context information than WindowFunction.
The relationship is similar to that between map and RichMap.
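
As a contrast to incremental aggregation, this plain-Java sketch (no Flink; FullWindowSketch is an invented name) buffers every element and only works on them when the window fires. Sorting is used as the example because it needs all elements at once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java illustration (no Flink): a window that buffers everything until it fires. */
class FullWindowSketch {
    private final List<Integer> buffer = new ArrayList<>(); // grows with every element

    /** Called once per arriving element: just store it, no computation yet. */
    void add(int element) {
        buffer.add(element);
    }

    /** Called when the window fires: only now are all elements available for sorting. */
    List<Integer> fireSorted() {
        List<Integer> sorted = new ArrayList<>(buffer);
        Collections.sort(sorted);
        buffer.clear();
        return sorted;
    }

    public static void main(String[] args) {
        FullWindowSketch window = new FullWindowSketch();
        window.add(3);
        window.add(1);
        window.add(2);
        System.out.println(window.fireSorted()); // prints [1, 2, 3]
    }
}
```

The price for this flexibility is that the buffer grows with the number of elements in the window.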

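import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Full window computation
 * Extends the abstract class ProcessAllWindowFunction and overrides process
 */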
public class SocketWindowFullAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",9999);
        SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = dataStreamSource.map(number -> Integer.valueOf(number));

        AllWindowedStream<Integer,TimeWindow> windowedStream = integerSingleOutputStreamOperator
                .timeWindowAll(Time.seconds(10));

        windowedStream.process(new MyProcess())
                .print().setParallelism(1);
        env.execute(SocketWindowFullAgg.class.getSimpleName());
    }

    /**
     * ProcessAllWindowFunction is an abstract class, so it has to be extended.
     *
     * Type arguments of ProcessAllWindowFunction:
     *   IN:  the type of the input value
     *   OUT: the type of the output value
     *   W:   the type of the window
     */
    private static class MyProcess extends ProcessAllWindowFunction<Integer,Integer,TimeWindow> {
        @Override
        public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
            System.out.println("process start:"+elements);

            // Add up every element that was buffered for this window.
            int sum = 0;
            Iterator<Integer> integerIterator = elements.iterator();
            while (integerIterator.hasNext()){
                Integer number = integerIterator.next();
                sum += number;
            }
            out.collect(sum);
        }
    }
}

3. Window join

Two streams can be joined window by window. The join operation supports only three types of window:
tumbling windows, sliding windows and session windows.
Usage:

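stream.join(otherStream) // join the two streams
            .where(<KeySelector>) // key of the first stream, used as the join field
            .equalTo(<KeySelector>) // key of the second stream, used as the join field
            .window(<WindowAssigner>) // the type of window
            .apply(<JoinFunction>) // what to do with each joined pair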
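
What the join produces for a single window can be sketched in plain Java (no Flink; WindowJoinSketch and its join method are invented for illustration). Assuming inner-join semantics, every left element is paired with every right element of the same window that has the same key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Plain-Java illustration (no Flink) of an inner join over the contents of one window. */
class WindowJoinSketch {
    /** Pairs every left element with every right element that has the same key. */
    static <K> List<String> join(List<Integer> left, List<Integer> right, Function<Integer, K> keyOf) {
        List<String> out = new ArrayList<>();
        for (Integer l : left) {
            for (Integer r : right) {
                if (keyOf.apply(l).equals(keyOf.apply(r))) {
                    out.add(l + "," + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(1, 2, 2);
        List<Integer> right = Arrays.asList(2, 3);
        // Key = the value itself: only equal numbers pair up.
        List<String> byValue = join(left, right, n -> n);
        System.out.println(byValue.size()); // prints 2
        // Constant key: every left element pairs with every right element.
        List<String> allPairs = join(left, right, n -> 1);
        System.out.println(allPairs.size()); // prints 6
    }
}
```

Elements without a partner in the other stream (1 and 3 in the first call) produce no output at all.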

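(1) Tumbling Window Join

Core idea: the two event streams are joined on the key first and then windowed, so elements are paired only if they share a key and fall into the same window.
Example: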

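import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window join
 * Joins two streams over tumbling windows
 */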
public class SocketTumblingWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost",9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost",9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number));
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number));

        DataStream<String> result = dataStream01.join(dataStream02)
                // Key = the number itself, typed as Integer rather than Object.
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return integer;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return integer;
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<Integer, Integer, String>() {
                    @Override
                    public String join(Integer first, Integer second) throws Exception {
                        return first + ","+ second;
                    }
                });

        result.print().setParallelism(1);
        env.execute(SocketTumblingWindowJoin.class.getSimpleName());
    }
}
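
Which tumbling window an element lands in depends only on its timestamp and the window size. The following plain-Java sketch (no Flink; TumblingWindowSketch is an invented name) assumes windows aligned to the epoch with no offset, which matches the assigner as it is used above.

```java
/** Plain-Java illustration (no Flink): which tumbling window does a timestamp fall into? */
class TumblingWindowSketch {
    /** Start of the window [start, start + size) that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 seconds, as in the example above
        System.out.println(windowStart(12_345L, size)); // prints 10000
        System.out.println(windowStart(19_999L, size)); // prints 10000
        System.out.println(windowStart(20_000L, size)); // prints 20000
    }
}
```

Two elements can only be joined if this function returns the same start for both, so elements at 19999 ms and 20000 ms are never paired even though they arrive 1 ms apart.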

(2) Sliding Window Join

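import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window join
 * Joins two streams over sliding windows
 */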
public class SocketSlideWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost",9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost",9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number));
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number));

        DataStream<String> result = dataStream01.join(dataStream02)
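                // Constant key: every element of one stream pairs with every element
                // of the other stream that falls into the same window.
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })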
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10),//size
                        Time.seconds(5)//slide
                        ))
                .apply(new JoinFunction<Integer, Integer, String>() {
                    @Override
                    public String join(Integer first, Integer second) throws Exception {
                        return first + ","+ second;
                    }
                });

        result.print().setParallelism(1);
        env.execute(SocketSlideWindowJoin.class.getSimpleName());
    }

}
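
With a size of 10 seconds and a slide of 5 seconds, the windows overlap, so each element belongs to two windows. The plain-Java sketch below (no Flink; SlidingWindowSketch is an invented name) lists the window starts for one timestamp, assuming epoch-aligned windows with no offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (no Flink): all sliding windows that contain a timestamp. */
class SlidingWindowSketch {
    /** Starts of every window [start, start + size) containing the timestamp, newest first. */
    static List<Long> windowStarts(long timestampMillis, long sizeMillis, long slideMillis) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, slideMillis);
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // size = 10 s, slide = 5 s, as in the example above
        System.out.println(windowStarts(12_000L, 10_000L, 5_000L)); // prints [10000, 5000]
    }
}
```

A consequence for the join: two elements that share more than one window are paired once per shared window, so the same pair can appear in the output more than once.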

(3) Session Window Join

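import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Window join
 * Joins two streams over session windows
 */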
public class SocketSesssionWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost",9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost",9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number));
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number));

        DataStream<String> result = dataStream01.join(dataStream02)
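                // Constant key: every element of one stream pairs with every element
                // of the other stream that falls into the same session.
                .where(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })
                .equalTo(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer integer) throws Exception {
                        return 1;
                    }
                })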
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
                .apply(new JoinFunction<Integer, Integer, String>() {
                    @Override
                    public String join(Integer first, Integer second) throws Exception {
                        return first + ","+ second;
                    }
                });

        result.print().setParallelism(1);
        env.execute(SocketSesssionWindowJoin.class.getSimpleName());
    }

}
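
Session windows have no fixed start or length; a session ends when no element arrives for the configured gap. The plain-Java sketch below (no Flink; SessionWindowSketch is an invented name) groups sorted timestamps into sessions. How an element that arrives exactly one gap after the previous one is treated is an assumption of this sketch, not a statement about Flink's behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration (no Flink): grouping timestamps into sessions by inactivity gap. */
class SessionWindowSketch {
    /** Splits ascending timestamps wherever two neighbours are more than gapMillis apart. */
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gapMillis) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            // Sketch assumption: a distance of exactly gapMillis still extends the session.
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMillis) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> timestamps = Arrays.asList(0L, 3_000L, 4_000L, 12_000L);
        // gap = 5 s, as in the example above
        System.out.println(sessions(timestamps, 5_000L)); // prints [[0, 3000, 4000], [12000]]
    }
}
```

In the join, elements of both streams with the same key contribute to the same sessions, and pairs are formed only within one session.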

(4) Interval Join

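import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Interval join
 * Joins two streams by key; each left element is paired with the right elements
 * whose timestamps lie within a relative time interval.
 * Assumes Flink 1.11 or later (WatermarkStrategy). Interval joins need event time,
 * so the arrival time is used as the timestamp here.
 */
public class SocketIntervalWindowJoin {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Event time is the default from Flink 1.12 on; earlier versions must enable it.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Assumption: stamp each record with its arrival time so the join has timestamps.
        WatermarkStrategy<Integer> arrivalTime = WatermarkStrategy
                .<Integer>forMonotonousTimestamps()
                .withTimestampAssigner((number, ts) -> System.currentTimeMillis());
        DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost",9999);
        DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost",9998);
        SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01
                .map(number -> Integer.valueOf(number)).assignTimestampsAndWatermarks(arrivalTime);
        SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02
                .map(number -> Integer.valueOf(number)).assignTimestampsAndWatermarks(arrivalTime);

        DataStream<String> result = dataStream01
                // keyBy(0) only works on tuple types; an Integer stream is keyed by its value.
                .keyBy(number -> number)
                .intervalJoin(dataStream02.keyBy(number -> number))
                .between(Time.seconds(-2),Time.seconds(2))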
                .process(new ProcessJoinFunction<Integer, Integer, String>() {
                    @Override
                    public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(left+","+right);
                    }
                });

        result.print().setParallelism(1);
        env.execute(SocketIntervalWindowJoin.class.getSimpleName());
    }

}
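
The condition behind between(-2 s, +2 s) can be written down in plain Java (no Flink; IntervalJoinSketch is an invented name). It assumes both bounds are inclusive and that both elements already share the same key.

```java
/** Plain-Java illustration (no Flink) of the time condition of an interval join. */
class IntervalJoinSketch {
    /** True if rightTs lies in [leftTs + lowerMillis, leftTs + upperMillis]. */
    static boolean matches(long leftTs, long rightTs, long lowerMillis, long upperMillis) {
        return rightTs >= leftTs + lowerMillis && rightTs <= leftTs + upperMillis;
    }

    public static void main(String[] args) {
        long lower = -2_000L; // 2 seconds before the left element
        long upper = 2_000L;  // 2 seconds after the left element
        System.out.println(matches(10_000L, 11_500L, lower, upper)); // prints true
        System.out.println(matches(10_000L, 8_000L, lower, upper));  // prints true
        System.out.println(matches(10_000L, 12_001L, lower, upper)); // prints false
    }
}
```

Unlike the three window joins, the interval is relative to each left element rather than tied to fixed window boundaries.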