衆所周知 Flink 是當前普遍使用的計算引擎,Flink 使用 checkpoint 機制進行容錯處理[1],Flink 的 checkpoint 會將狀態快照備份到分佈式存儲系統,供後續恢復使用。在 Alibaba 內部咱們使用的存儲主要是 HDFS,當同一個集羣的 Job 到達必定數量後,會對 HDFS 形成很是大的壓力,本文將介紹一種大幅度下降 HDFS 壓力的方法 -- 小文件合併。html
無論使用 FsStateBackend、RocksDBStateBackend 仍是 NiagaraStateBackend,Flink 在進行 checkpoint 的時候,TM 會將狀態快照寫到分佈式文件系統中,而後將文件句柄發給 JM,JM 完成全局 checkpoint 快照的存儲,以下圖所示。node
對於全量 checkpoint 來講,TM 將每一個 checkpoint 內部的數據都寫到同一個文件,而對於 RocksDBStateBackend/NiagaraStateBackend 的增量 checkpoint [2]來講,則會將每一個 sst 文件寫到一個分佈式系統的文件內。看成業量很大,且做業的併發很大時,則會對底層 HDFS 造成很是大的壓力:1)大量的 RPC 請求會影響 RPC 的響應時間(以下圖所示);2)大量文件對 NameNode 內存形成很大壓力。apache
在 Flink 中曾經嘗試使用 ByteStreamStateHandle 來解決小文件多的問題[3],將小於必定閾值的 state 直接發送到 JM,由 JM 統一寫到分佈式文件中,從而避免在 TM 端生成小文件。可是這個方案有必定的侷限性,閾值設置過小,還會有不少小文件生成,閾值設置太大,則會致使 JM 內存消耗太多有 OOM 的風險。安全
針對上面的問題咱們提出一種解決方案 -- 小文件合併。
在原來的實現中,每一個 sst 文件會打開一個
CheckpointOutputStream,每一個 CheckpointOutputStream 對應一個 FSDataOutputStream,將本地文件寫往一個分佈式文件,而後關閉 FSDataOutputStream,生成一個 StateHandle。以下圖所示:多線程
小文件合併則會重用打開的 FSDataOutputStream,直至文件大小達到預設的閾值爲止,換句話說多個 sst 文件會重用同一個 DFS 上的文件,每一個 sst 文件佔用 DFS 文件中的一部分,最終多個 StateHandle 共用一個物理文件,以下圖所示。併發
在接下來的章節中咱們會描述實現的細節,其中須要重點考慮的地方包括:異步
在第 3 節中闡述了小文件合併方案與現有方案的兼容性;第 4 節則會描述小文件合併方案的優點和不足;最後在第 5 節咱們展現在生產環境下取得的效果。分佈式
本節中咱們會詳細描述整個小文件合併的細節,以及其中的設計要點。
這裏咱們大體回憶一下 TM 端 Snapshot 的過程ide
其中上傳 sst 文件到分佈式存儲系統在上面的第三步,同一個 checkpoint 內的文件順序上傳,多個 checkpoint 的文件上傳可能同時進行。工具
Flink 天生支持併發 checkpoint,所以小文件合併方案也須要可以支持併發 checkpoint,若是不一樣 checkpoint 的 sst 文件同時寫往一個分佈式文件,則會致使文件內容損壞,後續沒法從該文件進行 restore。
在 FLINK-11937[4] 的提案中,咱們會將每一個 checkpoint 的 state 文件寫到同一個 HDFS 文件,不一樣 checkpoint 的 state 寫到不一樣的 HDFS 文件 -- 換句話說,HDFS 文件不跨 Checkpoint 共用,從而避免了多個客戶端同時寫入同一個文件的狀況。
後續咱們會繼續推動跨 Checkpoint 共用文件的方案,固然在跨 Checkpoint 共用文件的方案中,並行的 Checkpoint 也會寫往不一樣的 HDFS 文件。
複用底層文件以後,咱們使用引用計數追蹤文件的使用狀況,在文件引用數降爲 0 的狀況下刪除文件。可是在某些狀況下,文件引用數爲 0 的時候,並不表明文件不會被繼續使用,可能致使文件誤刪。下面咱們會詳>細描述開啓併發 checkpoint 後可能致使文件誤刪的狀況,以及解決方案。
咱們如下圖爲例,maxConcurrentlyCheckpoint = 2
上圖中共有 3 個 checkpoint,其中 chk-1 已經完成,chk-2 和 chk-3 都基於 chk-1 進行,chk-2 在 chk-3 前完成,chk-3 在註冊 4.sst
的時候發現,發現 4.sst
在 chk-2 中已經註冊過,會重用 chk-2 中 4.sst
對應的 stateHandle,而後取消 chk-3 中的 4.sst
的註冊,而且刪除 stateHandle,在處理完 chk-3 中 4.sst
以後,該 stateHandle 對應的分佈式文件的引用計數爲 0,若是咱們這個時候刪除分佈式文件,則會同時刪除 5.sst
對應的內容,致使後續沒法從 chk-3 恢復。
這裏的問題是如何在 stateHandle
對應的分佈式文件引用計數降爲 0 的時候正確判斷是否還會繼續引用該文件,所以在整個 checkpoint 完成處理以後再判斷某個分佈式文件可否刪除,若是真個 checkpoint 完成發現文件沒有被引用,則能夠安全刪除,不然不進行刪除。
使用小文件合併方案後,每一個 sst 文件對應分佈式文件中的一個 segment,以下圖所示
文件僅能在全部 segment 都再也不使用時進行刪除,上圖中有 4 個 segment,僅 segment-4 被使用,可是整個文件都不能刪除,其中 segment[1-3] 的空間被浪費掉了,從實際生產環境中的數據可知,總體的空間放大率(實際佔用的空間 / 真實有用的空間)在 1.3 - 1.6 之間。
爲了解決空間放大的問題,在 TM 端起異步線程對放大率超過閾值的文件進行壓縮。並且僅對已經關閉的文件進行壓縮。
整個壓縮的流程以下所示:
在 checkpoint 的過程當中,主要有兩種異常:JM 異常和 TM 異常,咱們將分狀況闡述。
JM 端主要記錄 StateHandle 以及文件的引用計數,引用計數相關數據不須要持久化到外存中,所以不須要特殊的處理,也不須要考慮 transaction 等相關操做,若是 JM 發送 failover,則能夠直接從最近一次 complete checkpoint 恢復,並重建引用計數便可。
TM 異常能夠分爲兩種:1)該文件在以前 checkpoint 中已經彙報過給 JM;2)文件還沒有彙報過給 JM,咱們會分狀況闡述。
像前面章節所說,咱們須要在 checkpoint 超時/失敗時,取消 TM 端的 snapshot,而 Flink 則沒有相應的通知機制,如今 FLINK-8871[5] 在追蹤相應的優化,咱們在內部增長了相關實現,當 checkpoint 失敗時會發送 RPC 數據給 TM,TM 端接受到相應的 RPC 消息後,會取消相應的 snapshot。
小文件合併功能支持從以前的版本無縫遷移過來。從以前的 checkpoint restore 的的步驟以下:
小文件合併主要影響的是第 2 步,從遠程下載對應數據的時候對不一樣的 StateHandle 進行適配,所以不影響總體的兼容性。
在該方案上線後,對 Namenode 的壓力大幅下降,下面的截圖來自線上生產集羣,從數據來看,文件建立和關閉的 RPC 有明顯降低,RPC 的響應時間也有大幅度下降,確保順利度過雙十一。
[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html
[2] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
[3] https://www.slideshare.net/dataArtisans/stephan-ewen-experiences-running-flink-at-very-large-scale
[4] https://issues.apache.org/jira/browse/FLINK-11937
[5] https://issues.apache.org/jira/browse/FLINK-8871
本文做者:邱從賢(山智)
本文爲阿里雲內容,未經容許不得轉載。