Flink的window、時間語義,Watermark機制,多代碼案例詳解,Flink學習入門(三)

你們好,我是後來,我會分享我在學習和工做中遇到的點滴,但願有機會個人某篇文章可以對你有所幫助,全部的文章都會在公衆號首發,歡迎你們關注個人公衆號" 後來X大數據 ",感謝你的支持與承認。

經過前2篇flink的學習,已經基本掌握了flink的基本使用,可是關於flink真正內核的東西還沒開始說,那先簡單介紹一下,flink的核心亮點:html

  1. 窗口
  2. 時間語義
  3. 精準一次性

咱們在第一篇的學習瞭解到了flink的wordCount,以及在第二篇的API 中,咱們也只是獲取到數據,進行簡單的轉換,就直接把數據輸出。java

可是咱們在以前都是以事件爲驅動,等於說是來了一條數據,我就處理一次,可是如今遇到的問題是:apache

咱們能夠簡單的把wordCount的需求比作公司的訂單金額,也就是訂單金額會隨着訂單的增長而只增不減,那麼若是運營部門提了如下需求:編程

  1. 每有1000條訂單就輸出一次這1000條訂單的總金額
  2. 每5分鐘輸出一次剛剛過去這5分鐘的訂單總金額
  3. 每3秒輸出一次最近5分鐘內的累計成交額
  4. 連續2條訂單的間隔時間超過30秒就按照這個時間分爲2組訂單,輸出前一組訂單的總金額

那麼面對這個需求,由於時間一直是流動的,你們有什麼想法?windows

基於這些需求,咱們來說一下flink的窗口。api

窗口

窗口:不管是hive中的開窗函數,仍是Spark中的批次計算中的窗口,仍是咱們這裏講的窗口,本質上都是對數據進行劃分,而後對劃分後的數據進行計算。網絡

那麼Windows是處理無限流的核心。Windows將流分紅有限大小的「存儲桶」,咱們能夠在其上應用計算。app

在flink中,窗口式Flink程序通常有2類,框架

  1. 鍵控流
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"
  1. 非鍵控流
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

惟一的區別是:對鍵控流的keyBy(…)調用window(…),而非鍵控流則是調用windowAll(…)。socket

窗口的生命週期

咱們上面說窗口就是對數據進行劃分到不一樣的「桶」中,而後進行計算,那麼什麼開始有這個桶,何時就算是分完了呢?
簡而言之,一旦應屬於該窗口的第一個元素到達,就會建立一個窗口,當時間超過用戶設置的時間戳時,flink將刪除這個窗口。

那咱們來理解一下窗口的類型:

  1. CountWindow:按照指定的數據條數生成一個Window,與時間無關。
  2. TimeWindow:按照時間生成Window。

    1. 滾動窗口
    2. 滑動窗口
    3. 會話窗口

從文字也不難看出,CountWindow就是按照數據條數生成窗口,樣例代碼以下:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

object CountWindowsTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val wordDS = env.socketTextStream("master102",3456)

    wordDS
      .map((_,1))
      .keyBy(0)
      //累計單個Key中3條數據就進行處理
      .countWindow(3)
      .sum(1)
      .print("測試:")

    env.execute()
  }
}

執行結果以下:
在這裏插入圖片描述
能夠看出,不一樣的單詞根據keyby進入不一樣的窗口,而後當窗口中的單個key的數據個數達到3個以後進行輸出。

接下來,咱們主要來講一下時間窗口,這些窗口的結束與開始都是根據數據的時間來判斷的,因此這裏就引出了咱們今天的第二個重點:時間語義

時間語義

Flink 在流式傳輸程序中支持不一樣的時間概念:

  1. Event Time:事件時間是每一個事件在其生產設備上發生的時間。它一般由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄本身的生成時間,Flink經過時間戳分配器訪問事件時間戳。
  2. Ingestion Time:攝取時間是數據進入Flink的時間。攝取時間從概念上講介於事件時間和處理時間之間。
  3. Processing Time:處理時間是是指正在執行相應算子操做的機器的系統時間,默認的時間屬性就是Processing Time。 處理時間是最簡單的時間概念,不能提供肯定性,由於它容易依賴數據到達系統(例如從消息隊列)到達系統的速度,以及數據在系統內部之間流動的速度。

