本文主要分享與交流 Flink 狀態使用過程當中的一些經驗與心得,固然標題取了「最佳實踐」之名,但願文章內容能給讀者帶去一些乾貨。本文內容首先是回顧 state 相關概念,並認識和區別不一樣的 state backend;以後將分別對 state 使用訪問以及 checkpoint 容錯相關內容進行詳細講解,分享一些經驗和心得。數據庫
咱們先回顧一下到底什麼是 state,流式計算的數據每每是轉瞬即逝, 固然,真實業務場景不可能說全部的數據都是進來以後就走掉,沒有任何東西留下來,那麼留下來的東西其實就是稱之爲 state,中文能夠翻譯成狀態。數組
在下面這個圖中,咱們的全部的原始數據進入用戶代碼以後再輸出到下游,若是中間涉及到 state 的讀寫,這些狀態會存儲在本地的 state backend(能夠對標成嵌入式本地 kv 存儲)當中。併發
接下來咱們會在四個維度來區分兩種不一樣的 state:operator state 以及 keyed state。app
1. 是否存在當前處理的 key(current key):operator state 是沒有當前 key 的概念,而 keyed state 的數值老是與一個 current key 對應。
2. 存儲對象是否 on heap: 目前 operator state backend 僅有一種 on-heap 的實現;而 keyed state backend 有 on-heap 和 off-heap(RocksDB)的多種實現。
3. 是否須要手動聲明快照(snapshot)和恢復 (restore) 方法:operator state 須要手動實現 snapshot 和 restore 方法;而 keyed state 則由 backend 自行實現,對用戶透明。
4. 數據大小:通常而言,咱們認爲 operator state 的數據規模是比較小的;認爲 keyed state 規模是相對比較大的。須要注意的是,這是一個經驗判斷,不是一個絕對的判斷區分標準。分佈式
下面這張圖對目前普遍使用的三類 state backend 作了區分,其中綠色表示所建立的operator/keyed state backend 是 on-heap 的,黃色則表示是 off-heap 的。性能
通常而言,在生產中,咱們會在 FsStateBackend 和 RocksDBStateBackend 間選擇:大數據
RocksDB 是 Facebook 開源的 LSM 的鍵值存儲數據庫,被普遍應用於大數據系統的單機組件中。Flink 的 keyed state 本質上來講就是一個鍵值對,因此與 RocksDB 的數據模型是吻合的。下圖分別是 「window state」 和 「value state」 在 RocksDB 中的存儲格式,全部存儲的 key,value 均被序列化成 bytes 進行存儲。spa
在 RocksDB 中,每一個 state 獨享一個 Column Family,而每一個 Column family 使用各自獨享的 write buffer 和 block cache,上圖中的 window state 和 value state實際上分屬不一樣的 column family。線程
下面介紹一些對 RocksDB 性能比較有影響的參數,並整理了一些相關的推薦配置,至於其餘配置項,能夠參閱社區相關文檔。翻譯
狀態 | 建議 |
---|---|
state.backend.rocksdb.thread.num | 後臺 flush 和 compaction 的線程數. 默認值 ‘1‘. 建議調大 |
state.backend.rocksdb.writebuffer.count | 每一個 column family 的 write buffer 數目,默認值 ‘2‘. 若是有須要能夠適當調大 |
state.backend.rocksdb.writebuffer.size | 每一個 write buffer 的 size,默認值‘64MB‘. 對於寫頻繁的場景,建議調大 |
state.backend.rocksdb.block.cache-size | 每一個 column family 的 block cache大小,默認值‘8MB’,若是存在重複讀的場景,建議調大 |
■ 慎重使用長 list
下圖展現的是目前 task 端 operator state 在執行完 checkpoint 返回給 job master 端的 StateMetaInfo 的代碼片斷。
因爲 operator state 沒有 key group 的概念,因此爲了實現改併發恢復的功能,須要對 operator state 中的每個序列化後的元素存儲一個位置偏移 offset,也就是構成了上圖紅框中的 offset 數組。
那麼若是你的 operator state 中的 list 長度達到必定規模時,這個 offset 數組就可能會有幾十 MB 的規模,關鍵這個數組是會返回給 job master,當 operator 的併發數目很大時,很容易觸發 job master 的內存超用問題。咱們遇到過用戶把 operator state 當作黑名單存儲,結果這個黑名單規模很大,致使一旦開始執行 checkpoint,job master 就會由於收到 task 發來的「巨大」的 offset 數組,而內存不斷增加直到超用沒法正常響應。
■ 正確使用 UnionListState
union list state 目前被普遍使用在 kafka connector 中,不過可能用戶平常開發中較少遇到,他的語義是從檢查點恢復以後每一個併發 task 內拿到的是原先全部operator 上的 state,以下圖所示:
kafka connector 使用該功能,爲的是從檢查點恢復時,能夠拿到以前的全局信息,若是用戶須要使用該功能,須要切記恢復的 task 只取其中的一部分進行處理和用於下一次 snapshot,不然有可能隨着做業不斷的重啓而致使 state 規模不斷增加。
■ 如何正確清空當前的 state
state.clear() 實際上只能清理當前 key 對應的 value 值,若是想要清空整個 state,須要藉助於 applyToAllKeys 方法,具體代碼片斷以下:
若是你的需求中只是對 state 有過時需求,藉助於 state TTL 功能來清理會是一個性能更好的方案。
■ RocksDB 中考慮 value 值很大的極限場景
受限於 JNI bridge API 的限制,單個 value 只支持 2^31 bytes 大小,若是存在很極限的狀況,能夠考慮使用 MapState 來替代 ListState 或者 ValueState,由於RocksDB 的 map state 並非將整個 map 做爲 value 進行存儲,而是將 map 中的一個條目做爲鍵值對進行存儲。
■ 如何知道當前 RocksDB 的運行狀況
比較直觀的方式是打開 RocksDB 的 native metrics ,在默認使用 Flink managed memory 方式的狀況下,state.backend.rocksdb.metrics.block-cache-usage ,state.backend.rocksdb.metrics.mem-table-flush-pending,state.backend.rocksdb.metrics.num-running-compactions 以及 state.backend.rocksdb.metrics.num-running-flushes 是比較重要的相關 metrics。
下面這張圖是 Flink-1.10 以後,打開相關 metrics 的示例圖:
而下面這張是 Flink-1.10 以前或者關閉 state.backend.rocksdb.memory.managed 的效果:
■ 容器內運行的 RocksDB 的內存超用問題
在 Flink-1.10 以前,因爲一個 state 獨佔若干 write buffer 和一塊 block cache,因此咱們會建議用戶不要在一個 operator 內建立過多的 state,不然須要考慮到相應的額外內存使用量,不然容易形成在容器內運行時,相關進程被容器環境所殺。對於用戶來講,須要考慮一個 slot 內有多少 RocksDB 實例在運行,一個 RocksDB 中有多少 state,總體的計算規則就很複雜,很難真得落地實施。
Flink-1.10 以後,因爲引入了 RocksDB 的內存託管機制,在絕大部分狀況下, RocksDB 的這一部分 native 內存是可控的,不過受限於 RocksDB 的相關 cache 實現限制(這裏暫不展開,後續會有文章討論),在某些場景下,沒法作到完美控制,這時候建議打開上文提到的 native metrics,觀察相關 block cache 內存使用是否存在超用狀況,能夠將相關內存添加到 taskmanager.memory.task.off-heap.size 中,使得 Flink 有更多的空間給 native 內存使用。
■ Checkpoint 間隔不要過短
雖然理論上 Flink 支持很短的 checkpoint 間隔,可是在實際生產中,太短的間隔對於底層分佈式文件系統而言,會帶來很大的壓力。另外一方面,因爲檢查點的語義,因此實際上 Flink 做業處理 record 與執行 checkpoint 存在互斥鎖,過於頻繁的 checkpoint,可能會影響總體的性能。固然,這個建議的出發點是底層分佈式文件系統的壓力考慮。
■ 合理設置超時時間
默認的超時時間是 10min,若是 state 規模大,則須要合理配置。最壞狀況是分佈式地建立速度大於單點(job master 端)的刪除速度,致使總體存儲集羣可用空間壓力較大。建議當檢查點頻繁由於超時而失敗時,增大超時時間。