Flink自己爲了保證其高可用的特性,以及保證做用的Exactly Once的快速恢復,進而提供了一套強大的Checkpoint機制。Checkpoint機制是Flink可靠性的基石,能夠保證Flink集羣在某個算子由於某些緣由(如異常退出)出現故障時,可以將整個應用流圖的狀態恢復到故障以前的某一狀態,保 證應用流圖狀態的一致性。Flink的Checkpoint機制原理來自「Chandy-Lamport algorithm」算法 (分佈式快照算法)。node
每一個須要checkpoint的應用在啓動時,Flink的JobManager爲其建立一個 CheckpointCoordinator,CheckpointCoordinator全權負責本應用的快照製做。面試
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
複製代碼
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
複製代碼
Flink的檢查點是一個全局的、異步的程序快照,它週期性的生成並送到持久化存儲(通常使用分佈式系統)。當發生故障時,Flink使用最新的檢查點進行重啓。一些Flink的用戶在程序「狀態」中保存了GB甚至TB的數據。這些用戶反饋在大量 的狀態下,建立檢查點一般很慢而且耗資源,這也是爲何Flink在 1.3版本開始引入「增量式的檢查點」。算法
在引入「增量式的檢查點」以前,每個Flink的檢查點都保存了程序完整的狀態。後來咱們意識到在大部分狀況下這是沒必要要的,由於上一次和此次的檢查點以前 ,狀態發生了很大的變化,因此咱們建立了「增量式的檢查點」。增量式的檢查點僅保存過去和如今狀態的差別部分。架構
增量式的檢查點能夠爲擁有大量狀態的程序帶來很大的提高。在早期的測試中,一個擁有TB級別「狀態」程序將生成檢查點的耗時從3分鐘以上下降 到了30秒左右。由於增量式的檢查點不須要每次把完整的狀態發送到存儲中。app
如今只能經過RocksDB state back-end來獲取增量式檢查點的功能,Flink使用RocksDB內置的備份機制來合併檢查點數據。這樣Flink增量式檢查點的數據不會無限制的增大,它會自動合併老的檢查點數據並清理掉。異步
要啓用這個機制,能夠以下設置:RocksDBStateBackend backend =new RocksDBStateBackend(filebackend, true);async
Flink 增量式的檢查點以「RocksDB」爲基礎,RocksDB是一個基於 LSM樹的KV存儲,新的數據保存在內存中,稱爲memtable。若是Key相同,後到的數據將覆蓋以前的數據,一旦memtable寫滿了,RocksDB將數據壓縮並寫入到磁盤。memtable的數據持久化到磁盤後,他們就變成了不可變的sstable。分佈式
RocksDB會在後臺執行compaction,合併sstable並刪除其中重複的數據。以後RocksDB刪除原來的sstable,替換成新合成的ssttable,這個sstable包含了以前的sstable中的信息。測試
在這個基礎之上,Flink跟蹤前一個checkpoint建立和刪除的RocksDB sstable文件,由於sstable是不可變的,Flink能夠所以計算出 狀態有哪些改變。爲了達到這個目標,Flink在RocksDB上觸發了一個刷新操做,強制將memtable刷新到磁盤上。這個操做在Flink中是同步的,其餘的操做是異步的,不會阻塞數據處理。大數據
Flink 的checkpoint會將新的sstable發送到持久化存儲(例如HDFS,S3)中,同時保留引用。Flink不會發送全部的sstable, 一些數據在以前的checkpoint存在而且寫入到持久化存儲中了,這樣只須要增長引用次數就能夠了。由於compaction的做用,一些sstable會合併成一個sstable並刪除這些sstable,這也是爲何Flink能夠減小checkpoint的歷史文件。
爲了分析checkpoint的數據變動,而上傳整理過的sstable是多餘的(這裏的意思是以前已經上傳過的,不須要再次上傳)。Flink處理這種狀況,僅帶來一點點開銷。這個過程很重要,由於在任務須要重啓的時候,Flink只須要保留較少的歷史文件。
假設有一個子任務,擁有一個keyed state的operator,checkpoint最多保留2個。上面的圖片描述了每一個checkpoint對應的RocksDB 的狀態,它引用到的文件,以及在checkpoint完成後共享狀態中的count值。
checkpoint ‘CP2’,本地的RocksDB目錄有兩個sstable文件,這些文件是新生成的,因而Flink將它們傳到了checkpoint 對應的存儲目錄。當checkpoint完成後,Flink在共享狀態中建立兩個實體,並將count設爲1。在這個共享狀態中,這個key 由operator、subtask,原始的sstable名字組成,value爲sstable實際存儲目錄。
checkpoint‘CP2’,RocksDB有2個老的sstable文件,又建立了2個新的sstable文件。Flink將這兩個新的sstable傳到 持久化存儲中,而後引用他們。當checkpoint完成後,Flink將全部的引用的相應計數加1。
checkpoint‘CP3’,RocksDB的compaction將sstable-(1), sstable-(2), sstable-(3) 合併成 sstable-(1,2,3),而後刪除 原始的sstable。這個合併後的文件包含了和以前源文件同樣的信息,而且清理掉了重複的部分。sstable-(4)還保留着,而後有一個 新生成的sstable-(5)。Flink將新的 sstable-(1,2,3)以及 sstable-(5)傳到持久化存儲中, sstable-(4)仍被‘CP2’引用,因此 將計數增長1。如今有了3個checkpoint,'CP1','CP2','CP3',超過了預設的保留數目2,因此CP1被刪除。做爲刪除的一部分, CP1對應的文件(sstable-(1)、sstable-(2)) 的引用計數減1。
checkpoint‘CP4’,RocksDB將sstable-(4), sstable-(5), 新的 sstable-(6) 合併成 sstable-(4,5,6)。Flink將新合併 的 sstable-(4,5,6)發送到持久化存儲中,sstable-(1,2,3)、sstable-(4,5,6) 的引用計數增長1。因爲再次到達了checkpoint的 保留數目,‘CP2’將被刪除,‘CP2’對應的文件(sstable-(1)、sstable-(2)、sstable(3) )的引用計數減1。因爲‘CP2’對應 的文件的引用計數達到0,這些文件將被刪除。
若是使用增量式的checkpoint,那麼在錯誤恢復的時候,不須要考慮不少的配置項。一旦發生了錯誤,Flink的JobManager會告訴 task須要從最新的checkpoint中恢復,它能夠是全量的或者是增量的。以後TaskManager從分佈式系統中下載checkpoint文件, 而後從中恢復狀態。
增量式的checkpoint能爲擁有大量狀態的程序帶來較大的提高,但還有一些trade-off須要考慮。總的來講,增量式減小了checkpoint操做的時間,可是相對的,從checkpoint中恢復可能更耗時,具體狀況須要根據應用程序包含的狀態大小而定。相對的,若是程序只是部分失敗,Flink TaskManager須要從多個checkpoint中讀取數據,這時候使用全量的checkpoint來恢復數據可能更加耗時。同時,因爲新的checkpoint可能引用到老的checkpoint,這樣老的checkpoint就不能被刪除,這樣下去,歷史的版本數據會愈來愈大。須要考慮使用分佈式來存儲checkpoint,另外還須要考慮讀取帶來的帶寬消耗。
聲明:本號全部文章除特殊註明,都爲原創,公衆號讀者擁有優先閱讀權,未經做者本人容許不得轉載,不然追究侵權責任。
關注個人公衆號,後臺回覆【JAVAPDF】獲取200頁面試題!5萬人關注的大數據成神之路,不來了解一下嗎?5萬人關注的大數據成神之路,真的不來了解一下嗎?5萬人關注的大數據成神之路,肯定真的不來了解一下嗎?
備註:全部內容首發公衆號,這裏不保證明時性和完整性,你們掃描文末二維碼關注哦~