阿里巴巴大規模應用Flink的踩坑經驗:如何大幅下降 HDFS 壓力?

衆所周知 Flink 是當前普遍使用的計算引擎,Flink 使用 checkpoint 機制進行容錯處理[1],Flink 的 checkpoint 會將狀態快照備份到分佈式存儲系統,供後續恢復使用。在 Alibaba 內部咱們使用的存儲主要是 HDFS,當同一個集羣的 Job 到達必定數量後,會對 HDFS 形成很是大的壓力,本文將介紹一種大幅度下降 HDFS 壓力的方法 -- 小文件合併。html

做者:邱從賢(山智)node

衆所周知 Flink 是當前普遍使用的計算引擎,Flink 使用 checkpoint 機制進行容錯處理[1],Flink 的 checkpoint 會將狀態快照備份到分佈式存儲系統,供後續恢復使用。在 Alibaba 內部咱們使用的存儲主要是 HDFS,當同一個集羣的 Job 到達必定數量後,會對 HDFS 形成很是大的壓力,本文將介紹一種大幅度下降 HDFS 壓力的方法 -- 小文件合併。apache

背景

無論使用 FsStateBackend、RocksDBStateBackend 仍是 NiagaraStateBackend,Flink 在進行 checkpoint 的時候,TM 會將狀態快照寫到分佈式文件系統中,而後將文件句柄發給 JM,JM 完成全局 checkpoint 快照的存儲,以下圖所示。安全

1.png

對於全量 checkpoint 來講,TM 將每一個 checkpoint 內部的數據都寫到同一個文件,而對於 RocksDBStateBackend/NiagaraStateBackend 的增量 checkpoint [2]來講,則會將每一個 sst 文件寫到一個分佈式系統的文件內。看成業量很大,且做業的併發很大時,則會對底層 HDFS 造成很是大的壓力:1)大量的 RPC 請求會影響 RPC 的響應時間(以下圖所示);2)大量文件對 NameNode 內存形成很大壓力。多線程

qps_1.jpg

qps_2.jpg

在 Flink 中曾經嘗試使用 ByteStreamStateHandle 來解決小文件多的問題[3],將小於必定閾值的 state 直接發送到 JM,由 JM 統一寫到分佈式文件中,從而避免在 TM 端生成小文件。可是這個方案有必定的侷限性,閾值設置過小,還會有不少小文件生成,閾值設置太大,則會致使 JM 內存消耗太多有 OOM 的風險。併發

1 小文件合併方案

針對上面的問題咱們提出一種解決方案 -- 小文件合併。
在原來的實現中,每一個 sst 文件會打開一個
CheckpointOutputStream,每一個 CheckpointOutputStream 對應一個 FSDataOutputStream,將本地文件寫往一個分佈式文件,而後關閉 FSDataOutputStream,生成一個 StateHandle。以下圖所示:異步

2.png

小文件合併則會重用打開的 FSDataOutputStream,直至文件大小達到預設的閾值爲止,換句話說多個 sst 文件會重用同一個 DFS 上的文件,每一個 sst 文件佔用 DFS 文件中的一部分,最終多個 StateHandle 共用一個物理文件,以下圖所示。分佈式

3.png

在接下來的章節中咱們會描述實現的細節,其中須要重點考慮的地方包括:ide

  1. 併發 checkpoint 的支持
    Flink 天生支持併發 checkpoint,小文件合併方案則會將多個文件寫往同一個分佈式存儲文件中,若是考慮不當,數據會寫串或者損壞,所以咱們須要有一種機制保證該方案的正確性,詳細描述參考 2.1 節
  2. 防止誤刪文件
    咱們使用引用計數來記錄文件的使用狀況,僅經過文件引用計數是否降爲 0 進行判斷刪除,則可能誤刪文件,如何保證文件不會被錯誤刪除,咱們將會在 2.2 節進行闡述
  3. 下降空間放大
    使用小文件合併以後,只要文件中還有一個 statehandle 被使用,整個分佈式文件就不能被刪除,所以會佔用更多的空間,咱們在 2.3 節描述瞭解決該問題的詳細方案
  4. 異常處理
    咱們將在 2.4 節闡述如何處理異常狀況,包括 JM 異常和 TM 異常的狀況
  5. 2.5 節中會詳細描述在 Checkpoint 被取消或者失敗後,如何取消 TM 端的 Snapshot,若是不取消 TM 端的 Snapshot,則會致使 TM 端實際運行的 Snapshot 比正常的多

