做者:沈磊(有贊大數據)緩存
有贊實時任務主要以 Flink 爲主,爲了保證明時任務的容錯恢復以及中止重啓時的狀態恢復,幾乎全部的實時任務都會開啓 Checkpoint 或者觸發 Savepoint 進行狀態保存。因爲 Savepoint 底層原理的實現和 Checkpoint 幾乎一致,本文結合 Flink 1.9 版本,重點講述 Flink Checkpoint 原理流程以及常見緣由分析,讓用戶可以更好的理解 Flink Checkpoint,從而開發出更健壯的實時任務。網絡
Flink Checkpoint 是一種容錯恢復機制。這種機制保證了實時程序運行時,即便忽然遇到異常或者機器問題時也可以進行自我恢復。Flink Checkpoint 對於用戶層面來講,是透明的,用戶會感受實時任務一直在運行。架構
Flink Checkpoint 是 Flink 自身的系統行爲,用戶沒法對其進行交互,用戶能夠在程序啓動以前,設置好實時任務 Checkpoint 相關的參數,當任務啓動以後,剩下的就全交給 Flink 自行管理。併發
實時任務不一樣於批處理任務,除非用戶主動中止,通常會一直運行,運行的過程當中可能存在機器故障、網絡問題、外界存儲問題等等,要想實時任務一直可以穩定運行,實時任務要有自動容錯恢復的功能。而批處理任務在遇到異常狀況時,在從新計算一遍便可。實時任務由於會一直運行的特性,若是在從頭開始計算,成本會很大,尤爲是對於那種運行時間好久的實時任務來講。框架
實時任務開啓 Checkpoint 功能,也可以減小容錯恢復的時間。由於每次都是從最新的 Chekpoint 點位開始狀態恢復,而不是從程序啓動的狀態開始恢復。舉個列子,若是你有一個運行一年的實時任務,若是容錯恢復是從一年前啓動時的狀態恢復,實時任務可能須要運行好久才能恢復到如今狀態,這通常是業務方所不容許的。函數
Flink Checkpoint 會將實時任務的狀態存儲到遠端存儲,好比 HDFS ,亞馬遜的 S3 等等。Flink 任務狀態能夠理解爲實時任務計算過程當中,中間產生的數據結果,同時這些計算結果會在後續實時任務處理時,可以繼續進行使用。實時任務的狀態能夠是一個聚合結果值,好比 WordCount 統計的每一個單詞的數量,也能夠是消息流中的明細數據。大數據
Flink 任務狀態總體能夠劃分兩種:Operator 狀態和 KeyedState。常見的 Operator 狀態,好比 Kafka Topic 每一個分區的偏移量。KeyedState 是基於 KeyedStream 來使用的,因此在使用前,你須要對你的流經過 keyby 來進行分區,常見的狀態好比有 MapState、ListState、ValueState 等等。優化
下面是一個實時計算奇數和偶數的任務的示例:spa
在上圖中,假如輸入的流來自於 Kafka ,那麼 Kafka Topic 分區的偏移量是狀態,全部奇數的和、全部偶數的和也都是狀態。線程
想要使用 Flink Checkpoint 功能,首先是要在實時任務開啓 Checkpoint。Flink 默認狀況下是關閉 Checkpoint 功能,下面代碼是開啓 Checkpoint :
上述代碼中,設置了 Flink Checkpoint 的間隔 3 秒,設置的 Checkpoint 的語義爲 EXACTLY_ONCE。Flink 默認的 Checkpoint 語義爲 EXACTLY_ONCE。上述代碼也使用 RocksDBStateBackend 進行狀態存儲。用戶也能夠本身設置 Flink Checkpoint 的參數,經過 CheckpointConfig 這個類進行設置,代碼以下:
CheckpointConfig chkConfig = env.getCheckpointConfig(); /** 調用 CheckpointConfig 各類 set 方法 */ chkConfig.setX
Flink 總體做業採用主從架構,Master 爲 JobManager,Slave 爲 TaskManager,Client 則是負責提交用戶實時任務的代碼邏輯 ,Flink 總體框架圖以下圖所示:
JobManager 主要負責實時任務的調度以及對 Checkpoint 的觸發,TaskManager 負責真正用戶的代碼執行邏輯,具體表現形式則是 Task 在 TaskManager上面進行運行,一個 Task 對應一個線程,它可能運行一個算子的 SubTask,也多是運行多個 Chain 起來的算子的 SubTask。
Flink 實時任務一次 Checkpoint 的參與者主要包括三塊:JobManager、TaskManager以及 Zookeeper。JobManager 定時會觸發執行 Checkpoint,具體則是在 JobManager 中運行的 CheckpointCoordinator 中觸發全部 Source 的 SubTask 向下遊廣播 CheckpointBarrier。
TaskManager 收到 CheckpointBarrier 後,根據 Checkpoint 的語義,決定是否在進行 CheckpointBarrier 對齊時,緩衝後續的數據記錄,當收到全部上游輸入的 CheckpointBarrier 後,開始作 Checkpoint。TaskManager Checkpoint 完成後,會向 JobManager 發送確認完成的消息。只有當全部 Sink 算子完成 Checkpoint 且發送確認消息後,該次 Checkpoint 纔算完成。
在高可用模式下,ZooKeeper 主要存儲最新一次 Checkpoint 成功的目錄,當Flink 任務容錯恢復時,會從最新成功的 Checkpoint 恢復。Zookeeper 同時也存儲着 Flink 做業的元數據信息。好比在高可用模式下,Flink 會將 JobGraph 以及相關 Jar 包存儲在 HDFS 上面,Zookeeper 記錄着該信息。再次容錯重啓時,讀取這些信息,進行任務啓動。
下圖是一次 Checkpoint 的參與者:
CheckpointCoordinator,是 Checkpoint 中最重要的類,協調着實時任務整個 Checkpoint 的執行。下圖是 CheckpointCoordinator 中的方法:
Flink CheckpointCoordinator 中有幾個比較重要的方法:
Flink CheckpointCoordinator 類是在 ExecutionGraph 造成時進行初始化的,具體則是在 ExecutionGraph 建立以後,調用 enableCheckpointing 方法,而後在該方法中,CheckpointCoordinator 進行建立。如下是 Flink Checkpoint 觸發的時序圖:
當 Flink 做業狀態由建立到運行時,CheckpointCoordinator 中的 ScheduledThreadPoolExecutor 會定時執行 ScheduledTrigger 中的邏輯。ScheduledTrigger 本質就是一個 Runnable,run 方法中執行 triggerCheckpoint 方法。
一次 Flink Checkpoint 的流程是從 CheckpointCoordinator 的 triggerCheckpoint 方法開始,下面來看看一次 Flink Checkpoint 涉及到的主要內容:
Flink 會定時在任務的 Source 算子的 SubTask 觸發 CheckpointBarrier,CheckpointBarrier 是一種特殊的消息事件,會隨着消息通道流入到下游的算子中。只有當最後 Sink 端的算子接收到 CheckpointBarrier 並確認該次 Checkpoint 完成時,該次 Checkpoint 纔算完成。因此在某些算子的 Task 有多個輸入時,會存在 Barrier 對齊時間,咱們能夠在 Flink Web UI上面看到各個 Task 的 CheckpointBarrier 對齊時間。
下圖是一次 Flink Checkpoint 實例流程示意圖:
Flink Checkpoint 保存的任務狀態在程序取消中止時,默認會進行清除。Checkpoint 狀態保留策略主要有兩種:
DELETE_ON_CANCELLATION,RETAIN_ON_CANCELLATION
DELETE_ON_CANCELLATION 表示當程序取消時,刪除 Checkpoint 存儲的狀態文件。RETAIN_ON_CANCELLATION 表示當程序取消時,保存以前的 Checkpoint 存儲的狀態文件 用戶能夠結合業務狀況,設置 Checkpoint 保留模式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /** 開啓 checkpoint */ env.enableCheckpointing(10000); /** 設置 checkpoint 保留策略,取消程序時,保留 checkpoint 狀態文件 */ env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Flink Checkpoint 支持兩種語義:Exactly_Once 和 At_least_Once,默認的 Checkpoint 語義是 Exactly_Once。具體語義含義以下:
Exactly_Once 含義是:保證每條數據對於 Flink 任務的狀態結果隻影響一次。打個比方,好比 WordCount 程序,目前實時統計的 "hello" 這個單詞數爲 5,同時這個結果在此次 Checkpoint 成功後,保存在了 HDFS。在下次 Checkpoint 以前, 又來 2 個 "hello" 單詞,忽然程序遇到外部異常自動容錯恢復,會從最近的 Checkpoint 點開始恢復,那麼會從單詞數爲 5 的這個狀態點開始恢復,Kafka 消費的數據點位也是狀態爲 5 這個點位開始計算,因此即便程序遇到外部異常自動恢復時,也不會影響到 Flink 狀態的結果計算。
At_Least_Once 含義是:每條數據對於 Flink 任務的狀態計算至少影響一次。好比在 WordCount 程序中,你統計到的某個單詞的單詞數可能會比真實的單詞數要大,由於同一條消息,當 Flink 任務容錯恢復後,可能將其計算屢次。
Flink 中 Exactly_Once 和 At_Least_Once 具體是針對 Flink 任務狀態而言的,並非 Flink 程序對消息記錄只處理一次。舉個例子,當前 Flink 任務正在作 Checkpoint,該次 Checkpoint 尚未完成,此次 Checkpoint 時間段的數據其實已經進入 Flink 程序處理,只是程序狀態沒有最終存儲到遠程存儲。當程序忽然遇到異常,進行容錯恢復時,那麼就會從最新的 Checkpoint 進行狀態恢復重啓,上一次 Checkpoint 成功到此次 Checkpoint 失敗的數據還會進入 Flink 系統從新處理,具體實例以下圖:
上圖中表示一個 WordCount 實時任務的 Checkpoint,在進行 chk-5 Checkpoint 時,忽然遇到程序異常,那麼實時任務會從 chk-4 進行恢復,那麼以前 chk-5 處理的數據,Flink 系統會再次進行處理。不過這些數據的狀態沒有 Checkpoint 成功,因此 Flink 任務容錯恢復再次運行時,對於狀態的影響仍是隻有一次。
Exactly_Once 和 At_Least_Once 具體在底層實現大體相同,具體差別表如今 CheckpointBarrier 對齊方式的處理:
若是是 Exactly_Once 模式,某個算子的 Task 有多個輸入通道時,當其中一個輸入通道收到 CheckpointBarrier 時,Flink Task 會阻塞該通道,其不會處理該通道後續數據,可是會將這些數據緩存起來,一旦完成了全部輸入通道的 CheckpointBarrier 對齊,纔會繼續對這些數據進行消費處理。
對於 At_least_Once,一樣針對某個算子的 Task 有多個輸入通道的狀況下,當某個輸入通道接收到 CheckpointBarrier 時,它不一樣於 Exactly Once,即便沒有完成全部輸入通道 CheckpointBarrier 對齊,At Least Once 也會繼續處理後續接收到的數據。因此使用 At Least Once 不能保證數據對於狀態計算只有一次的計算影響。
Flink Checkpoint 失敗有不少種緣由,常見的失敗緣由以下:
從目前的具體實踐狀況來看,Flink Checkpoint 異常覺大多數仍是用戶代碼邏輯的問題,對於程序異常沒有正確的處理致使。因此在編寫 Flink 實時任務時,必定要注意處理程序可能出現的各類異常。這樣,也會讓實時任務的邏輯更加的健壯。
當本身的 Flink 實時任務 Checkpoint 失敗時,用戶能夠先經過 Flink Web UI 進行快速定位 Checkpoint 失敗的緣由,若是在 Flink Web UI 上面沒有看到異常信息,能夠去看任務的具體日誌進行定位,以下是 Flink Web UI 查看錯誤緣由示意圖:
下面是設置 Flink Checkpoint 參數配置的建議及注意點:
下圖是不設置 Checkpoint 最小時間間隔示例圖,能夠看到,系統一致在進行 Checkpoint,大量的資源使用在 Flink Chekpoint 上,可能對運行的任務產生必定影響:
還有一種特殊的狀況,Flink 端到端 Sink 的 EXACTLYONCE 的問題,也就是數據從 Flink 端到外部消息系統的消息一致性。打個比方,Flink 輸出數據到 Kafka 消息系統中,若是使用 Kafka 0.10 的版本,Flink 不支持端到端的 EXACTLYONCE,可能存在消息重複輸入到 Kafka。
如上圖所示,當作 chk-5 Checkpoint 的時候,chk-5 失敗,而後從 chk-4 來進行恢復,可是 chk-5 的部分數據在 Chekpoint 失敗以前就有部分進入到 Kafka 消息系統,再次恢復時,該部分數據可能再次重放到 Kafka 消息系統中。
Flink 中解決端到端的一致性有兩種方法:作冪等以及事務寫,冪等的話,可使用 KV 存儲系統來作冪等,由於 KV 存儲系統的屢次操做結果都是相同的。Flink 內部目前支持二階段事務提交,Kafka 0.11 以上版本支持事務寫,因此支持 Flink 端到 Kafka 端的 EXACTLY_ONCE。
有贊實時計算對於 Flink 任務的 Checkpoint 和 Savepoint 作了兩個方面工做,第一個工做是對於 Flink Checkpoint 失敗的狀況,若是 Checkpoint 失敗過於頻繁,同時 Flink Checkpoint 失敗次數若是達到平臺默認的失敗閾值,平臺會及時給用戶報警提示。咱們會每 5 分鐘檢查一次實時任務,統計實時任務近 15 分鐘內,Flink Checkpoint 失敗次數的最大值和最小值的差值達到平臺默認的閾值,則會立馬給用戶報警,讓用戶可以及時的處理問題。
固然,並非全部的 Flink 實時任務 Checkpoint 失敗平臺都能發現,由於 Checkpoint 失敗次數的檢查,首先與用戶配置的 Checkpoint 的時間間隔有關。舉個例子,若是用戶配置的 Checkpoint 間隔爲 1 小時,其實平臺默認 Checkpoint 邏輯檢查根本就沒法發現實時任務 Checkpoint 失敗。
針對這種狀況,實時平臺也支持用戶自定義設置 Checkpoint 失敗閾值,目前支持兩種 Checkpoint 失敗邏輯檢查,一個是 實時任務的 Checkpoint 失敗次數的總和達到閾值,另外一個則是近 10 分鐘內,Flink Checkpoint 次數的最大值和最小值的差值的計算邏輯,用戶能夠根據實時任務的敏感度,設置具體的參數。
第二個方面則是針對 Flink 任務的狀態恢復,爲了防止實時任務的狀態丟失,實時計算平臺會按期的對實時任務進行 Savepoint 觸發,當任務因爲外界因素致使任務失敗時,這種失敗是任務直接掛掉,Yarn 任務的狀態直接爲 Killed,這種狀況下,若是用戶開啓自動拉起功能,實時平臺自動拉起實時任務,同時從最新的 Savepoint 進行狀態恢復,以致於狀態不丟失。同時,實時計算平臺也支持用戶中止任務時,觸發 Savepoint,再次重啓實時任務時,仍是從中止時的任務狀態進行恢復。
目前,有贊在實時計算方面,還有很長的路要走。在知足業務的同時,可能也會有不少的坑須要踩。後面有贊實時計算會重點在實時數倉方面進行投入,同時會基於 Flink SQL 進行功能擴展和開發。
爲了用戶開發實時任務的便利性,後面有贊會開始進行在線實時計算平臺的設計開發。將來也會將實時任務遷移到 K8S 上面,這樣在大促場景下,可以更方便的進行資源的擴容和縮容。
將來,有贊實時計算平臺會爲用戶帶來更好的開發體驗,下降用戶開發實時任務的難度,讓咱們一塊兒拭目以待。