Flink學習--Checkpoint 的應用實踐

文章來自:https://ververica.cn/develope...
做者:唐雲(茶幹)java

Checkpoint 與 state 的關係

Checkpoint 是從 source 觸發到下游全部節點完成的一次全局操做。下圖能夠有一個對 Checkpoint 的直觀感覺,紅框裏面能夠看到一共觸發了 569K 次 Checkpoint,而後所有都成功完成,沒有 fail 的。node

state 其實就是 Checkpoint 所作的主要持久化備份的主要數據,看下圖的具體數據統計,其 state 也就 9kb 大小 。算法

什麼是 state

咱們接下來看什麼是 state。先看一個很是經典的 word count 代碼,這段代碼會去監控本地的 9000 端口的數據並對網絡端口輸入進行詞頻統計,咱們本地行動 netcat,而後在終端輸入 hello world,執行程序會輸出什麼?數據庫

答案很明顯,**(hello, 1)** 和 **(word,1)**緩存

那麼問題來了,若是再次在終端輸入 hello world,程序會輸入什麼?網絡

答案其實也很明顯,**(hello, 2)** 和 **(world, 2)**。爲何 Flink 知道以前已經處理過一次 hello world,這就是 state 發揮做用了,這裏是被稱爲 keyed state 存儲了以前須要統計的數據,因此幫助 Flink 知道 hello 和 world 分別出現過一次。併發

回顧一下剛纔這段 word count 代碼。keyby 接口的調用會建立 keyed stream 對 key 進行劃分,這是使用 keyed state 的前提。在此以後,sum 方法會調用內置的 StreamGroupedReduce 實現。app

什麼是 keyed state

對於 keyed state,有兩個特色:框架

  • 只能應用於 KeyedStream 的函數與操做中,例如 Keyed UDF, window state
  • keyed state 是已經分區/劃分好的,每個 key 只能屬於某一個 keyed state

對於如何理解已經分區的概念,咱們須要看一下 keyby 的語義,你們能夠看到下圖左邊有三個併發,右邊也是三個併發,左邊的詞進來以後,經過 keyby 會進行相應的分發。例如對於 hello word,hello 這個詞經過 hash 運算永遠只會到右下方併發的 task 上面去。異步

什麼是 operator state

  • 又稱爲 non-keyed state,每個 operator state 都僅與一個 operator 的實例綁定。
  • 常見的 operator state 是 source state,例如記錄當前 source 的 offset

再看一段使用 operator state 的 word count 代碼:

這裏的fromElements會調用FromElementsFunction的類,其中就使用了類型爲 list state 的 operator state。根據 state 類型作一個分類以下圖:

除了從這種分類的角度,還有一種分類的角度是從 Flink 是否直接接管:

  • Managed State:由 Flink 管理的 state,剛纔舉例的全部 state 均是 managed state
  • Raw State:Flink 僅提供 stream 能夠進行存儲數據,對 Flink 而言 raw state 只是一些 bytes

在實際生產中,都只推薦使用 managed state,本文將圍繞該話題進行討論。

如何在 Flink 中使用 state

下圖就前文 word count 的 sum 所使用的**StreamGroupedReduce**類爲例講解了如何在代碼中使用 keyed state:

下圖則對 word count 示例中的**FromElementsFunction**類進行詳解並分享如何在代碼中使用 operator state:

Checkpoint 的執行機制

在介紹 Checkpoint 的執行機制前,咱們須要瞭解一下 state 的存儲,由於 state 是 Checkpoint 進行持久化備份的主要角色。

Statebackend 的分類

下圖闡釋了目前 Flink 內置的三類 state backend,其中**MemoryStateBackend****FsStateBackend**在運行時都是存儲在 java heap 中的,只有在執行 Checkpoint 時,**FsStateBackend**纔會將數據以文件格式持久化到遠程存儲上。

**RocksDBStateBackend**則借用了 RocksDB(內存磁盤混合的 LSM DB)對 state 進行存儲。

MemoryStateBackend

MemoryStateBackend 將工做狀態數據保存在 taskmanager 的 java 內存中。key/value 狀態和 window 算子使用哈希表存儲數值和觸發器。進行快照時(checkpointing),生成的快照數據將和 checkpoint ACK 消息一塊兒發送給 jobmanager,jobmanager 將收到的全部快照保存在 java 內存中。
MemoryStateBackend 如今被默認配置成異步的,這樣避免阻塞主線程的 pipline 處理。
MemoryStateBackend 的狀態存取的速度都很是快,可是不適合在生產環境中使用。這是由於 MemoryStateBackend 有如下限制:

  • 每一個 state 的默認大小被限制爲 5 MB(這個值能夠經過 MemoryStateBackend 構造函數設置)
  • 每一個 task 的全部 state 數據 (一個 task 可能包含一個 pipline 中的多個 Operator) 大小不能超過 RPC 系統的幀大小(akka.framesize,默認 10MB)
  • jobmanager 收到的 state 數據總和不能超過 jobmanager 內存

MemoryStateBackend 適合的場景:

  • 本地開發和調試
  • 狀態很小的做業

下圖表示了 MemoryStateBackend 的數據存儲位置:
@MemoryStateBackend state 存儲位置 | center
值得說明的是,當觸發 savepoint 時,jobmanager 會把快照數據持久化到外部存儲。

FsStateBackend

FsStateBackend 須要配置一個 checkpoint 路徑,例如「hdfs://namenode:40010/flink/checkpoints」 或者 「file:///data/flink/checkpoints」,咱們通常配置爲 hdfs 目錄
FsStateBackend 將工做狀態數據保存在 taskmanager 的 java 內存中。進行快照時,再將快照數據寫入上面配置的路徑,而後將寫入的文件路徑告知 jobmanager。jobmanager 中保存全部狀態的元數據信息(在 HA 模式下,元數據會寫入 checkpoint 目錄)。
FsStateBackend 默認使用異步方式進行快照,防止阻塞主線程的 pipline 處理。能夠經過 FsStateBackend 構造函數取消該模式:

new FsStateBackend(path, false);

FsStateBackend 適合的場景:

  • 大狀態、長窗口、大鍵值(鍵或者值很大)狀態的做業
  • 適合高可用方案

@FsStateBackend state 存儲位置 | center

RocksDBStateBackend

RocksDBStateBackend 也須要配置一個 checkpoint 路徑,例如:「hdfs://namenode:40010/flink/checkpoints」 或者 「file:///data/flink/checkpoints」,通常配置爲 hdfs 路徑。
RocksDB 是一種可嵌入的持久型的 key-value 存儲引擎,提供 ACID 支持。由 Facebook 基於 levelDB 開發,使用 LSM 存儲引擎,是內存和磁盤混合存儲。
RocksDBStateBackend 將工做狀態保存在 taskmanager 的 RocksDB 數據庫中;checkpoint 時,RocksDB 中的全部數據會被傳輸到配置的文件目錄,少許元數據信息保存在 jobmanager 內存中( HA 模式下,會保存在 checkpoint 目錄)。
RocksDBStateBackend 使用異步方式進行快照。
RocksDBStateBackend 的限制:

  • 因爲 RocksDB 的 JNI bridge API 是基於 byte[] 的,RocksDBStateBackend 支持的每一個 key 或者每一個 value 的最大值不超過 2^31 bytes((2GB))。
  • 要注意的是,有 merge 操做的狀態(例如 ListState),可能會在運行過程當中超過 2^31 bytes,致使程序失敗。

RocksDBStateBackend 適用於如下場景:

  • 超大狀態、超長窗口(天)、大鍵值狀態的做業
  • 適合高可用模式

使用 RocksDBStateBackend 時,可以限制狀態大小的是 taskmanager 磁盤空間(相對於 FsStateBackend 狀態大小限制於 taskmanager 內存 )。這也致使 RocksDBStateBackend 的吞吐比其餘兩個要低一些。由於 RocksDB 的狀態數據的讀寫都要通過反序列化/序列化。

RocksDBStateBackend 是目前三者中惟一支持增量 checkpoint 的。

@ RocksDBStateBackend 存儲位置 | center

statebackend 如何保存 managed keyed/operator state

對於HeapKeyedStateBackend,有兩種實現:

  • 支持異步 Checkpoint(默認):存儲格式 CopyOnWriteStateMap
  • 僅支持同步 Checkpoint:存儲格式 NestedStateMap

特別在 MemoryStateBackend 內使用HeapKeyedStateBackend時,Checkpoint 序列化數據階段默認有最大 5 MB數據的限制

對於RocksDBKeyedStateBackend,每一個 state 都存儲在一個單獨的 column family 內,其中 keyGroup,Key 和 Namespace 進行序列化存儲在 DB 做爲 key。

Checkpoint 執行機制詳解

本小節將對 Checkpoint 的執行流程逐步拆解進行講解,下圖左側是 Checkpoint Coordinator,是整個 Checkpoint 的發起者,中間是由兩個 source,一個 sink 組成的 Flink 做業,最右側的是持久化存儲,在大部分用戶場景中對應 HDFS。

a. 第一步,Checkpoint Coordinator 向全部 source 節點 trigger Checkpoint;。

b. 第二步,source 節點向下遊廣播 barrier,這個 barrier 就是實現 Chandy-Lamport 分佈式快照算法的核心,下游的 task 只有收到全部 input 的 barrier 纔會執行相應的 Checkpoint。

c. 第三步,當 task 完成 state 備份後,會將備份數據的地址(state handle)通知給 Checkpoint coordinator。

d. 第四步,下游的 sink 節點收集齊上游兩個 input 的 barrier 以後,會執行本地快照,這裏特意展現了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 會全量刷數據到磁盤上(紅色大三角表示),而後 Flink 框架會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)。

e. 一樣的,sink 節點在完成本身的 Checkpoint 以後,會將 state handle 返回通知 Coordinator。

f. 最後,當 Checkpoint coordinator 收集齊全部 task 的 state handle,就認爲這一次的 Checkpoint 全局完成了,向持久化存儲中再備份一個 Checkpoint meta 文件。

Checkpoint 的 EXACTLY_ONCE 語義

爲了實現 EXACTLY ONCE 語義,Flink 經過一個 input buffer 將在對齊階段收到的數據緩存起來,等對齊完成以後再進行處理。而對於 AT LEAST ONCE 語義,無需緩存收集到的數據,會對後續直接處理,因此致使 restore 時,數據可能會被屢次處理。下圖是官網文檔裏面就 Checkpoint align 的示意圖:

須要特別注意的是,Flink 的 Checkpoint 機制只能保證 Flink 的計算過程能夠作到 EXACTLY ONCE,端到端的 EXACTLY ONCE 須要 source 和 sink 支持。

Savepoint 與 Checkpoint 的區別

做業恢復時,兩者都可以使用,主要區別以下:

Savepoint Externalized Checkpoint
用戶經過命令觸發,由用戶管理其建立與刪除 Checkpoint 完成時,在用戶給定的外部持久化存儲保存
標準化格式存儲,容許做業升級或者配置變動 看成業 FAILED(或者CANCELED)時,外部存儲的 Checkpoint 會保留下來
用戶在恢復時須要提供用於恢復做業狀態的 savepoint 路徑 用戶在恢復時須要提供用於恢復的做業狀態的 Checkpoint 路徑
相關文章
相關標籤/搜索