在Flink的流式處理中,會涉及到時間的不一樣概念,以下圖所示:web
Event Time:是事件建立的時間。它一般由事件中的時間戳描述,例如採集的日誌數據中,每一條日誌都會記錄本身的生成時間,Flink經過時間戳分配器訪問事件時間戳。bash
Ingestion Time:是數據進入Flink的時間。session
Processing Time:是每個執行基於時間操做的算子的本地系統時間,與機器相關,默認的時間屬性就是Processing Time。socket
例如,一條日誌進入Flink的時間爲2018-11-12 10:00:00.123,到達Window的系統時間爲2018-11-12 10:00:01.234,日誌的內容以下: 2018-11-02 18:37:15.624 INFO Fail over to rm2函數
對於業務來講,要統計1min內的故障日誌個數,哪一個時間是最有意義的?—— eventTime,由於咱們要根據日誌的生成時間進行統計。spa
streaming流式計算是一種被設計用於處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增加的本質上無限的數據集,而window是一種切割無限數據爲有限塊進行處理的手段。設計
Window是無限數據流處理的核心,Window將一個無限的stream拆分紅有限大小的」buckets」桶,咱們能夠在這些桶上作計算操做。3d
Window能夠分紅兩類:日誌
對於TimeWindow 和 CountWindow,能夠根據窗口實現原理的不一樣分紅:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window),另外還有會話窗口(Session Window)。code
(1) 滾動窗口(Tumbling Windows)
將數據依據固定的窗口長度對數據進行切片。
特色:時間對齊,窗口長度固定,沒有重疊。
滾動窗口分配器將每一個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,而且不會出現重疊。例如:若是你指定了一個5分鐘大小的滾動窗口,窗口的建立以下圖所示:
適用場景:適合作BI統計等(作每一個時間段的聚合計算)。
(2)滑動窗口(Sliding Windows)
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。
特色:時間對齊,窗口長度固定,有重疊。
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口相似,窗口的大小由窗口大小參數來配置,另外一個窗口滑動參數控制滑動窗口開始的頻率。所以,滑動窗口若是滑動參數小於窗口大小的話,窗口是能夠重疊的,在這種狀況下元素會被分配到多個窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動,那麼每一個窗口中5分鐘的窗口裏包含着上個10分鐘產生的數據,以下圖所示:
適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。
(3)會話窗口(Session Windows)
由一系列事件組合一個指定時間長度的timeout間隙組成,相似於web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。
特色:時間無對齊。
session窗口分配器經過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的狀況,相反,當它在一個固定的時間週期內再也不收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口經過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉而且後續的元素將被分配到新的session窗口中去。
CountWindow根據窗口中相同key元素的數量來觸發執行,執行時只計算元素數量達到窗口大小的key對應的結果。
注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的全部元素的總數。
(1)滾動窗口(Tumbling Windows)
默認的CountWindow是一個滾動窗口,只須要指定窗口大小便可,當元素數量達到窗口大小時,就會觸發窗口的執行。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對Stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
// 引入滾動窗口
// 這裏的5指的是5個相同key的元素計算一次
val streamWindow = streamKeyBy.countWindow(5)
// 執行聚合操做
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 將聚合操做寫入文件
streamReduce.print()
// 執行程序
env.execute("TumbingWindow")
複製代碼
(2)滑動窗口(Sliding Windows)
滑動窗口和滾動窗口的函數名是徹底一致的,只是在傳參數時須要傳入兩個參數,一個是window_size,一個是sliding_size。
下面代碼中的sliding_size設置爲了2,也就是說,每收到兩個相同key的數據就計算一次,每一次計算的window範圍是5個元素。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1).toLong)).keyBy(0)
// 引入滾動窗口
// 當相同key的元素個數達到2個時,觸發窗口計算,計算的窗口範圍爲5
val streamWindow = streamKeyBy.countWindow(5,2)
// 執行聚合操做
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 將聚合數據寫入文件
streamReduce.print()
// 執行程序
env.execute("TumblingWindow")
複製代碼
TimeWindow是將指定時間範圍內的全部數據組成一個window,一次對一個window裏面的全部數據進行計算。
(1)滾動窗口(Tumbling Windows)
Flink默認的時間窗口根據Processing Time 進行窗口的劃分,將Flink獲取到的數據根據進入Flink的時間劃分到不一樣的窗口中。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入時間窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 執行聚合操做
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 將聚合數據寫入文件
streamReduce.print()
// 執行程序
env.execute("TimeWindow")
複製代碼
時間間隔能夠經過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。
(2) 滑動窗口(SlidingEventTimeWindows)
滑動窗口和滾動窗口的函數名是徹底一致的,只是在傳參數時須要傳入兩個參數,一個是window_size,一個是sliding_size。
下面代碼中的sliding_size設置爲了2s,也就是說,窗口每2s就計算一次,每一次計算的window範圍是5s內的全部元素。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入滾動窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5), Time.seconds(2))
// 執行聚合操做
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 將聚合數據寫入文件
streamReduce.print()
// 執行程序
env.execute("TumblingWindow")
複製代碼
時間間隔能夠經過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。
WindowedStream → DataStream:給window賦一個reduce功能的函數,並返回一個聚合的結果。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入時間窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 執行聚合操做
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 將聚合數據寫入文件
streamReduce.print()
// 執行程序
env.execute("TumblingWindow")
複製代碼
WindowedStream → DataStream:給窗口賦一個fold功能的函數,並返回一個fold後的結果。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111,'\n',3)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
// 引入滾動窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 執行fold操做
val streamFold = streamWindow.fold(100){
(begin, item) =>
begin + item._2
}
// 將聚合數據寫入文件
streamFold.print()
// 執行程序
env.execute("TumblingWindow")
複製代碼
WindowedStream → DataStream:對一個window內的全部元素作聚合操做。min和 minBy的區別是min返回的是最小值,而minBy返回的是包含最小值字段的元素(一樣的原理適用於 max 和 maxBy)。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
// 引入滾動窗口
val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
// 執行聚合操做
val streamMax = streamWindow.max(1)
// 將聚合數據寫入文件
streamMax.print()
// 執行程序
env.execute("TumblingWindow")
複製代碼