「Flink」Flink中的時間類型

Flink中的時間類型和窗口是很是重要概念,是學習Flink必需要掌握的兩個知識點。html

Flink中的時間類型

時間類型介紹

Flink流式處理中支持不一樣類型的時間。分爲如下幾種:apache

  1. 處理時間
    • Flink程序執行對應操做的系統時間。全部基於時間的操做(例如:時間窗口)都將使用運行相應operator的系統時間。例如:每一個小時的處理時間窗口包括在系統時間範圍內全部operator接收到的記錄。例如:若是應用程序在09:15開始運行,則第一個滾動時間窗口將包括:09:15 – 10:00 之間的處理事件,下一個窗口包括上午10:00 – 11:00之間的處理事件
    • 這種處理時間方式實時性是最好的,但數據未必準確
  2. 事件時間
    • 每一個事件發生的時間。這個時間通常是在進入到Flink以前就包含在事件中
    • 針對Eventtime,事件被處理的時間以來與事件自己
    • Eventtime必需要指定如何生成Eventtime Watermark(水印)
    • 理想狀況,無論事件什麼時候到達或者順序如何,事件時間處理可以獲得完整一致地結果。
    • 事件處理在等待亂序事件時,會產生一些延遲。這樣會對Eventtime的應用性能有必定的影響
  3. 攝入時間
    • 攝入時間是事件進入Flink的時間
    • 在source operator中,每一個記錄以時間戳的形式獲取源的當前時間
    • 它在概念是處於事件時間和處理時間中間
    • 攝入時間不能處理亂序問題或者延遲數據,攝入時間能夠由流式系統自動生成水印

Flink支持的這幾種時間恰好和咱們上一篇播客中的內容相對應。app

http://www.javashuo.com/article/p-hcvvzljd-gh.htmlide

應用一張Flink官網的圖。oop

image

Flink代碼中設置時間類型

一般,咱們在Flink初始化流式運行環境時,就會設置流處理時間特性。這個設置很重要,它決定了數據流的行爲方式。(例如:是否須要給事件分配時間戳),以及窗口操做應該使用什麼樣的時間類型。例如:KeyedStream.timeWindow(Time.seconds(30))。性能


咱們接下來經過實現一個每5秒中進行一次單詞計數的案例,來講明Flink中如何指定時間類型。學習

public class WordCountWindow {
    public static void main(String[] args) throws Exception {
        // 1. 初始化流式運行環境
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // 2. 設置時間處理類型,這裏設置的方式處理時間
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // 3. 定義數據源,每秒發送一個hadoop單詞
        DataStreamSource<String> wordDS = env.addSource(new RichSourceFunction<String>() {

            private boolean isCanaled = false;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (!isCanaled) {
                    ctx.collect("hadooop");
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                isCanaled = true;
            }
        });

        // 4. 每5秒進行一次,分組統計
        // 4.1 轉換爲元組
        wordDS.map(word -> Tuple2.of(word, 1))
                // 指定返回類型
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // 按照單詞進行分組
                .keyBy(t -> t.f0)
                // 滾動窗口,3秒計算一次
                .timeWindow(Time.seconds(3))
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }, new RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
                    @Override
                    public void apply(String word, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {

                        // 打印窗口開始、結束時間
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        System.out.println("窗口開始時間:" + sdf.format(window.getStart())
                                + " 窗口結束時間:" + sdf.format(window.getEnd())
                                + " 窗口計算時間:" + sdf.format(System.currentTimeMillis()));

                        int sum = 0;
                        Iterator<Tuple2<String, Integer>> iterator = input.iterator();
                        while(iterator.hasNext()) {
                            Integer count = iterator.next().f1;
                            sum += count;
                        }
                        out.collect(Tuple2.of(word, sum));
                    }
                }).print();

        env.execute("app");
    }
}

窗口開始時間:2020-02-05 00:22:21 窗口結束時間:2020-02-05 00:22:24 窗口計算時間:2020-02-05 00:22:24
4> (hadooop,2)
窗口開始時間:2020-02-05 00:22:24 窗口結束時間:2020-02-05 00:22:27 窗口計算時間:2020-02-05 00:22:27
4> (hadooop,3)
窗口開始時間:2020-02-05 00:22:27 窗口結束時間:2020-02-05 00:22:30 窗口計算時間:2020-02-05 00:22:30
4> (hadooop,3)
窗口開始時間:2020-02-05 00:22:30 窗口結束時間:2020-02-05 00:22:33 窗口計算時間:2020-02-05 00:22:33
4> (hadooop,3)
窗口開始時間:2020-02-05 00:22:33 窗口結束時間:2020-02-05 00:22:36 窗口計算時間:2020-02-05 00:22:36
4> (hadooop,3)
窗口開始時間:2020-02-05 00:22:36 窗口結束時間:2020-02-05 00:22:39 窗口計算時間:2020-02-05 00:22:39spa

咱們能夠看到,這個滾動窗口,每3秒計算一次,是按照系統時間來計算的。code

咱們再把時間窗口設置爲1分鐘,再試試。orm

窗口開始時間:2020-02-05 00:27:00 窗口結束時間:2020-02-05 00:28:00 窗口計算時間:2020-02-05 00:28:00
4> (hadooop,32)

窗口開始時間:2020-02-05 00:28:00 窗口結束時間:2020-02-05 00:29:00 窗口計算時間:2020-02-05 00:29:00
4> (hadooop,60)

恰好在 00:27:00 – 00:28:00之間。


參考文件:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html

相關文章
相關標籤/搜索