flink中支持多種窗口,包括:時間窗口,session窗口,統計窗口等等,能想到的基本均可以實現java
時間窗口(Time Windows)
最簡單經常使用的窗口形式是基於時間的窗口,flink支持三種種時間窗口:apache
翻滾時間窗口的窗口是固定的,好比設定一個1分鐘的時間窗口,該時間窗口將只計算當前1分鐘內的數據,而不會管前1分鐘或後1分鐘的數據。
時間是對齊的,數據不會同時出如今2個窗口內,不會重疊api
滑動窗口,顧名思義,該時間窗口是滑動的。因此,從概念上講,這裏有兩個方面的概念須要理解:session
窗口:須要定義窗口的大小
滑動:須要定義在窗口中滑動的大小,但理論上講滑動的大小不能超過窗口大小
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成
窗口長度是固定的,能夠有重疊的部分socket
由一系列事件組合一個指定時間長度的timeout間隙組成,也就是一段時間沒有接收到新數據就會生成新的窗口
主要特色就是: 時間無對齊maven
window() 方法接收的輸入參數是一個WindowAssigner WindowAssigner 負責將每條輸入的數據分發到正確的window中 Flink提供了通用的WindowAssigner 滾動窗口(tumbling window) 滑動窗口(sliding window) 會話窗口(session window) 全局窗口(global window) 建立不一樣類型的窗口 滾動時間窗口(tumbling time window) timeWindow(Time.seconds(15)) 滑動時間窗口(sliding time window) .timeWindow(Time.seconds(15),Time.seconds(5)) 會話窗口(session window) .window(EventTimeSessionWindows.withGap(Time.minutes(10)) 窗口函數(window function) window function 定義了要對窗口中收集的數據作的計算操做,能夠分爲兩類; 增量聚合函數(incrementalggergation functions) 每條數據來了就會進行計算,保持一個簡單的狀態 ReduceFunction, AggregateFunction 全窗口函數(full windowfunctions) 先把窗口全部數據收集起來,等到計算的時候會遍歷全部數據 ProcessWindowFunction 其餘一些經常使用的API .trigger()---------觸發器 定義window何時關閉,觸發計算並輸出結果 .evicotr()---------移除器 定義移除某些數據的邏輯 .allowedLateness() ------容許處理遲到的數據 .sideOutputLateData() -----將遲到的數據放入側輸出流 .getSideOutput() ----獲取側輸出流
理論說半天其實仍是萌的,上個栗子ide
新建一個scala Object WindowTest.scala函數
package com.mafei.apitest import com.mafei.sinktest.SensorReadingTest5 import org.apache.flink.api.common.functions.ReduceFunction import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation} import org.apache.flink.streaming.api.windowing.time.Time object WindowTest { def main(args: Array[String]): Unit = { //建立執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //以事件時間做爲窗口聚合 //env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) //以數據進入flink的時間做爲窗口時間 // env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) //以Flink實際處理時間做爲窗口時間 //若是發現沒有輸出,那多是由於數據太少,不到15s都處理完成了,能夠換成socket或者kafka來進行測試 val inputStream = env.readTextFile("/opt/java2020_study/maven/flink1/src/main/resources/sensor.txt") env.setParallelism(1) inputStream.print() //先轉換成樣例類類型 val dataStream = inputStream .map(data => { val arr = data.split(",") //按照,分割數據,獲取結果 SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別 }) //每15秒統計一次,窗口內各傳感器全部溫度的最小值,以及最小的時間戳 val resultStream = dataStream .map(data=>(data.id,data.temperature,data.timestamp)) .keyBy(_._1) //按照二元組的第一個元素(id)分組 // .window(TumblingEventTimeWindows.of(Time.seconds(15))) //滾動時間窗口 // .window(SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(3))) //滑動時間窗口,15秒一個窗口,每次日後劃3秒 // .window(EventTimeSessionWindows.withGap(Time.seconds(15))) //會話窗口,超過15秒算下一個會話 // .countWindow(15) //滾動計數窗口 .timeWindow(Time.seconds(15)) //每15秒統計一次,滾動時間窗口 // .minBy(1) //第二個元素作最小值的統計,若是隻是獲取全部溫度的最小值,直接用這個方法就能夠了。。 .reduce((curRes,newData)=>(curRes._1, curRes._2.min(newData._2),newData._3)) resultStream.print() env.execute() } } //上面reduce代碼若是用這個自定義的方式也是同樣能夠實現,效果是同樣的 class MyReducer extends ReduceFunction[SensorReadingTest5]{ override def reduce(t: SensorReadingTest5, t1: SensorReadingTest5): SensorReadingTest5 = SensorReadingTest5(t.id, t1.timestamp,t.temperature.min(t1.temperature)) }
準備一個sensor.txt 放到指定目錄下內容: 測試
sensor1,1603766281,1 sensor2,1603766282,42 sensor3,1603766283,43 sensor4,1603766240,40.1 sensor4,1603766284,20 sensor4,1603766249,40.2
最終代碼的結構,和運行效果scala