增量聚合場景:
窗口中每進入一條數據,就進行一次計算,等窗口時間到了再輸出最終的結果。
常用的增量聚合算子:
reduce(reduceFunction) aggregate(aggregateFunction) sum(),min(),max()
(1)reduce算子
場景:對窗口內的元素進行聚合求和。
/** * window的增量聚合 */ public class SocketWindowIncrAgg { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",9999); SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = dataStreamSource.map(number -> Integer.valueOf(number)); AllWindowedStream<Integer,TimeWindow> windowedStream = integerSingleOutputStreamOperator .timeWindowAll(Time.seconds(10)); windowedStream.reduce(new ReduceFunction<Integer>() { @Override public Integer reduce(Integer last, Integer current) throws Exception { System.out.println("process last:"+last+" current:"+current); return last + current; } }).print().setParallelism(1); env.execute(SocketWindowIncrAgg.class.getSimpleName()); } }
(2)aggregate算子
需求:求每個窗口內數據的平均值。
/** * window的增量聚合 * 用aggregate算子計算平均值 */ public class SocketWindowAvgAgg { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",9999); SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = dataStreamSource.map(number -> Integer.valueOf(number)); AllWindowedStream<Integer,TimeWindow> windowedStream = integerSingleOutputStreamOperator .timeWindowAll(Time.seconds(10)); windowedStream.aggregate(new MyAggregate()).print().setParallelism(1); env.execute(SocketWindowAvgAgg.class.getSimpleName()); } /** * IN,輸入的數據類型 * ACC,自定義的中間狀態 * Tuple2(Integer,Integer) * key: 計算數據的個數 * value:計算總值 * OUT,輸出的數據類型 * */ private static class MyAggregate implements AggregateFunction<Integer, Tuple2<Integer,Integer>,Double> { /** * 初始化累加器 * @return */ @Override public Tuple2<Integer, Integer> createAccumulator() { return new Tuple2<>(0,0); } /** * 針對每一個數據的操做 * @param element * @param accumulator * @return */ @Override public Tuple2<Integer, Integer> add(Integer element, Tuple2<Integer, Integer> accumulator) { //個數+1 //總的值累計 accumulator.f0+=1; accumulator.f1+=element; System.out.println("input:"+element+"|accumulator:"+accumulator); return new Tuple2<>(accumulator.f0,accumulator.f1); } /** * 計算結果 * @param accumulator * @return */ @Override public Double getResult(Tuple2<Integer, Integer> accumulator) { System.out.println("call getResult:"+accumulator); return (double) accumulator.f1/accumulator.f0; } /** * 最終結果的合併 * @param a1 * @param b1 * @return */ @Override public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a1, Tuple2<Integer, Integer> b1) { System.out.println("call merge:"+a1+" "+b1); return Tuple2.of(a1.f0+b1.f0,a1.f1+b1.f1); } } }
全量聚合:等屬於該窗口的數據全部到齊後,才開始進行聚合計算(因此可以實現對窗口內的數據進行排序等需求)。
常用算子:
apply(WindowFunction)、process(ProcessWindowFunction)。ProcessWindowFunction 比 WindowFunction 提供了更多的上下文信息,兩者的關係類似於 map 和 RichMap 的關係。
/** * window的全量計算 * 用ProcessAllWindowFunction抽象類,重寫process方法 */ public class SocketWindowFullAgg { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",9999); SingleOutputStreamOperator<Integer> integerSingleOutputStreamOperator = dataStreamSource.map(number -> Integer.valueOf(number)); AllWindowedStream<Integer,TimeWindow> windowedStream = integerSingleOutputStreamOperator .timeWindowAll(Time.seconds(10)); windowedStream.process(new MyProcess()) .print().setParallelism(1); env.execute(SocketWindowFullAgg.class.getSimpleName()); } /** * 抽象類只能繼承 * * @tparam IN The type of the input value. * @tparam OUT The type of the output value. * @tparam W The type of the window. * */ private static class MyProcess extends ProcessAllWindowFunction<Integer,Integer,TimeWindow> { @Override public void process(Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception { System.out.println("process start:"+elements); int count = 0; Iterator<Integer> integerIterator = elements.iterator(); while (integerIterator.hasNext()){ Integer number = integerIterator.next(); count += number; } out.collect(count); } } }
兩個流可以基於窗口進行 join(window join),window join 只支持以下三種類型的窗口:
滾動窗口,滑動窗口,會話窗口。
使用方式:
// Usage template for a window join (placeholders in <...> are to be filled in).
stream.join(otherStream) // join the two streams
    .where(<KeySelector>) // key of the first stream, used as the join field
    .equalTo(<KeySelector>) // key of the second stream, used as the join field
    .window(<WindowAssigner>) // window type
    .apply(<JoinFunction>) // function applied to each joined pair of elements
核心思想:先把兩個流的元素按 key 分配到窗口中,只有 key 相同且落在同一個窗口內的元素才會兩兩配對,並交給 JoinFunction 處理。
示例:
以下依次為滾動窗口 join、滑動窗口 join、會話窗口 join,以及基於時間間隔的 interval join(後者不屬於 window join)。
/** * window的join * 兩個流進行滾動窗口的join */ public class SocketTumblingWindowJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost",9999); DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost",9998); SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number)); SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number)); DataStream<String> result = dataStream01.join(dataStream02) .where(new KeySelector<Integer, Object>() { @Override public Object getKey(Integer integer) throws Exception { return integer; } }) .equalTo(new KeySelector<Integer, Object>() { @Override public Object getKey(Integer integer) throws Exception { return integer; } }) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .apply(new JoinFunction<Integer, Integer, String>() { @Override public String join(Integer first, Integer second) throws Exception { return first + ","+ second; } }); result.print().setParallelism(1); env.execute(SocketTumblingWindowJoin.class.getSimpleName()); } }
/** * window的join * 兩個流進行滑動窗口的join */ public class SocketSlideWindowJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost",9999); DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost",9998); SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number)); SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number)); DataStream<String> result = dataStream01.join(dataStream02) .where(new KeySelector<Integer, Object>() { @Override public Object getKey(Integer integer) throws Exception { return 1; } }) .equalTo(new KeySelector<Integer, Object>() { @Override public Object getKey(Integer integer) throws Exception { return 1; } }) .window(SlidingProcessingTimeWindows.of(Time.seconds(10),//size Time.seconds(5)//slide )) .apply(new JoinFunction<Integer, Integer, String>() { @Override public String join(Integer first, Integer second) throws Exception { return first + ","+ second; } }); result.print().setParallelism(1); env.execute(SocketSlideWindowJoin.class.getSimpleName()); } }
/** * window的join * 兩個流進行session窗口的join */ public class SocketSesssionWindowJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost",9999); DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost",9998); SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number)); SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number)); DataStream<String> result = dataStream01.join(dataStream02) .where(new KeySelector<Integer, Object>() { @Override public Object getKey(Integer integer) throws Exception { return 1; } }) .equalTo(new KeySelector<Integer, Object>() { @Override public Object getKey(Integer integer) throws Exception { return 1; } }) .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))) .apply(new JoinFunction<Integer, Integer, String>() { @Override public String join(Integer first, Integer second) throws Exception { return first + ","+ second; } }); result.print().setParallelism(1); env.execute(SocketSesssionWindowJoin.class.getSimpleName()); } }
/** * window的join * 兩個流按照key,取窗口一段間隔內jioin * */ public class SocketIntervalWindowJoin { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStreamSource01 = env.socketTextStream("localhost",9999); DataStreamSource<String> dataStreamSource02 = env.socketTextStream("localhost",9998); SingleOutputStreamOperator<Integer> dataStream01 = dataStreamSource01.map(number -> Integer.valueOf(number)); SingleOutputStreamOperator<Integer> dataStream02 = dataStreamSource02.map(number -> Integer.valueOf(number)); DataStream<String> result = dataStream01 .keyBy(0) .intervalJoin(dataStream02.keyBy(0)) .between(Time.seconds(-2),Time.seconds(2)) .process(new ProcessJoinFunction<Integer, Integer, String>() { @Override public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) throws Exception { out.collect(left+","+right); } }); result.print().setParallelism(1); env.execute(SocketIntervalWindowJoin.class.getSimpleName()); } }