咱們根據業務的需求還判斷使用哪一個時間類型,通常來講使用Event Time更多,好比:在統計最近5分鐘的訂單總金額時,咱們須要的是真實的訂單時間,而不是進入flink的時間或者是處理時間。

在Flink的流式處理中,絕大部分的業務都會使用EventTime,通常只在EventTime沒法使用時,纔會被迫使用ProcessingTime或者IngestionTime。默認狀況下,Flink框架中處理的時間語義爲ProcessingTime,若是要使用EventTime,那麼須要引入EventTime的時間屬性,引入方式以下所示:

import org.apache.flink.streaming.api.TimeCharacteristic

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

這裏注意:若是要使用事件時間,那麼必需要爲數據定義事件時間,而且還要註冊水位線

好了,又是一個新的知識點:水位線

咱們暫時先有這些概念,而後咱們再返回來繼續說咱們的窗口的類型。說完窗口類型,再詳細說水位線的應用。

因此這也爲後面的數據亂序埋下了坑,好比,2條訂單,它們的訂單時間差很少,一前一後,可是由於先下單的這條訂單的網絡狀況很差,致使後到達flink窗口,也就是咱們常說的數據亂序,那麼這種狀況該怎麼辦?咱們後面再說這個問題

特別注意:窗口是左閉右開的。

滾動窗口

滾動窗口具備固定的尺寸和不重疊,例如,若是指定大小爲5分鐘的滾動窗口,則每五分鐘將啓動一個新窗口,以下圖所示。
在這裏插入圖片描述
樣例代碼以下:

import java.text.SimpleDateFormat

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
/**
  * @description: ${description}
  * @author: Liu Jun Jun
  * @create: 2020-06-29 13:59
  **/
object WindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val dataDS = env.socketTextStream("bigdata101", 3456)

    val tsDS = dataDS.map(str => {
      val strings = str.split(",")
      (strings(0), strings(1).toLong, 1)
    }).keyBy(0)
      //窗口大小爲5s的滾動窗口
      //.timeWindow(Time.seconds(5))和下面的這種寫法都是能夠的
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .apply {
        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }")
          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
        }
      }.print("windows:>>>")

    env.execute()
  }
}

經過運行,大概會發現,咱們輸入的時間戳並不會起做用,默認使用的確實是處理時間:
在這裏插入圖片描述
同時,能夠看出,滾動窗口的時間窗口不會有重疊,一條數據只會屬於一個窗口,並且,窗口是左閉右開的。

滑動窗口

滑動窗口也是固定長度的窗口,不過因爲滑動的頻率,當滑動頻率小於窗口大小時,滑動窗口會重疊,在這種狀況下,一個元素被分配到多個窗口。
例如:指定大小爲10分鐘的窗口滑動5分鐘。這樣,您每隔5分鐘就會獲得一個窗口,其中包含最近10分鐘內到達的事件,以下圖所示。
在這裏插入圖片描述
接下來,我只貼改動代碼,其他代碼和上面的滾動代碼是同樣的:

//滾動5秒,滑動3秒
//.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))和下面的這句話是同樣的
 .timeWindow(Time.seconds(5),Time.seconds(3))

在這裏插入圖片描述
很是關鍵的是:你們發現,flink默認的分配窗口是從每秒從0開始數的,舉例:會把5秒的窗口分爲:
[0-5),[5,10),[10-15),....
3秒的窗口爲:
[0-3),[3,6),[6-9),....

會話窗口

與滾動窗口和滑動窗口相比,會話窗口不重疊且沒有固定的開始和結束時間。相反,會話窗口在必定時間段內未收到元素時(即,出現不活動間隙時)關閉。隨後的元素將分配給新的會話窗口。
在這裏插入圖片描述

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

在這裏插入圖片描述
能夠看出,此次的窗口大小並非固定的,那麼我在測試輸入的時候,輸完一些後等了一下子才繼續輸入的,那麼就出現了第一個窗口,因此只要processtime間隔時間超過10s,就會輸出上一個窗口。

總結窗口的知識點:

  1. 以上全部的窗口的時間均可以更改成EventTime,同時時間間隔能夠指定爲Time.milliseconds(x),Time.seconds(x),Time.minutes(x)