在第 3 節中闡述了小文件合併方案與現有方案的兼容性;第 4 節則會描述小文件合併方案的優點和不足;最後在第 5 節咱們展現在生產環境下取得的效果。工具

2 設計實現

本節中咱們會詳細描述整個小文件合併的細節,以及其中的設計要點。
這裏咱們大體回憶一下 TM 端 Snapshot 的過程

  1. TM 端 barrier 對齊
  2. TM Snapshot 同步操做
  3. TM Snapshot 異步操做

其中上傳 sst 文件到分佈式存儲系統在上面的第三步,同一個 checkpoint 內的文件順序上傳,多個 checkpoint 的文件上傳可能同時進行。

2.1 併發 checkpoint 支持

Flink 天生支持併發 checkpoint,所以小文件合併方案也須要可以支持併發 checkpoint,若是不一樣 checkpoint 的 sst 文件同時寫往一個分佈式文件,則會致使文件內容損壞,後續沒法從該文件進行 restore。

在 FLINK-11937[4] 的提案中,咱們會將每一個 checkpoint 的 state 文件寫到同一個 HDFS 文件,不一樣 checkpoint 的 state 寫到不一樣的 HDFS 文件 -- 換句話說,HDFS 文件不跨 Checkpoint 共用,從而避免了多個客戶端同時寫入同一個文件的狀況。

後續咱們會繼續推動跨 Checkpoint 共用文件的方案,固然在跨 Checkpoint 共用文件的方案中,並行的 Checkpoint 也會寫往不一樣的 HDFS 文件。

2.2 防止誤刪文件

複用底層文件以後,咱們使用引用計數追蹤文件的使用狀況,在文件引用數降爲 0 的狀況下刪除文件。可是在某些狀況下,文件引用數爲 0 的時候,並不表明文件不會被繼續使用,可能致使文件誤刪。下面咱們會詳>細描述開啓併發 checkpoint 後可能致使文件誤刪的狀況,以及解決方案。

咱們如下圖爲例,maxConcurrentlyCheckpoint = 2

concurrent.png

上圖中共有 3 個 checkpoint,其中 chk-1 已經完成,chk-2 和 chk-3 都基於 chk-1 進行,chk-2 在 chk-3 前完成,chk-3 在註冊 4.sst 的時候發現,發現 4.sst 在 chk-2 中已經註冊過,會重用 chk-2 中 4.sst 對應的 stateHandle,而後取消 chk-3 中的 4.sst 的註冊,而且刪除 stateHandle,在處理完 chk-3 中 4.sst 以後,該 stateHandle 對應的分佈式文件的引用計數爲 0,若是咱們這個時候刪除分佈式文件,則會同時刪除 5.sst 對應的內容,致使後續沒法從 chk-3 恢復。

這裏的問題是如何在 stateHandle 對應的分佈式文件引用計數降爲 0 的時候正確判斷是否還會繼續引用該文件,所以在整個 checkpoint 完成處理以後再判斷某個分佈式文件可否刪除,若是真個 checkpoint 完成發現文件沒有被引用,則能夠安全刪除,不然不進行刪除。

2.3 下降空間放大

使用小文件合併方案後,每一個 sst 文件對應分佈式文件中的一個 segment,以下圖所示

segment.png

文件僅能在全部 segment 都再也不使用時進行刪除,上圖中有 4 個 segment,僅 segment-4 被使用,可是整個文件都不能刪除,其中 segment[1-3] 的空間被浪費掉了,從實際生產環境中的數據可知,總體的空間放大率(實際佔用的空間 / 真實有用的空間)在 1.3 - 1.6 之間。

