在Flink的流式處理中,絕大部分的業務都會使用eventTime,通常只在eventTime沒法使用時,纔會被迫使用ProcessingTime或者IngestionTime。bash
若是要使用EventTime,那麼須要引入EventTime的時間屬性,引入方式以下所示:網絡
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給env建立的每個stream追加時間特徵
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
複製代碼
咱們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡、背壓等緣由,致使亂序的產生,所謂亂序,就是指Flink接收到的事件的前後順序不是嚴格按照事件的Event Time順序排列的。socket
那麼此時出現一個問題,一旦出現亂序,若是隻根據eventTime決定window的運行,咱們不能明確數據是否所有到位,但又不能無限期的等下去,此時必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了,這個特別的機制,就是Watermark。ide
Watermark是一種衡量Event Time進展的機制,它是數據自己的一個隱藏屬性,數據自己攜帶着對應的Watermark。spa
Watermark是用於處理亂序事件的,而正確的處理亂序事件,一般用Watermark機制結合window來實現。3d
數據流中的Watermark用於表示timestamp小於Watermark的數據,都已經到達了,所以,window的執行也是由Watermark觸發的。日誌
Watermark能夠理解成一個延遲觸發機制,咱們能夠設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,而後認定eventTime小於maxEventTime - t的全部數據都已經到達,若是有窗口的中止時間等於maxEventTime – t,那麼這個窗口被觸發執行。code
有序流的Watermarker以下圖所示:(Watermark設置爲0)cdn
亂序流的Watermarker以下圖所示:(Watermark設置爲2)blog
當Flink接收到每一條數據時,都會產生一條Watermark,這條Watermark就等於當前全部到達數據中的maxEventTime - 延遲時長,也就是說,Watermark是由數據攜帶的,一旦數據攜帶的Watermark比當前未觸發的窗口的中止時間要晚,那麼就會觸發相應窗口的執行。因爲Watermark是由數據攜帶的,所以,若是運行過程當中沒法獲取新的數據,那麼沒有被觸發的窗口將永遠都不被觸發。
上圖中,咱們設置的容許最大延遲到達時間爲2s,因此時間戳爲7s的事件對應的Watermark是5s,時間戳爲12s的事件的Watermark是10s,若是咱們的窗口1是1s~5s,窗口2是6s~10s,那麼時間戳爲7s的事件到達時的Watermarker剛好觸發窗口1,時間戳爲12s的事件到達時的Watermark剛好觸發窗口2。
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 從調用時刻開始給env建立的每個stream追加時間特徵
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.readTextFile("eventTest.txt").assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) {
override def extractTimestamp(t: String): Long = {
// EventTime是日誌生成時間,咱們從日誌中解析EventTime
t.split(" ")(0).toLong
}
})
複製代碼
當使用EventTimeWindow時,全部的Window在EventTime的時間軸上進行劃分,也就是說,在Window啓動後,會根據初始的EventTime時間每隔一段時間劃分一個窗口,若是Window大小是3秒,那麼1分鐘內會把Window劃分爲以下的形式:
[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)
複製代碼
若是Window大小是10秒,則Window會被分爲以下的形式:
[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)
複製代碼
注意,窗口是左閉右開的,形式爲:[window_start_time,window_end_time)。
Window的設定無關數據自己,而是系統定義好了的,也就是說,Window會一直按照指定的時間間隔進行劃分,不論這個Window中有沒有數據,EventTime在這個Window期間的數據會進入這個Window。
Window會不斷產生,屬於這個Window範圍的數據會被不斷加入到Window中,全部未被觸發的Window都會等待觸發,只要Window還沒觸發,屬於這個Window範圍的數據就會一直被加入到Window中,直到Window被觸發纔會中止數據的追加,而當Window觸發以後才接受到的屬於被觸發Window的數據會被丟棄。
Window會在如下的條件知足時被觸發執行:
咱們經過下圖來講明Watermark、EventTime和Window的關係。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(3000)) {
override def extractTimestamp(element: String): Long = {
val sysTime = element.split(" ")(0).toLong
println(sysTime)
sysTime
}}).map(item => (item.split(" ")(1), 1)).keyBy(0)
// 引入滾動窗口
val streamWindow = streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// 執行聚合操做
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 將聚合數據寫入文件
streamReduce.print
// 執行程序
env.execute("TumblingWindow")
複製代碼
結果是按照Event Time的時間窗口計算得出的,而無關係統的時間(包括輸入的快慢)。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) {
override def extractTimestamp(element: String): Long = {
val sysTime = element.split(" ")(0).toLong
println(sysTime)
sysTime
}}).map(item => (item.split(" ")(1), 1)).keyBy(0)
// 引入滾動窗口
val streamWindow = streamKeyBy.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
// 執行聚合操做
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 將聚合數據寫入文件
streamReduce.print
// 執行程序
env.execute("TumblingWindow")
複製代碼
相鄰兩次數據的EventTime的時間差超過指定的時間間隔就會觸發執行。若是加入Watermark,那麼當觸發執行時,全部知足時間間隔而尚未觸發的Window會同時觸發執行。
// 獲取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 建立SocketSource
val stream = env.socketTextStream("localhost", 11111)
// 對stream進行處理並按key聚合
val streamKeyBy = stream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(0)) {
override def extractTimestamp(element: String): Long = {
val sysTime = element.split(" ")(0).toLong
println(sysTime)
sysTime
}}).map(item => (item.split(" ")(1), 1)).keyBy(0)
// 引入滾動窗口
val streamWindow = streamKeyBy.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
// 執行聚合操做
val streamReduce = streamWindow.reduce(
(item1, item2) => (item1._1, item1._2 + item2._2)
)
// 將聚合數據寫入文件
streamReduce.print
// 執行程序
env.execute("TumblingWindow")
複製代碼