若是使用timewindow()方法,那麼會隨着事件時間的指定會更改成以事件時間爲標準的窗口,而若是使用window()方法,那麼其中的參數會發生變化。

//滾動窗口
 //事件時間
 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
 //處理時間
 .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

//滑動窗口
 //事件時間
 .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
 //處理時間
 .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

//會話窗口
 //事件時間
 .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
 //處理時間
 .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
  1. flink默認的分配窗口是從每秒從0開始數的,舉例:會把5秒的窗口分爲:

[0-5),[5,10),[10-15),....
3秒的窗口爲:
[0-3),[3,6),[6-9),....

那麼可不能夠作到窗口的劃分爲[1-6),[6,11)...
固然能夠,flink有窗口偏移設置。通常用不到,我在這裏簡單貼一下使用方式:

//5秒的窗口偏移3秒
 .window(TumblingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(3)))

在這裏插入圖片描述
能從上圖看出,窗口從原來的80-85,偏移到了83-88。那我再把方法總結一下

//窗口偏移方法總結
//滾動窗口
 //事件時間
 .window(TumblingEventTimeWindows.of(Time.seconds(5),Time.seconds(3)))
 //處理時間
 .window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.seconds(3)))

//滑動窗口
 //事件時間
 .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3)))
 //處理時間
 .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5),Time.seconds(3)))

//會話窗口
 //事件時間
 .window(EventTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3)))
 //處理時間
 .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10),Time.seconds(3)))

關於窗口的使用基本上差很少了,接下來只要說一說水位線

水位線WaterMark

WaterMark,叫作水位線,那它是幹啥的?支持事件時間的流處理器須要一種衡量事件時間進度的方法。
這裏要注意:使用事件時間必需要使用註冊水位線,而水位線也是事件時間專用的
例如,當以事件時間開窗1小時,目前窗口剛超過一個小時,須要通知構建每小時窗口的窗口操做員,關閉正在進行中的這個窗口程序。

那問題來了,怎麼衡量時間到了沒?因此Flink中用於衡量事件時間進度的機制是水位線。

強調:並非每條數據都會生成水位線。水位線也是一條數據,是流數據的一部分,watermark是一個全局的值,不是某一個key下的值,因此即便不是同一個key的數據,其warmark也會增長。

同時,水位線還有一個重要做用,就是處理延遲數據,咱們在文章開頭的部分也提到了,數據亂序怎麼處理,那麼有些數據由於網絡的緣由,延遲了幾秒,因此也能夠把水位線看做是窗口最後的執行時間

好比說,咱們規定滾動窗口爲5秒,也就是[5-10),同時咱們預測數據通常可能延遲3秒,因此咱們但願窗口是當10s的數據到達後,繼續等待3秒,看這3秒內,仍是否有本來是[5-10)中的數據,一塊兒歸併到這個窗口中,等到出現了時間爲大於等於13s的數據時,就會觸發[5-10)這個窗口的數據執行。這就是延遲處理。(代碼案例看下面的週期性水位線)

那麼水位線怎麼生成呢?

有兩種分配時間戳和生成水位線的方法:

  1. 直接在數據流源中(我如今還不知道哪一種數據源能夠直接生成時間戳和水位線,因此這裏不討論了)
  2. 經過時間戳分配器/水位線生成器:在Flink中,時間戳分配器還定義要發送的水位線注意自1970-01-01T00:00:00Z的Java時代以來,時間戳和水位線都指定爲毫秒。(大部分使用狀況)

Event Time的使用必定要指定數據源中的時間戳。不然程序沒法知道事件的事件時間是什麼(數據源裏的數據沒有時間戳的話,就只能使用Processing Time了)。

那咱們就利用第二種方式來生成水位線吧,注意要在事件時間的第一個操做(例如第一個窗口操做)以前指定分配器,例如:
在這裏插入圖片描述
咱們發現註冊水位線的有2個接口能夠實現:

  1. AssignerWithPeriodicWatermarks(週期性生成水位線)
  2. AssignerWithPunctuatedWatermarks(標記性生成水位線)

一個一個說,先說週期性生成水位線:

週期性水位線

//flink默認200ms(毫秒)生成一條水位線,那咱們也能夠修改
@PublicEvolving
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
        if (characteristic == TimeCharacteristic.ProcessingTime) {
            getConfig().setAutoWatermarkInterval(0);
        } else {
            getConfig().setAutoWatermarkInterval(200);
        }
    }
    
