Flink CheckPoint奇技淫巧 | 原理和在生產中的應用

簡介

Flink自己爲了保證其高可用的特性,以及保證做用的Exactly Once的快速恢復,進而提供了一套強大的Checkpoint機制。Checkpoint機制是Flink可靠性的基石,能夠保證Flink集羣在某個算子由於某些緣由(如異常退出)出現故障時,可以將整個應用流圖的狀態恢復到故障以前的某一狀態,保 證應用流圖狀態的一致性。Flink的Checkpoint機制原理來自「Chandy-Lamport algorithm」算法 (分佈式快照算法)。node

Checkpoint的執行流程

每一個須要checkpoint的應用在啓動時,Flink的JobManager爲其建立一個 CheckpointCoordinator,CheckpointCoordinator全權負責本應用的快照製做。面試

file

  • CheckpointCoordinator週期性的向該流應用的全部source算子發送barrier;
  • 當某個source算子收到一個barrier時,便暫停數據處理過程,而後將本身的當前狀 態製做成快照,並保存到指定的持久化存儲中,最後向CheckpointCoordinator報告 本身快照製做狀況,同時向自身全部下游算子廣播該barrier,恢復數據處理;
  • 下游算子收到barrier以後,會暫停本身的數據處理過程,而後將自身的相關狀態製做成快照,並保存到指定的持久化存儲中,最後向CheckpointCoordinator報告自身 快照狀況,同時向自身全部下游算子廣播該barrier,恢復數據處理;
  • 每一個算子按照步驟3不斷製做快照並向下遊廣播,直到最後barrier傳遞到sink算子,快照製做完成。
  • 當CheckpointCoordinator收到全部算子的報告以後,認爲該週期的快照製做成功; 不然,若是在規定的時間內沒有收到全部算子的報告,則認爲本週期快照製做失敗 ;

Checkpoint經常使用設置

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
複製代碼

  • 使用StreamExecutionEnvironment.enableCheckpointing方法來設置開啓checkpoint;具體可使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode);interval用於指定checkpoint的觸發間隔(單位milliseconds),而CheckpointingMode默認是CheckpointingMode.EXACTLYONCE,也能夠指定爲CheckpointingMode.ATLEAST_ONCE
  • 也能夠經過StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode來設置CheckpointingMode,通常對於超低延遲的應用(大概幾毫秒)可使用CheckpointingMode.ATLEASTONCE,其餘大部分應用使用CheckpointingMode.EXACTLY_ONCE就能夠
  • checkpointTimeout用於指定checkpoint執行的超時時間(單位milliseconds),超時沒完成就會被abort掉
  • minPauseBetweenCheckpoints用於指定checkpoint coordinator上一個checkpoint完成以後最小等多久能夠出發另外一個checkpoint,當指定這個參數時,maxConcurrentCheckpoints的值爲1
  • maxConcurrentCheckpoints用於指定運行中的checkpoint最多能夠有多少個,用於包裝topology不會花太多的時間在checkpoints上面;若是有設置了minPauseBetweenCheckpoints,則maxConcurrentCheckpoints這個參數就不起做用了(大於1的值不起做用)
  • enableExternalizedCheckpoints用於開啓checkpoints的外部持久化,可是在job失敗的時候不會自動清理,須要本身手工清理state;ExternalizedCheckpointCleanup用於指定當job canceled的時候externalized checkpoint該如何清理,DELETEONCANCELLATION的話,在job canceled的時候會自動刪除externalized state,可是若是是FAILED的狀態則會保留;RETAINONCANCELLATION則在job canceled的時候會保留externalized checkpoint state
  • failOnCheckpointingErrors用於指定在checkpoint發生異常的時候,是否應該fail該task,默認爲true,若是設置爲false,則task會拒絕checkpoint而後繼續運行

