Flink從入門到真香(十二、Flink一大利器-時間窗口)

flink中支持多種窗口,包括:時間窗口,session窗口,統計窗口等等,能想到的基本均可以實現java

時間窗口(Time Windows)
最簡單經常使用的窗口形式是基於時間的窗口,flink支持三種種時間窗口:apache

第一個: 翻滾時間窗口(tumbling time window)

翻滾時間窗口的窗口是固定的,好比設定一個1分鐘的時間窗口,該時間窗口將只計算當前1分鐘內的數據,而不會管前1分鐘或後1分鐘的數據。
時間是對齊的,數據不會同時出如今2個窗口內,不會重疊
Flink從入門到真香(十二、Flink一大利器-時間窗口)api

第二個:滑動時間窗口(sliding time window)

滑動窗口,顧名思義,該時間窗口是滑動的。因此,從概念上講,這裏有兩個方面的概念須要理解:session

窗口:須要定義窗口的大小
滑動:須要定義在窗口中滑動的大小,但理論上講滑動的大小不能超過窗口大小
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成
窗口長度是固定的,能夠有重疊的部分
Flink從入門到真香(十二、Flink一大利器-時間窗口)socket

第三個: 會話窗口(Session Windows)

由一系列事件組合一個指定時間長度的timeout間隙組成,也就是一段時間沒有接收到新數據就會生成新的窗口
主要特色就是: 時間無對齊maven

Flink從入門到真香(十二、Flink一大利器-時間窗口)

window() 方法接收的輸入參數是一個WindowAssigner

WindowAssigner 負責將每條輸入的數據分發到正確的window中

Flink提供了通用的WindowAssigner
滾動窗口(tumbling window)
滑動窗口(sliding window)
會話窗口(session window)
全局窗口(global window)

建立不一樣類型的窗口

滾動時間窗口(tumbling time window)
timeWindow(Time.seconds(15))
滑動時間窗口(sliding time window)
.timeWindow(Time.seconds(15),Time.seconds(5))

會話窗口(session window)
.window(EventTimeSessionWindows.withGap(Time.minutes(10))

窗口函數(window function)
window function 定義了要對窗口中收集的數據作的計算操做,能夠分爲兩類;
增量聚合函數(incrementalggergation functions)
每條數據來了就會進行計算,保持一個簡單的狀態
ReduceFunction, AggregateFunction
全窗口函數(full windowfunctions)
先把窗口全部數據收集起來,等到計算的時候會遍歷全部數據
ProcessWindowFunction

其餘一些經常使用的API
.trigger()---------觸發器
定義window何時關閉,觸發計算並輸出結果
.evicotr()---------移除器
定義移除某些數據的邏輯
.allowedLateness()   ------容許處理遲到的數據
.sideOutputLateData() -----將遲到的數據放入側輸出流
.getSideOutput() ----獲取側輸出流

理論說半天其實仍是萌的,上個栗子ide

假設從文件讀一批數據,每15秒統計一次,獲取窗口內各傳感器全部溫度的最小值,以及最小的時間戳

新建一個scala Object WindowTest.scala函數

package com.mafei.apitest

import com.mafei.sinktest.SensorReadingTest5
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time

object WindowTest {
  def main(args: Array[String]): Unit = {
    //建立執行環境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  //以事件時間做爲窗口聚合
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)   //以數據進入flink的時間做爲窗口時間
//    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //以Flink實際處理時間做爲窗口時間
    //若是發現沒有輸出,那多是由於數據太少,不到15s都處理完成了,能夠換成socket或者kafka來進行測試
    val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt")

    env.setParallelism(1)
    inputStream.print()

    //先轉換成樣例類類型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") //按照,分割數據,獲取結果
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別
      })

    //每15秒統計一次,窗口內各傳感器全部溫度的最小值,以及最小的時間戳
    val resultStream = dataStream
      .map(data=>(data.id,data.temperature,data.timestamp))
      .keyBy(_._1) //按照二元組的第一個元素(id)分組
//      .window(TumblingEventTimeWindows.of(Time.seconds(15))) //滾動時間窗口
//      .window(SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(3))) //滑動時間窗口,15秒一個窗口,每次日後劃3秒
//      .window(EventTimeSessionWindows.withGap(Time.seconds(15))) //會話窗口,超過15秒算下一個會話
//      .countWindow(15) //滾動計數窗口
      .timeWindow(Time.seconds(15))  //每15秒統計一次,滾動時間窗口
//      .minBy(1)  //第二個元素作最小值的統計,若是隻是獲取全部溫度的最小值,直接用這個方法就能夠了。。
      .reduce((curRes,newData)=>(curRes._1, curRes._2.min(newData._2),newData._3))

    resultStream.print()
    env.execute()

  }
}

//上面reduce代碼若是用這個自定義的方式也是同樣能夠實現,效果是同樣的
class MyReducer extends ReduceFunction[SensorReadingTest5]{
  override def reduce(t: SensorReadingTest5, t1: SensorReadingTest5): SensorReadingTest5 =
    SensorReadingTest5(t.id, t1.timestamp,t.temperature.min(t1.temperature))
}

準備一個sensor.txt 放到指定目錄下內容: 測試

sensor1,1603766281,1
sensor2,1603766282,42
sensor3,1603766283,43
sensor4,1603766240,40.1
sensor4,1603766284,20
sensor4,1603766249,40.2

最終代碼的結構,和運行效果
Flink從入門到真香(十二、Flink一大利器-時間窗口)scala

相關文章
相關標籤/搜索