//單位是毫秒,因此我這裏模擬設置的爲10s
env.getConfig.setAutoWatermarkInterval(10000)

那麼這裏的時間間隔指的是系統時間的10s,可不是事件時間的10s,這個不要弄混,不相信的話能夠等會看個人測試案例。

import java.text.SimpleDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * @description: ${本案例模擬的是:以事件時間爲標準,窗口滾動時間爲5秒}
  * @author: Liu Jun Jun
  * @create: 2020-06-28 18:31
  **/
object WaterMarkTest {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //設置以事件時間爲基準
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //並行度設置爲1。關於並行度的案例會在後面測試
    env.setParallelism(1)
    //設置10s生成一次水位線
    env.getConfig.setAutoWatermarkInterval(10000)

    val dataDS = env.socketTextStream("bigdata101", 3456)

    val tsDS = dataDS.map(str => {
      val strings = str.split(",")
      (strings(0), strings(1).toLong, 1)
    }).assignTimestampsAndWatermarks(

      new AssignerWithPeriodicWatermarks[(String,Long,Int)]{
        var maxTs :Long= 0
//獲得水位線,週期性調用這個方法,獲得水位線,我這裏設置的也就是延遲5秒
        override def getCurrentWatermark: Watermark = new Watermark(maxTs - 5000)
//負責抽取事件事件
        override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
          maxTs = maxTs.max(element._2 * 1000L)
          element._2 * 1000L
        }
      }

      /*new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.seconds(5)) {
        override def extractTimestamp(element: (String, Long, Int)): Long = element._2 * 1000
      }*/
    )
    val result = tsDS
      .keyBy(0)
      //窗口大小爲5s的滾動窗口
      .timeWindow(Time.seconds(5))
      .apply {
        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }")
          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
        }
      }
    tsDS.print("water")
    result.print("windows:>>>")

    env.execute()
  }
}

在這裏插入圖片描述
那麼從結果能夠看出:
【10- 15)的窗口是20這條數據觸發的,在我輸入20這條數據等了幾秒後輸出了第一個窗口
證明:10s的間隔時間爲系統時間,同時水位線=當前時間戳 - 延遲時間 ,若是窗口的end time <= 水位線,則會觸發這個窗口的執行

【15- 20)的窗口是25這條數據觸發的,一樣符合窗口的end time <= 水位線

那麼若是數據的窗口已經觸發了,但還有一點數據仍是遲到了怎麼辦?
全部還有個概念就是allowedLateness(容許接收延遲數據),而且還會繼續把數據放入對應的窗口。看代碼吧:

//其他代碼和上面案例的同樣,只是在開窗以後多了一行
 .keyBy(0)
      .timeWindow(Time.seconds(5))
      //具體這2秒錶明什麼意思,看完測試結果案例就懂了
      .allowedLateness(Time.seconds(2)
      .apply{}

在這裏插入圖片描述
經過看圖應該能明白這裏allowedLateness(Time.seconds(2)是什麼意思了,只要是窗口觸發後,時間小於設定的延遲時間,收到的延遲數據均可以處理,但要是沒有設置allowedLateness(Time.seconds(2)),那麼窗口觸發後的延遲數據都不會處理。

數據的延遲老是不可徹底預測的,假如時間已經超過了容許接收的延遲數據時間,還有一點點數據遲到,就是上圖中,在22這條數據以後我輸入的14這條數據,那怎麼辦?這種狀況下,咱們不能爲了偶爾的一點數據就把全部窗口的等待時間延遲好久,全部還有個概念就是側輸出流,將晚到的數據放置在側輸出流中。來看代碼:

//只加了3行,其他的和以前的代碼同樣
 val outputTag = new OutputTag[(String, Long, Int)]("lateData")//新加的
    val result = tsDS
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .allowedLateness(Time.seconds(2))
      .sideOutputLateData(outputTag)//新加的
      .apply {}
result.getSideOutput(outputTag).print("side>>>")//新加的

在這裏插入圖片描述

標記性水位線

知識不少東西是想通的,因此開始講延遲數據就巴拉巴拉一堆,再繼續說間標記水位線,爲何叫作標記呢?由於這種水位線的生成與時間無關,而是決定於什麼時候收到標記事件。
默認狀況下,全部的數據都屬於標記事件,意味着每條數據都會生成水位線。
因此使用這種方式的時候,須要對某些特定事件進行標記。

object WaterMarkTest {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val dataDS = env.socketTextStream("bigdata101", 3456)
    
    val tsDS = dataDS.map(str => {
      val strings = str.split(",")
      (strings(0), strings(1).toLong, 1)
    }).assignTimestampsAndWatermarks(

      new AssignerWithPunctuatedWatermarks[(String,Long,Int)] {
        override def checkAndGetNextWatermark(lastElement: (String, Long, Int), extractedTimestamp: Long): Watermark = {
          if (lastElement._1 .contains("later")){
            println("間歇性生成了水位線.....")
            // 間歇性生成水位線數據
            new Watermark(extractedTimestamp)
          }
          return null
        }

        override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
          element._2 * 1000L
        }
      }
   )

    val result = tsDS
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .apply {
        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
        }
      }
    tsDS.print("water")
    result.print("calc")

    env.execute()
  }
}

