Checkpoint 與 state 的關係
Checkpoint 是從 source 觸發到下游全部節點完成的一次全局操做。下圖能夠有一個對 Checkpoint 的直觀感覺,紅框裏面能夠看到一共觸發了 569K 次 Checkpoint,而後所有都成功完成,沒有 fail 的。java
state 其實就是 Checkpoint 所作的主要持久化備份的主要數據,看下圖的具體數據統計,其 state 也就 9kb 大小 。算法
什麼是 state緩存
咱們接下來看什麼是 state。先看一個很是經典的 word count 代碼,這段代碼會去監控本地的 9000 端口的數據並對網絡端口輸入進行詞頻統計,咱們本地行動 netcat,而後在終端輸入 hello world,執行程序會輸出什麼?網絡
答案很明顯,(hello, 1) 和 (word,1)併發
那麼問題來了,若是再次在終端輸入 hello world,程序會輸入什麼?框架
答案其實也很明顯,(hello, 2) 和 (world, 2)。爲何 Flink 知道以前已經處理過一次 hello world,這就是 state 發揮做用了,這裏是被稱爲 keyed state 存儲了以前須要統計的數據,因此幫助 Flink 知道 hello 和 world 分別出現過一次。異步
回顧一下剛纔這段 word count 代碼。keyby 接口的調用會建立 keyed stream 對 key 進行劃分,這是使用 keyed state 的前提。在此以後,sum 方法會調用內置的 StreamGroupedReduce 實現。分佈式
什麼是 keyed state函數
對於 keyed state,有兩個特色:spa
對於如何理解已經分區的概念,咱們須要看一下 keyby 的語義,你們能夠看到下圖左邊有三個併發,右邊也是三個併發,左邊的詞進來以後,經過 keyby 會進行相應的分發。例如對於 hello word,hello 這個詞經過 hash 運算永遠只會到右下方併發的 task 上面去。
什麼是 operator state
再看一段使用 operator state 的 word count 代碼:
這裏的fromElements會調用FromElementsFunction的類,其中就使用了類型爲 list state 的 operator state。根據 state 類型作一個分類以下圖:
除了從這種分類的角度,還有一種分類的角度是從 Flink 是否直接接管:
在實際生產中,都只推薦使用 managed state,本文將圍繞該話題進行討論。
如何在 Flink 中使用 state
下圖就前文 word count 的 sum 所使用的StreamGroupedReduce類爲例講解了如何在代碼中使用 keyed state:
下圖則對 word count 示例中的FromElementsFunction類進行詳解並分享如何在代碼中使用 operator state:
Checkpoint 的執行機制
在介紹 Checkpoint 的執行機制前,咱們須要瞭解一下 state 的存儲,由於 state 是 Checkpoint 進行持久化備份的主要角色。
Statebackend 的分類
下圖闡釋了目前 Flink 內置的三類 state backend,其中MemoryStateBackend和FsStateBackend在運行時都是存儲在 java heap 中的,只有在執行 Checkpoint 時,FsStateBackend纔會將數據以文件格式持久化到遠程存儲上。而RocksDBStateBackend則借用了 RocksDB(內存磁盤混合的 LSM DB)對 state 進行存儲。
對於HeapKeyedStateBackend,有兩種實現:
特別在 MemoryStateBackend 內使用HeapKeyedStateBackend時,Checkpoint 序列化數據階段默認有最大 5 MB 數據的限制
對於RocksDBKeyedStateBackend,每一個 state 都存儲在一個單獨的 column family 內,其中 keyGroup,Key 和 Namespace 進行序列化存儲在 DB 做爲 key。
Checkpoint 執行機制詳解
本小節將對 Checkpoint 的執行流程逐步拆解進行講解,下圖左側是 Checkpoint Coordinator,是整個 Checkpoint 的發起者,中間是由兩個 source,一個 sink 組成的 Flink 做業,最右側的是持久化存儲,在大部分用戶場景中對應 HDFS。
Checkpoint 的 EXACTLY_ONCE 語義
爲了實現 EXACTLY ONCE 語義,Flink 經過一個 input buffer 將在對齊階段收到的數據緩存起來,等對齊完成以後再進行處理。而對於 AT LEAST ONCE 語義,無需緩存收集到的數據,會對後續直接處理,因此致使 restore 時,數據可能會被屢次處理。下圖是官網文檔裏面就 Checkpoint align 的示意圖:
須要特別注意的是,Flink 的 Checkpoint 機制只能保證 Flink 的計算過程能夠作到 EXACTLY ONCE,端到端的 EXACTLY ONCE 須要 source 和 sink 支持。
Savepoint 與 Checkpoint 的區別
做業恢復時,兩者都可以使用,主要區別以下:
Savepoint | Externalized |
---|---|
用戶經過命令觸發,由用戶管理其建立與刪除 | Checkpoint 完成時,在用戶給定的外部持久化存儲保存 |
標準化格式存儲,容許做業升級或者配置變動 | 看成業 FAILED(或者 CANCELED)時,外部存儲的 Checkpoint 會保留下來 |
用戶在恢復時須要提供用於恢復做業狀態的 savepoint 路徑 | 用戶在恢復時須要提供用於恢復的做業狀態的 Checkpoint 路徑 |
本文爲雲棲社區原創內容,未經容許不得轉載。