eb2e01446559af416a93e7e0664a6526.jpg

爲了解決空間放大的問題,在 TM 端起異步線程對放大率超過閾值的文件進行壓縮。並且僅對已經關閉的文件進行壓縮。

整個壓縮的流程以下所示:

  1. 計算每一個文件的放大率
  2. 若是放大率較小則直接跳到步驟 7
  3. 若是文件 A 的放大率超過閾值,則生成一個對應的新文件 A‘(若是這個過程當中建立文件失敗,則由 TM 負責清理工做)
  4. 記錄 A 與 A’ 的映射關係
  5. 在下一次 checkpoint X 往 JM 發送落在文件 A 中的 StateHandle 時,則使用 A` 中的信息生成一個新的 StateHandle 發送給 JM
  6. checkpoint X 完成後,咱們增長 A‘ 的引用計數,減小 A 的引用計數,在引用計數降爲 0 後將文件 A 刪除(若是 JM 增長了 A’ 的引用,而後出現異常,則會從上次成功的 checkpoint 從新構建整個引用計數器)
  7. 文件壓縮完成

2.4 異常狀況處理

在 checkpoint 的過程當中,主要有兩種異常:JM 異常和 TM 異常,咱們將分狀況闡述。

2.4.1 JM 異常

JM 端主要記錄 StateHandle 以及文件的引用計數,引用計數相關數據不須要持久化到外存中,所以不須要特殊的處理,也不須要考慮 transaction 等相關操做,若是 JM 發送 failover,則能夠直接從最近一次 complete checkpoint 恢復,並重建引用計數便可。

2.4.2 TM 異常

TM 異常能夠分爲兩種:1)該文件在以前 checkpoint 中已經彙報過給 JM;2)文件還沒有彙報過給 JM,咱們會分狀況闡述。

  1. 文件已經彙報過給 JM

文件彙報過給 JM,所以在 JM 端有文件的引用計數,文件的刪除由 JM 控制,當文件的引用計數變爲 0 以後,JM 將刪除該文件。

  1. 文件還沒有彙報給 JM

該文件暫時還沒有彙報過給 JM,該文件再也不被使用,也不會被 JM 感知,成爲孤兒文件。這種狀況暫時有外圍工具統一進行清理。

2.5 取消 TM 端 snapshot

像前面章節所說,咱們須要在 checkpoint 超時/失敗時,取消 TM 端的 snapshot,而 Flink 則沒有相應的通知機制,如今 FLINK-8871[5] 在追蹤相應的優化,咱們在內部增長了相關實現,當 checkpoint 失敗時會發送 RPC 數據給 TM,TM 端接受到相應的 RPC 消息後,會取消相應的 snapshot。

3 兼容性

小文件合併功能支持從以前的版本無縫遷移過來。從以前的 checkpoint restore 的的步驟以下:

  1. 每一個 TM 分到本身須要 restore 的 state handle
  2. TM 從遠程下載 state handle 對應的數據
  3. 從本地進行恢復

小文件合併主要影響的是第 2 步,從遠程下載對應數據的時候對不一樣的 StateHandle 進行適配,所以不影響總體的兼容性。

4 優點和不足

  • 優點:大幅度下降 HDFS 的壓力:包括 RPC 壓力以及 NameNode 內存的壓力
  • 不足:不支持 State 多線程上傳的功能(State 上傳暫時不是 checkpoint 的瓶頸)

5 線上環境的結果

在該方案上線後,對 Namenode 的壓力大幅下降,下面的截圖來自線上生產集羣,從數據來看,文件建立和關閉的 RPC 有明顯降低,RPC 的響應時間也有大幅度下降,確保順利度過雙十一。

rpc_all.png
rpc_close.png
rpc_create.png
rpc_holdlock.png

參考文獻

[1] https://ci.apache.org/project...

[2] https://flink.apache.org/feat...

[3] https://www.slideshare.net/da...

[4] https://issues.apache.org/jir...

[5] https://issues.apache.org/jir...

相關文章
相關標籤/搜索