Flink從入門到真香(1四、Flink必備知識-Watermark)

對於Flink來講,Watermark是個很難繞過去的概念,有的翻譯爲水位線,有的翻譯爲水印,都是同一個東西,網絡

watermark是一種衡量Event Time進展的機制,它是數據自己的一個隱藏屬性。一般基於Event Time的數據,自身都包含一個timestamp.watermark是用於處理亂序事件的,而正確的處理亂序事件,一般用watermark機制結合window來實現。ide

流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分狀況下,流到operator的數據都是按照事件產生的時間順序來的,可是也不排除因爲網絡、背壓等緣由,致使亂序的產生(out-of-order或者說late element)。翻譯

可是對於late element,咱們又不能無限期的等下去,必需要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。這個特別的機制,就是watermark。code

設置watermark最大的做用其實就是爲了解決數據亂序問題

假設10分鐘發一班車,8點一班,8點10分一班,設置watermark後,8點這一班的實際會在8點10分所有發出去,會自動設置延遲

Watermark的特色

以當前最大的時間戳,減去固定的延遲做爲watermark的時間戳,
也是就是每條數據出來都會算下,全部數據裏面最大的那個值,再減去固定的延遲blog

我的理解不必定對: 以這個爲例:watermark爲2,當看到5的時候,watermark是5-2=3,假設桶設置的4,那麼0-4的桶就不會關閉,繼續往下走
下一個數字3,發現最大數仍是5,那5-2=3,0-4的桶仍是不動,當走到6的時候,最大是6,那6-2=4,這時候0-4的桶就會關掉了,以此類推
Flink從入門到真香(1四、Flink必備知識-Watermark)
watermark是一條特殊的數據記錄
watermark必須單調遞增,以確保任務的事件時間時鐘在向前推動,而不是在後退
watermark與數據的時間戳相關排序

watermark傳遞

上游向下遊傳遞的時候會把watermark廣播出去,
下游可能會接收到多個上游的watermark數據,會在內部創建一個分區watermark,以最小的數據做爲最終的watermark
好比上游有3個數據源,輸出的watermark分別爲4,3,5 那麼在下游會把3個數據所有接收到,最終輸出最小的爲本身的watermark也就是3事件

下面這個例子,有4個上游數據,watermark分爲是4,7,6,6,分區數據watermark數據是2,4,3,6
第一張圖: 當上遊第1,2,3個數據都沒來的時候,全部分區數據最小的是2,因此輸出當前事件時間時間爲2
第二張圖:上游第1個數據來了,也就是4數據把原來的2覆蓋了,這時候數據變成了4,4,3,6 最小的數據變成3,因此輸出當前事件時間爲3
第三張圖:上游第2個數據7來了,也就是7把原來的4覆蓋了,這時候數據變成了4,7,3,6 最小的數據仍是3,因此輸出當前事件時間仍是3
第四張圖:上游第3個數據6來了,也就是6把3給覆蓋了,這時候數據變成了4,7,6,6 最小的數據變成了4,因此輸出當前事件時間是4
Flink從入門到真香(1四、Flink必備知識-Watermark)element

watermark的引入

Event Time的使用必定要指定數據源中的時間戳
調用assignTimestampAndWatermarks方法,傳入一個BoundedOutOfOrdernessTimestampExtractor,就能夠指定watermark開發

//先轉換成樣例類類型
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",") //按照,分割數據,獲取結果
        SensorReadingTest5(arr(0), arr(1).toLong, arr(2).toDouble) //生成一個傳感器類的數據,參數中傳toLong和toDouble是由於默認分割後是字符串類別
      })
//      .assignAscendingTimestamps(_.timestamp ) //這種是當時間確定是按照時間排序的,沒有亂序的狀況,升序提取時間戳(若是數據中timestamp爲秒,能夠*1000L轉爲毫秒)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReadingTest5](Time.seconds(3)) {  // 指定亂序最大等3
        override def extractTimestamp(t: SensorReadingTest5): Long = t.timestamp * 1000L //指定watermark的字段
      })

watermark的設定

在Flink中,watermark由應用開發人員生成,這一般須要對對應的領域有必定的瞭解
若是watermark設置的延遲過久,收到結果的速度可能就會很慢,解決辦法是在水位線到達以前輸出一個近似結果
而若是watermark到達得太早,則可能收到錯誤結果,不過flink處理遲到數據的機制能夠解決這個問題字符串

週期性水印(With Periodic Watermarks)

AssignerWithPeriodicWatermarks週期性地分配timestamp和生成watermark(可能依賴於元素或者純粹基於處理時間)。
watermark產生的事件間隔(每n毫秒)是經過ExecutionConfig.setAutoWatermarkInterval(...)來定義的,每當分配器的getCurrentWatermark()方法唄調用時,若是返回的watermark是非空而且大於上一個watermark的話,一個新的watermark將會被髮射。

Assigner with punctuated watermarks

間斷式的生成watermark。和週期性水印不同,這種方式不是固定時間的,而是能夠根據須要對每條數據進行篩選和處理

對於亂序數據處理,flink提供3重保障
一、watermark: 能夠設置小一點hold住大部分狀況,提供近似正確的結果
二、.allowedLateness(Time.minutes(1)) //容許處理遲到數據1分鐘
三、.sideOutputLateData(new OutputTag(String, Double, Long)) //側輸出流,先輸出到一個旁路,打上標籤,保證數據不會丟

相關文章
相關標籤/搜索