看一下個人測試結果:
在這裏插入圖片描述
固然即使咱們設置了標記,在TPS很高的場景下依然會產生大量的Watermark,在必定程度上對下游算子形成壓力,因此只有在實時性要求很是高的場景纔會選擇Punctuated的方式進行Watermark的生成。

關於並行度與水位線

細心的小夥伴也會發現,我在上面的全部的案例中,使用的並行度都是1,但實際生產中確定不是1啊,這個會有什麼變化麼?固然是有的。

我先說結論:
若是並行度不爲1,那麼在計算窗口時,是按照各自的並行度單獨計算的。只有當全部並行度中都觸發了同一個窗口,那麼這個窗口才會觸發。

口說無憑,咱們來看案例,此次放完整代碼:

import java.text.SimpleDateFormat
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * @description: ${模擬多並行度下,窗口如何觸發}
  * @author: Liu Jun Jun
  * @create: 2020-06-28 18:31
  **/
object WaterMarkTest {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //注掉了並行度爲1,默認並行度=cpu核數,我這裏cpu爲4個
    //env.setParallelism(1)
    env.getConfig.setAutoWatermarkInterval(10000)

    val dataDS = env.socketTextStream("bigdata101", 3456)

    val tsDS = dataDS.map(str => {
      val strings = str.split(",")
      (strings(0), strings(1).toLong, 1)
    }).assignTimestampsAndWatermarks(

      new AssignerWithPeriodicWatermarks[(String,Long,Int)]{
        var maxTs :Long= 0
        override def getCurrentWatermark: Watermark = new Watermark(maxTs - 5000)

        override def extractTimestamp(element: (String, Long, Int), previousElementTimestamp: Long): Long = {
          maxTs = maxTs.max(element._2 * 1000L)
          element._2 * 1000L
        }
      }
    )
    //該案例中,爲了簡單,去掉了allowedLateness和側輸出流
    val result = tsDS
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .apply {
        (tuple: Tuple, window: TimeWindow, es: Iterable[(String, Long, Int)], out: Collector[String]) => {
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          //out.collect(s"window:[${sdf.format(new Date(window.getStart))}-${sdf.format(new Date(window.getEnd))}]:{ ${es.mkString(",")} }")
          out.collect(s"window:[${window.getStart}-${window.getEnd}]:{ ${es.mkString(",")} }")
        }
      }
    tsDS.print("water")
    result.print("calc")
    env.execute()
  }
}

看一下測試結果吧:
在這裏插入圖片描述
好了,到這裏,窗口、時間語義以及水位線的基本原理就說完了,理解了這些再看看文章開頭提到了4個需求,是否是就有些想法了呢?

到目前爲止,咱們只是對數據進行了開窗,可是數據在一個窗口內怎麼處理尚未說,那麼下一章就來講處理函數,以及Flink的狀態編程。

在此次學習中發現的不錯的帖子:https://www.cnblogs.com/rossi...

掃碼關注公衆號「後來X大數據」,回覆【電子書】,領取超多本pdf 【java及大數據 電子書】

在這裏插入圖片描述

相關文章
相關標籤/搜索