簡介: 本文介紹字節跳動在過去一段時間裏作的兩個主要的 Feature,一是在 Network 層的單點恢復的功能,二是 Checkpoint 層的 Regional Checkpoint。性能優化
做者|廖嘉逸網絡
摘要:本文介紹字節跳動在過去一段時間裏作的兩個主要的 Feature,一是在 Network 層的單點恢復的功能,二是 Checkpoint 層的 Regional Checkpoint。內容包括:數據結構
單點恢復機制Regional Checkpoint在 Checkpoint 的其它優化挑戰 & 將來規劃併發
做者分享原版視頻回顧:https://www.bilibili.com/video/BV13a4y1H7XY?p=2ide
1、單點恢復機制
在字節跳動的實時推薦場景中,咱們使用 Flink 將用戶特徵與用戶行爲進行實時拼接,拼接樣本做爲實時模型的輸入。拼接服務的時延和穩定性直接影響了線上產品對用戶的推薦效果,而這種拼接服務在 Flink 中是一個相似雙流 Join 的實現,Job 中的任何一個 Task 或節點出現故障,都會致使整個 Job 發生 Failover,影響對應業務的實時推薦效果。高併發
在介紹單點恢復以前,咱們回顧一下 Flink 的 Failover 策略。性能
- Individual-Failover:
只重啓出錯的 Task,適用於 Task 間無鏈接的狀況,應用場景有限。測試
- Region-Failover:
該策略會將做業中的全部 Task 劃分爲數個 Region。當有 Task 發生故障時,它會嘗試找出進行故障恢復須要重啓的最小 Region 集合。相比於全局重啓故障恢復策略,這種策略在一些場景下的故障恢復須要重啓的 Task 會更少。優化
若是使用 Region-Failover 策略,但由於 Job 是一個全鏈接的拓撲,自己就是一個大 region。重啓 region 至關於重啓整個 Job,因此咱們考慮是否能夠用 Flink Individual-task-failover 策略去替代 Region-failover 策略?而 Individual-task-failover 的策略在這種拓撲下是徹底不適用的。因此咱們對於如下特徵的場景,須要設計開發一個新的 Failover 策略:阿里雲
- 多流 Join 拓撲
- 流量大(30M QPS)、高併發度(16K*16K)
- 容許短期內小量部分數據丟失
- 對數據持續輸出型要求高
在講述技術方案以前,看一下 Flink 現有的數據傳輸機制。
從左往右看(SubTaskA):
- 當數據流入時會先被 RecordWriter 接收
- RecordWriter 根據數據的信息,例如 key,將數據進行 shuffle 選擇對應的 channel
- 將數據裝載到 buffer 中,並放到 channel 對應的 buffer 隊列裏
- 經過 Netty Server 向下遊發送
- 下游 Netty Client 接收數據
- 根據 buffer 中的分區信息,轉發發到下游對應的 channel 中
- 由 InputProcessor 將數據從 buffer 中取出,執行 operator 邏輯
根據上面提出的思路咱們要解決如下幾個問題:
- 如何讓上游 Task 感知下游 Failure
- 下游 Task 失敗後,如何讓上游 Task 向正常的 Task 發送數據
- 上游 Task 失敗後,如何讓下游 Task 繼續消費 buffer 中的數據
- 上下游中不完整的數據如何處理
- 如何創建新的鏈接
針對上述問題提出解決方案。
■ 如何讓上游 Task 感知下游 Failure
下游 SubTask 主動將失敗信息傳遞給上游,或者 TM 被關閉上游 Netty Server 也能夠感知到。圖中用 X 表示不可用的 SubPartition。
首先將 SubPartition1 和對應的 view (Netty Server 用來取 SubPartition 數據的一個結構)置爲不可用。
以後當 Record Writer 接收到新數據須要向 SubPartition1 發送數據,此時須要進行一個可用性判斷,當 SubPartition 狀態可用則正常發送,不可用直接丟棄數據。
■ 上游 Task 接收到下游 Task 新的鏈接
下游 subTask 被從新調度啓動後,向上遊發送 Partition Request,上游 Netty Server 收到 Partition Request 後從新給下游 SubTask 建立對用的 View, 此時上游 Record Writer 就能夠正常寫數據。
■ 下游 Task 感知上游 Task 失敗
一樣的下游 Netty Client 能感知到上游有 subTask 失敗了,這時找出對應的 channel ,在末尾插入一個不可用的事件(這裏用感嘆號來表示事件)。咱們的目的是想要儘量的少丟數據,此時 channel 中的 buffer 任能夠被 InputProcessor 正常消費,直到讀取到「不可用事件」。再進行 channel 不可用標記和對應的 buffer 隊列清理。
■ Buffer 中有不完整的數據
首先要知道不完整的數據存放在哪裏,它存在於 input process 的內部,input process 會給每個 channel 維護一個小的 buffer 隊列。當收到一個 buffer ,它是不完整的數據,那麼等到接收到下一個 buffer 後再拼接成一條完整的數據發往 operator。
■ 下游 Task 和上游 Task 從新鏈接
當上遊有問題的 Task 被從新調度後,經過調用 TaskManager API 來通知下游。下游 Shuffle Environment 收到通知後判斷對應的 channel 狀態,若是是不可,用直接生成新的 channel 並釋放掉老的。若是是可用狀態,說明 channel 的 buffer 沒有消費完,須要等待 buffer 消費完再進行替換操做。
業務收益
上圖是以 4000 並行度的做業爲例作了對比測試。業務是將一個用戶展示流和一個用戶行爲流的進行 Join,整個做業共有 12000個 Task。
上圖中 單點恢復(預留資源)是使用調度組作的一個 feature,在申請資源的時,選擇額外多申請一些資源,當發生 failover 時省去了從 YARN 去申請資源的時間開銷。
最後作到了做業的輸出減小千分之一,恢復時間約 5 秒。由於整個恢復過程時間較短,能夠基本作到下游無感知。
2、Regional Checkpoint
在一個比較經典的數據集成場景,數據導入導出。好比從 Kafka 導入到 Hive,知足下面幾個特徵。
- 拓撲中沒有 All-to-All 的鏈接
- 強依賴 Checkpoint 來實現 Exactly-Once 語義下的數據輸出
- Checkpoint 間隔長,對成功率要求高
在這種狀況下,數據沒有任何的 shuffle 。
在數據集成的場景中遇到哪些問題?
- 單個 Task Checkpoint 失敗會影響全局的 Checkpoint 輸出
- 網絡抖動、寫入超時/失敗、存儲環境抖動對做業的影響過於明顯
- 2000並行以上的做業成功率明顯降低,低於業務預期
在這裏,咱們想到做業會根據 region-failover 策略將做業的拓撲劃分爲多個 region。那麼 Checkpoint 是否能夠採起相似的思路,將 checkpoint 以 region 的單位來管理?答案是確定的。
在這種狀況下不須要等到全部 Task checkpoint 完成後纔去作分區歸檔操做(例如 HDFS 文件 rename)。而是當某個 region 完成後便可進行 region 級別的 checkpoint 歸檔操做。
介紹方案以前先簡單回顧 Flink 現有的 checkpoint 機制。相信你們都比較熟悉。
現有 ckp
上圖中是一個 Kafka source 和 Hive sink 算子的拓撲,並行度爲 4 的例子。
首先 checkpoint coordinator 觸發 triggerCheckpoint 的操做,發送到各個 source task。在 Task 收到請求以後,觸發 Task 內的 operator 進行 snapshot 操做。例子中有 8 個 operator 狀態。
現有 ckp1
在各 operator 完成 snapshot 後,Task 發送 ACK 消息給 checkpoint coordinator 表示當前 Task 已經完成了 Checkpoint。
以後當 coordinator 收到全部 Task 成功的 ACK 消息,那麼 checkpont 能夠認爲是成功了。最後觸發 finalize 操做,保存對應的 metadata。通知全部 Task checkpoint 完成。
當咱們使用 Region 方式去管理 checkpoint 時會遇到什麼問題?
- 如何劃分 Checkpoint Region
把彼此沒有鏈接的 Task 集合,劃分爲 1 個 region。顯而易見例子中有四個 Region。
- 失敗 Region 的 Checkpoint 結果如何處理
假設第一次 checkpoint 能正常完成,每一個 operator 對應的狀態都成功寫入 HDFS checkpoint1 目錄中,並經過邏輯映射,將 8 個operator 映射到 4 個 checkpoint region。注意僅僅是邏輯映射,並無對物理文件作出任何移動和修改。
現有 ckp1
第二次進行 checkpoint 時 region-4-data(Kafka-4,Hive-4)checkpoint 失敗。checkpoint2 (job/chk_2)目錄中沒有對應 Kafka-4-state 和 Hive-4-state 文件,當前 checkpoint2 是不完整的。爲了保證完整,從上一次或以前成功的 checkpoint 文件中尋找 region-4-data 成功的 state 文件,並進行邏輯映射。這樣當前 checkpoint 每一個 region 狀態文件就完整了,能夠認爲 checkpoint 完成。
此時若是發生大部分或全部 region 都失敗,若是都引用前一次 checkpoint 那麼當前這個 checkpoint 和上一個 checkpoint 相同也就沒有意義了。
經過配置 region 最大失敗比例, 好比 50%,例子中 4 個 region ,最多能接受兩個 region 失敗。
- 如何避免在文件系統上存儲過多的 Checkpoint 歷史數據
若是有某個 region 一直失敗(遇到髒數據或代碼邏輯問題),當前的機制會致使把全部歷史 checkpoint 文件都保留下來,顯然這是不合理的。
經過配置支持 region 最大連續失敗次數。例如2表示 region 最多能引用前兩次的 checkpoint 成功的 region 結果。
工程實現難點
- 如何處理 Task Fail 和 checkpoint timeout
- 同一 region 內已經 snapshot 成功的 subTask 狀態如何處理
- 如何保證和 checkpoint Coordinator 的兼容性
來看目前 Flink 是如何作的。
現有 coordinator
當發生 Task failure ,先會通知到 JobMaster FailoverStrategy,經過 FailoverStrategy 來通知 checkpoint coordinator 進行 checkpoint cancel 操做。
那麼 checkpoint timeout 狀況如何處理?當 coordinator 觸發 checkpoint 時,會開啓 checkpoint canceller。canceller 內有一個定時器,當超過預設時間而且 coordinator 還未完成 checkpoint,說明出現timeout,通知 coordinator cancel 本次 checkpoint。
不管是 Task fail 仍是 timeout 最終都會指向 pendding checkpoint,而且當前指向的 checkpoint 就會被丟棄。
在作出相應修改前先梳理 checkpoint 相關的 Message,和 checkpoint coordinator 會作出的反應。
Global checkpoint 爲 Flink 現有機制。
爲了保持和 checkpoint Coordinator 兼容性,添加一個 CheckpointHandle 接口。並添加了兩個實現分別是 GlobalCheckpointHandle 和 RegionalCheckpointHandle 經過過濾消息的方式實現 global checkpoint 和 region checkpoint 相關操做。
region checkpoint 提一點。若是 handler 接收到失敗消息,將這個 region 置爲失敗,並嘗試從以前的 successful checkpoint 進行 region 邏輯映射。一樣 coordinator 發送 nofityComplate 消息也會先通過 handler 的過濾,過濾掉髮送給失敗 Task 的消息。
業務收益
測試在 5000 並行度下,假設單個 Task snapshot 的成功率爲 99.99%。使用 Global checkpoint 的成功率爲 60.65%, 而使用 Region checkpoint 任然能保持 99.99%。
3、Checkpoint 上的其它優化
■ 並行化恢復 operator 狀態
union state 是一種比較特殊的狀態,在恢復時須要找到 job 全部的 Task state 再進行 union 恢復到單個 Task 中。若是 Job 並行度很是大,如 10000, 那麼每一個 task 的 union state 進行恢復時至少須要讀取 10000 個文件。若是串行恢復這 10000 個文件裏的狀態,那麼恢復的耗時可想而知是很是漫長的。
雖然 OperatorState 對應的數據結構是沒法進行並行操做的,可是咱們讀取文件的過程是能夠並行化的,在 OperatorStateBackend 的恢復過程當中,咱們將讀取 HDFS 文件的過程並行化,等到全部狀態文件解析到內存後,再用單線程去處理,這樣咱們能夠將幾十分鐘的狀態恢復時間減小到幾分鐘。
■ 加強 CheckpointScheduler 並支持 Checkpoint 整點觸發
Flink checkpoint 的 interval,timeout 在任務提交以後是沒法修改的。但剛上線時只能根據經驗值進行設置。而每每在做業高峯期時會發現 interval,timeout 等參數設置不合理。這時一般一個方法是修改參數重啓任務,對業務影響比較大,顯然這種方式是不合理的。
在這裏,咱們對 CheckpointCoordinator 內部的 Checkpoint 觸發機制作了重構,將已有的 Checkpoint 觸發流程給抽象出來,使得咱們能夠很快地基於抽象類對 Checkpoint 觸發機制進行定製化。好比在支持數據導入的場景中,爲了更快地造成 Hive 分區,咱們實現了整點觸發的機制,方便下游儘快地看到數據。
還有不少優化點就不一一列舉了。
4、挑戰 & 將來規劃
目前字節內部的做業狀態最大能達到 200TB 左右的水平,而對於這種大流量和大狀態的做業,直接使用 RocksDB StateBackend 是沒法支撐的。因此將來,咱們會以後繼續會在 state 和 checkpoint 性能優化和穩定性上作更多的工做,好比強化已有的 StateBackend、解決傾斜和反壓下 Checkpoint 的速率問題、加強調試能力等。
本文爲阿里雲原創內容,未經容許不得轉載。