flink-conf.yaml相關配置

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false
複製代碼

  • state.backend用於指定checkpoint state存儲的backend,默認爲none
  • state.backend.async用於指定backend是否使用異步snapshot(默認爲true),有些不支持async或者只支持async的state backend可能會忽略這個參數
  • state.backend.fs.memory-threshold,默認爲1024,用於指定存儲於files的state大小閾值,若是小於該值則會存儲在root checkpoint metadata file
  • state.backend.incremental,默認爲false,用於指定是否採用增量checkpoint,有些不支持增量checkpoint的backend會忽略該配置
  • state.backend.local-recovery,默認爲false
  • state.checkpoints.dir,默認爲none,用於指定checkpoint的data files和meta data存儲的目錄,該目錄必須對全部參與的TaskManagers及JobManagers可見
  • state.checkpoints.num-retained,默認爲1,用於指定保留的已完成的checkpoints個數
  • state.savepoints.dir,默認爲none,用於指定savepoints的默認目錄
  • taskmanager.state.local.root-dirs,默認爲none

增量式的檢查點- Checkpoint設置的奇技淫巧

增量式檢查點

Flink的檢查點是一個全局的、異步的程序快照,它週期性的生成並送到持久化存儲(通常使用分佈式系統)。當發生故障時,Flink使用最新的檢查點進行重啓。一些Flink的用戶在程序「狀態」中保存了GB甚至TB的數據。這些用戶反饋在大量 的狀態下,建立檢查點一般很慢而且耗資源,這也是爲何Flink在 1.3版本開始引入「增量式的檢查點」。算法

在引入「增量式的檢查點」以前,每個Flink的檢查點都保存了程序完整的狀態。後來咱們意識到在大部分狀況下這是沒必要要的,由於上一次和此次的檢查點以前 ,狀態發生了很大的變化,因此咱們建立了「增量式的檢查點」。增量式的檢查點僅保存過去和如今狀態的差別部分。架構

增量式的檢查點能夠爲擁有大量狀態的程序帶來很大的提高。在早期的測試中,一個擁有TB級別「狀態」程序將生成檢查點的耗時從3分鐘以上下降 到了30秒左右。由於增量式的檢查點不須要每次把完整的狀態發送到存儲中。app

如今只能經過RocksDB state back-end來獲取增量式檢查點的功能,Flink使用RocksDB內置的備份機制來合併檢查點數據。這樣Flink增量式檢查點的數據不會無限制的增大,它會自動合併老的檢查點數據並清理掉。異步

要啓用這個機制,能夠以下設置:RocksDBStateBackend backend =new RocksDBStateBackend(filebackend, true);async

增量式檢查點如何工做

Flink 增量式的檢查點以「RocksDB」爲基礎,RocksDB是一個基於 LSM樹的KV存儲,新的數據保存在內存中,稱爲memtable。若是Key相同,後到的數據將覆蓋以前的數據,一旦memtable寫滿了,RocksDB將數據壓縮並寫入到磁盤。memtable的數據持久化到磁盤後,他們就變成了不可變的sstable。分佈式

RocksDB會在後臺執行compaction,合併sstable並刪除其中重複的數據。以後RocksDB刪除原來的sstable,替換成新合成的ssttable,這個sstable包含了以前的sstable中的信息。測試

在這個基礎之上,Flink跟蹤前一個checkpoint建立和刪除的RocksDB sstable文件,由於sstable是不可變的,Flink能夠所以計算出 狀態有哪些改變。爲了達到這個目標,Flink在RocksDB上觸發了一個刷新操做,強制將memtable刷新到磁盤上。這個操做在Flink中是同步的,其餘的操做是異步的,不會阻塞數據處理。大數據

Flink 的checkpoint會將新的sstable發送到持久化存儲(例如HDFS,S3)中,同時保留引用。Flink不會發送全部的sstable, 一些數據在以前的checkpoint存在而且寫入到持久化存儲中了,這樣只須要增長引用次數就能夠了。由於compaction的做用,一些sstable會合併成一個sstable並刪除這些sstable,這也是爲何Flink能夠減小checkpoint的歷史文件。

爲了分析checkpoint的數據變動,而上傳整理過的sstable是多餘的(這裏的意思是以前已經上傳過的,不須要再次上傳)。Flink處理這種狀況,僅帶來一點點開銷。這個過程很重要,由於在任務須要重啓的時候,Flink只須要保留較少的歷史文件。

