flink window的early計算

轉發請註明原創地址:https://www.cnblogs.com/dongxiao-yang/p/9391815.htmlhtml

 

背景
flink 提供了完善的窗口機制, api中支持常見的三種窗口形式,滾動窗口,滑動窗口和session窗口。下面的圖片顯示了三種窗口的劃分區別:
滾動窗口
5b28d86219d81.png
滑動窗口
sw.png
session窗口
pimg_5b5ec2ffb9039.pngapi

Tumbing Windows:滾動窗口,窗口之間時間點不重疊。它是按照固定的時間,或固定的事件個數劃分的,分別能夠叫作滾動時間窗口和滾動事件窗口。
Sliding Windows:滑動窗口,窗口之間時間點存在重疊。對於某些應用,它們須要的時間是不間斷的,須要平滑的進行窗口聚合。例如,能夠每30s記算一次最近1分鐘用戶所購買的商品數量的總數,這個就是時間滑動窗口;或者每10個客戶點擊購買,而後就計算一下最近100個客戶購買的商品的總和,這個就是事件滑動窗口。
Session Windows:會話窗口,通過一段設置時間無數據認爲窗口完成。session

在默認的場景下,全部的窗口都是到達時間語義上的windown end time後觸發對整個窗口元素的計算,可是在部分場景的狀況下,業務方須要在窗口時間沒有結束的狀況下也能夠得到當前的聚合結果,好比每隔五分鐘獲取當前小時的sum值,這種狀況下,官方提供了對於上述窗口的定製化計算器ContinuousEventTimeTriggerContinuousProcessingTimeTriggersocket

下面是一個使用ContinuousProcessingTimeTrigger的簡單例子:ide

 

public class ContinueTriggerDemo {

    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub

        String hostName = "localhost";
        Integer port = Integer.parseInt("8001");
        ;

        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment
                .getExecutionEnvironment();

        // 從指定socket獲取輸入數據
        DataStream<String> text = env.socketTextStream(hostName, port);

        text.flatMap(new LineSplitter()) //數據語句分詞
                .keyBy(0) // 流按照單詞分區
                .window(TumblingProcessingTimeWindows.of(Time.seconds(120)))// 設置一個120s的滾動窗口
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))//窗口每統計一次當前計算結果
                .sum(1)// count求和
                .map(new Mapdemo())//輸出結果加上時間戳
                .print();

        env.execute("Java WordCount from SocketTextStream Example");

    }

    /**
     * Implements the string tokenizer that splits sentences into words as a
     * user-defined FlatMapFunction. The function takes a line (String) and
     * splits it into multiple pairs in the form of "(word,1)" (Tuple2<String,
     * Integer>).
     */
    public static final class LineSplitter implements
            FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    public static final class Mapdemo
            implements
            MapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>> {

        @Override
        public Tuple3<String, String, Integer> map(Tuple2<String, Integer> value)
                throws Exception {
            // TODO Auto-generated method stub

            DateFormat format2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String s = format2.format(new Date());

            return new Tuple3<String, String, Integer>(value.f0, s, value.f1);
        }
    }
    


}

在本地啓動端口 :nc -lk 8001 並啓動flink程序
輸入數據:spa

           aa
           aa
           bb

觀察程序數據結果日誌日誌



5> (aa,2018-07-30 16:08:20,2) 5> (bb,2018-07-30 16:08:20,1) 5> (aa,2018-07-30 16:08:40,2) 5> (bb,2018-07-30 16:08:40,1) 5> (aa,2018-07-30 16:09:00,2) 5> (bb,2018-07-30 16:09:00,1) 5> (aa,2018-07-30 16:09:20,2) 5> (bb,2018-07-30 16:09:20,1) 5> (aa,2018-07-30 16:09:40,2) 5> (bb,2018-07-30 16:09:40,1)

在上述輸入後繼續輸入code

aa

日誌結果統計爲orm

5> (aa,2018-07-30 16:10:00,3)
5> (bb,2018-07-30 16:10:00,1)

根據日誌數據可見,flink輕鬆實現了一個窗口時間長度爲120s並每20s向下遊發送一次窗口當前聚合結果的功能。htm

相關文章
相關標籤/搜索