簡介: 在 Flink 現有的架構設計中,多流 Join 拓撲下單個 Task 失敗會致使全部 Task 從新部署,耗時可能會持續幾分鐘,致使做業的輸出斷流,這對於線上業務來講是不可接受的。針對這一痛點,字節提出單點恢復的方案。緩存
在字節跳動的實時計算場景中,咱們有不少任務(數量 2k+)會直接服務於線上,其輸出時延和穩定性會直接影響線上產品的用戶體驗,這類任務一般具備以下特色:網絡
在 Flink 現有的架構設計中,多流 Join 拓撲下單個 Task 失敗會致使全部 Task 從新部署,耗時可能會持續幾分鐘,致使做業的輸出斷流,這對於線上業務來講是不可接受的。多線程
針對這一痛點,咱們提出單點恢復的方案,經過對 network 層的加強,使得在機器下線或者 Task 失敗的狀況下,以短期內故障 Task 的部分數據丟失爲代價,達成如下目標:架構
當初遇到這些問題的時候,咱們提出的想法是說能不能在機器故障下線的時候,只讓在這臺機器上的 Tasks 進行 Failover,而這些 Tasks 的上下游 Tasks 能剛好感知到這些失敗的 Tasks,並做出對應的措施:併發
根據這些想法咱們思考得出幾個比較關鍵點在於:ide
基於以上考慮咱們決定基於已有的 Network 層線程模型,修改上下游對於 Task Failed 後的處理邏輯,讓非故障的 Tasks 短期內完成對失敗 Task 的感知操做,從而使得做業持續穩定地輸出。性能
注:咱們的實現基於 Flink-1.9,1.11 後的網絡模型加入了 Unaligned Checkpoint 的特性,可能會有所變化。測試
咱們先將 Flink 的上下游 Task 通訊模型簡單抽象一下:優化
上下游 Task 感知彼此狀態的邏輯,分三種狀況考慮:spa
能夠看到,在大部分狀況下,Task 是能夠直接感知到上下游 Task 的狀態變化。瞭解了基礎的通訊模型以後,咱們能夠按照以前的解決思路繼續深刻一下,分別在上游發送端和下游接收端能夠作什麼樣改進來實現單點恢復。
根據咱們的解決思路,咱們來繪製一下單個 Task 掛了以後,整個 Job 的通訊流程:
Map(1) 失敗以後:
從這個流程,咱們能夠將優化分爲三個模塊,分別爲上游發送端、下游接收端和 JobManager。
咱們再細化一下上游發送端的相關細節,
(1) Netty Server 收到 Client 發送的 Partition Request 後,在對應的 Subpartition 註冊讀取數據的 SubpartitionView 和 Reader。
(2) RecordWriter 發送數據到不一樣的 Subpartitions,每一個 Subpartition 內部維護一個 buffer 隊列,並將讀取數據的 Reader 放入到 Readers Queue 中。(Task 線程)
(3) Netty 線程讀取 Readers Queue,取出對應的 Reader 並讀取對應 Subpartition 中的 buffer 數據,發送給下游。(Netty 線程)
咱們的指望是上游發送端在感知到下游 Task 失敗以後,直接將發送到對應 Task 的數據丟棄。那麼咱們的改動邏輯,在這個示意圖中,就是 Subpartition 經過 Netty Server 收到下游 Task Fail 的消息後,將本身設置爲 Unavailable,而後 RecordWriter 在發送數據到指定 Subpartition 時,判斷是否可用,若是不可用則直接將數據丟棄。而當 Task Failover 完成後從新與上游創建鏈接後,再將該 Subpartition 置爲 Available,則數據能夠從新被消費。
發送端的改動比較簡單,得益於 Flink 內部對 Subpartition 的邏輯作了很好的抽象,而且能夠很容易的經過參數來切換 Subpartition 初始化的類型,咱們在這裏參考 PipelinedSubpartition 的實現,根據上述的邏輯,實現了咱們本身的 Subpartition 和對應的 View。
一樣,咱們來細化一下下游接收端的細節:
仔細來看,其實和上游的線程模型很有相似之處:
(1) InputGate 初始化全部的 Channel 並經過 Netty Client 和上游 Server 創建鏈接。
(2) InputChannel 接收到數據後,緩存到 buffer 隊列中並將本身的引用放入到 Channels Queue 裏。(Netty 線程)
(3) InputGate 經過 InputProcessor 的調用,從 Queue 里拉取 Channel 並讀取 Channel 中緩存的 buffer 數據,若是 buffer 不完整(好比只有半條 record),那麼則會將不完整的 buffer 暫存到 InputProcessor 中。(Task 線程)
這裏咱們指望下游接收端感知到上游 Task 失敗以後,能將對應 InputChannel 的接收到的不完整的 buffer 直接清除。不完整的 buffer 存儲在 InputProcessor 中,那麼咱們如何讓 InputProcessor 知道哪一個 Channel 出現了問題?
簡單的方案是說,咱們在 InputChannel 中直接調用 InputGate 或者 InputProcessor,作 buffer 清空的操做,可是這樣引入一個問題,因爲 InputChannel 收到 Error 是在 Netty 線程,而 InputProcessor 的操做是在 Task 線程,這樣跨線程的調用打破了已有的線程模型,必然會引入鎖和調用時間的不肯定性,增長架構設計的複雜度,而且由於 InputProcessor 會對每一條 record 都有調用,稍有不慎就會帶來性能的降低。
咱們沿用已有的線程模型,Client 感知到上游 Task 失敗的消息以後告知對應的 Channel,Channel 向本身維護的 receivedBuffers 的末尾插入一個 UnavailableEvent,並等待 InputProcessor 拉取並清空對應 Channel 的 buffer 數據。示意圖以下所示,紅色的模塊是咱們新增的部分:
JobManager 重啓策略能夠參考社區已有的 RestartIndividualStrategy,比較重要的區別是,在從新 deploy 這個失敗的 Task 後,咱們須要經過 ExecutionGraph 中的拓撲信息,找到該 Task 的下游 Tasks,並經過 Rpc 調用讓下游 Tasks 和這個新的上游 Tasks 從新創建鏈接。
這裏實現有一個難點是若是 JobManager 去 update 下游的 Channel 信息時,舊的 Channel 對應的 buffer 數據尚未被清除怎麼辦?咱們這裏經過新增 CachedChannelProvider 來處理這一邏輯:
如圖所示,以 Channel - 1 爲例,若是 JobManager 更新 Channel 的 Rpc 請求到來時 Channel 處於不可用狀態,那麼咱們直接利用 Rpc 請求中攜帶的 Channel 信息來從新初始化 Channel。以 Channel - 3 爲例,若是 Rpc 請求到來時 Channel 仍然可用,那麼咱們將 Channel 信息暫時緩存起來,等 Channel - 3 中全部數據消費完畢後,通知 CachedChannelProvider,而後再經過 CachedChannelProvider 去更新 Channel。
這裏還須要特別提到一點,在字節跳動內部咱們實現了預留 TaskManager 的功能,當 Task 出現 Failover 時,可以直接使用 TaskManager 的資源,大大節約了 Failover 過程數據丟失的損耗。
整個解決的思路實際上是比較清晰的,相信你們也比較容易理解,可是在實現中仍然有不少須要注意的地方,舉例以下:
目前在字節跳動內部,單點恢復功能已經上線了 1000+ 做業,在機器下線、網絡抖動的狀況下,下游在上游做業作 Failover 的過程幾乎沒有感知。
接下來咱們如下面這個做業拓撲爲例,在做業正常運行時咱們手動 Kill 一個 Container,來看看不一樣並行度做業開啓單點恢復的效果:
咱們在 1000 和 4000 並行度的做業上進行測試,每一個 slot 中有 2 個 Source 和 1 個 Joiner 共 3 個 Task,手動 Kill 一個 Container 後,從故障恢復時間和斷流影響兩個維度進行收益計算:
結論:能夠看到,在 4000 個 Slot 的做業裏,若是不開啓單點恢復,做業總體的 Failover 時間爲 81s,同時對於下游服務來講,上游服務斷流 81s,這在實時服務線上的場景中明顯是不可接受的。而開啓了單點恢復和預留資源後,Kill 1 個 Container 只會影響 4 個 Slot,且 Failover 的時間只有 5s,同時對於下游服務來講,上游服務產生的數據減小 4/4000=千分之一,持續 5s,效果是很是顯而易見的。