file

假設有一個子任務,擁有一個keyed state的operator,checkpoint最多保留2個。上面的圖片描述了每一個checkpoint對應的RocksDB 的狀態,它引用到的文件,以及在checkpoint完成後共享狀態中的count值。

checkpoint ‘CP2’,本地的RocksDB目錄有兩個sstable文件,這些文件是新生成的,因而Flink將它們傳到了checkpoint 對應的存儲目錄。當checkpoint完成後,Flink在共享狀態中建立兩個實體,並將count設爲1。在這個共享狀態中,這個key 由operator、subtask,原始的sstable名字組成,value爲sstable實際存儲目錄。

checkpoint‘CP2’,RocksDB有2個老的sstable文件,又建立了2個新的sstable文件。Flink將這兩個新的sstable傳到 持久化存儲中,而後引用他們。當checkpoint完成後,Flink將全部的引用的相應計數加1。

checkpoint‘CP3’,RocksDB的compaction將sstable-(1), sstable-(2), sstable-(3) 合併成 sstable-(1,2,3),而後刪除 原始的sstable。這個合併後的文件包含了和以前源文件同樣的信息,而且清理掉了重複的部分。sstable-(4)還保留着,而後有一個 新生成的sstable-(5)。Flink將新的 sstable-(1,2,3)以及 sstable-(5)傳到持久化存儲中, sstable-(4)仍被‘CP2’引用,因此 將計數增長1。如今有了3個checkpoint,'CP1','CP2','CP3',超過了預設的保留數目2,因此CP1被刪除。做爲刪除的一部分, CP1對應的文件(sstable-(1)、sstable-(2)) 的引用計數減1。

checkpoint‘CP4’,RocksDB將sstable-(4), sstable-(5), 新的 sstable-(6) 合併成 sstable-(4,5,6)。Flink將新合併 的 sstable-(4,5,6)發送到持久化存儲中,sstable-(1,2,3)、sstable-(4,5,6) 的引用計數增長1。因爲再次到達了checkpoint的 保留數目,‘CP2’將被刪除,‘CP2’對應的文件(sstable-(1)、sstable-(2)、sstable(3) )的引用計數減1。因爲‘CP2’對應 的文件的引用計數達到0,這些文件將被刪除。

須要注意的地方

若是使用增量式的checkpoint,那麼在錯誤恢復的時候,不須要考慮不少的配置項。一旦發生了錯誤,Flink的JobManager會告訴 task須要從最新的checkpoint中恢復,它能夠是全量的或者是增量的。以後TaskManager從分佈式系統中下載checkpoint文件, 而後從中恢復狀態。

增量式的checkpoint能爲擁有大量狀態的程序帶來較大的提高,但還有一些trade-off須要考慮。總的來講,增量式減小了checkpoint操做的時間,可是相對的,從checkpoint中恢復可能更耗時,具體狀況須要根據應用程序包含的狀態大小而定。相對的,若是程序只是部分失敗,Flink TaskManager須要從多個checkpoint中讀取數據,這時候使用全量的checkpoint來恢復數據可能更加耗時。同時,因爲新的checkpoint可能引用到老的checkpoint,這樣老的checkpoint就不能被刪除,這樣下去,歷史的版本數據會愈來愈大。須要考慮使用分佈式來存儲checkpoint,另外還須要考慮讀取帶來的帶寬消耗。

聲明:本號全部文章除特殊註明,都爲原創,公衆號讀者擁有優先閱讀權,未經做者本人容許不得轉載,不然追究侵權責任。

關注個人公衆號,後臺回覆【JAVAPDF】獲取200頁面試題!5萬人關注的大數據成神之路,不來了解一下嗎?5萬人關注的大數據成神之路,真的不來了解一下嗎?5萬人關注的大數據成神之路,肯定真的不來了解一下嗎?

歡迎您關注《大數據成神之路》

大數據技術與架構

備註:全部內容首發公衆號,這裏不保證明時性和完整性,你們掃描文末二維碼關注哦~

相關文章
相關標籤/搜索
本站公眾號
   歡迎關注本站公眾號,獲取更多信息