Flink可靠性的基石-checkpoint機制詳細解析

Checkpoint介紹

checkpoint機制是Flink可靠性的基石,能夠保證Flink集羣在某個算子由於某些緣由(如 異常退出)出現故障時,可以將整個應用流圖的狀態恢復到故障以前的某一狀態,保 證應用流圖狀態的一致性。Flink的checkpoint機制原理來自「Chandy-Lamport algorithm」算法。css

每一個須要checkpoint的應用在啓動時,Flink的JobManager爲其建立一個 CheckpointCoordinator(檢查點協調器),CheckpointCoordinator全權負責本應用的快照製做。
node

1) CheckpointCoordinator(檢查點協調器) 週期性的向該流應用的全部source算子發送 barrier(屏障)。nginx

2) 當某個source算子收到一個barrier時,便暫停數據處理過程,而後將本身的當前狀態製做成快照,並保存到指定的持久化存儲中,最後向CheckpointCoordinator報告本身快照製做狀況,同時向自身全部下游算子廣播該barrier,恢復數據處理算法

3) 下游算子收到barrier以後,會暫停本身的數據處理過程,而後將自身的相關狀態製做成快照,並保存到指定的持久化存儲中,最後向CheckpointCoordinator報告自身快照狀況,同時向自身全部下游算子廣播該barrier,恢復數據處理。網絡

4) 每一個算子按照步驟3不斷製做快照並向下遊廣播,直到最後barrier傳遞到sink算子,快照製做完成。分佈式

5) 當CheckpointCoordinator收到全部算子的報告以後,認爲該週期的快照製做成功; 不然,若是在規定的時間內沒有收到全部算子的報告,則認爲本週期快照製做失敗。ide

若是一個算子有兩個輸入源,則暫時阻塞先收到barrier的輸入源,等到第二個輸入源相 同編號的barrier到來時,再製做自身快照並向下遊廣播該barrier。具體以下圖所示:函數

1) 假設算子C有A和B兩個輸入源性能

2) 在第i個快照週期中,因爲某些緣由(如處理時延、網絡時延等)輸入源A發出的 barrier 先到來,這時算子C暫時將輸入源A的輸入通道阻塞,僅收輸入源B的數據。學習

3) 當輸入源B發出的barrier到來時,算子C製做自身快照並向 CheckpointCoordinator 報告自身的快照製做狀況,而後將兩個barrier合併爲一個,向下遊全部的算子廣播。

4) 當因爲某些緣由出現故障時,CheckpointCoordinator通知流圖上全部算子統一恢復到某個週期的checkpoint狀態,而後恢復數據流處理。分佈式checkpoint機制保證了數據僅被處理一次(Exactly Once)。

持久化存儲

MemStateBackend

該持久化存儲主要將快照數據保存到JobManager的內存中,僅適合做爲測試以及快照的數據量很是小時使用,並不推薦用做大規模商業部署。

MemoryStateBackend 的侷限性

默認狀況下,每一個狀態的大小限制爲 5 MB。能夠在MemoryStateBackend的構造函數中增長此值。

不管配置的最大狀態大小如何,狀態都不能大於akka幀的大小(請參閱配置)。

聚合狀態必須適合 JobManager 內存。

建議MemoryStateBackend 用於

本地開發和調試。

狀態不多的做業,例如僅包含一次記錄功能的做業(Map,FlatMap,Filter,…),kafka的消費者須要不多的狀態。

FsStateBackend

該持久化存儲主要將快照數據保存到文件系統中,目前支持的文件系統主要是 HDFS和本地文件。若是使用HDFS,則初始化FsStateBackend時,須要傳入以 「hdfs://」開頭的路徑(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 若是使用本地文件,則須要傳入以「file://」開頭的路徑(即:new FsStateBackend("file:///Data"))。在分佈式狀況下,不推薦使用本地文件。若是某 個算子在節點A上失敗,在節點B上恢復,使用本地文件時,在B上沒法讀取節點 A上的數據,致使狀態恢復失敗。

建議FsStateBackend:

具備大狀態,長窗口,大鍵 / 值狀態的做業。

全部高可用性設置。

RocksDBStateBackend

RocksDBStatBackend介於本地文件和HDFS之間,平時使用RocksDB的功能,將數 據持久化到本地文件中,當製做快照時,將本地數據製做成快照,並持久化到 FsStateBackend中(FsStateBackend沒必要用戶特別指明,只需在初始化時傳入HDFS 或本地路徑便可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。

若是用戶使用自定義窗口(window),不推薦用戶使用RocksDBStateBackend。在自定義窗口中,狀態以ListState的形式保存在StatBackend中,若是一個key值中有多個value值,則RocksDB讀取該種ListState很是緩慢,影響性能。用戶能夠根據應用的具體狀況選擇FsStateBackend+HDFS或RocksStateBackend+HDFS。

語法

val env = StreamExecutionEnvironment.getExecutionEnvironment()
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000)
// advanced options:
// 設置checkpoint的執行模式,最多執行一次或者至少執行一次
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 設置checkpoint的超時時間
env.getCheckpointConfig.setCheckpointTimeout(60000)
// 若是在只作快照過程當中出現錯誤,是否讓總體任務失敗:true是  false不是
env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
//設置同一時間有多少 個checkpoint能夠同時執行 
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

修改State Backend的兩種方式

第一種:單任務調整

修改當前任務代碼

env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【須要添加第三方依賴】

第二種:全局調整

修改flink-conf.yaml

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

注意:state.backend的值能夠是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

Checkpoint的高級選項

默認checkpoint功能是disabled的,想要使用的時候須要先啓用checkpoint開啓以後,默認的checkPointMode是Exactly-once

//配置一秒鐘開啓一個checkpoint
env.enableCheckpointing(1000)
//指定checkpoint的執行模式
//兩種可選:
//CheckpointingMode.EXACTLY_ONCE:默認值
//CheckpointingMode.AT_LEAST_ONCE

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

通常狀況下選擇CheckpointingMode.EXACTLY_ONCE,除非場景要求極低的延遲(幾毫秒)

注意:若是須要保證EXACTLY_ONCE,source和sink要求必須同時保證EXACTLY_ONCE
//若是程序被cancle,保留之前作的checkpoint
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

默認狀況下,檢查點不被保留,僅用於在故障中恢復做業,能夠啓用外部持久化檢查點,同時指定保留策略:

ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在做業取消時保留檢查點,注意,在這種狀況下,您必須在取消後手動清理檢查點狀態

ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION:看成業在被cancel時,刪除檢查點,檢查點僅在做業失敗時可用
//設置checkpoint超時時間
env.getCheckpointConfig.setCheckpointTimeout(60000)
//Checkpointing的超時時間,超時時間內沒有完成則被終止
//Checkpointing最小時間間隔,用於指定上一個checkpoint完成以後
//最小等多久能夠觸發另外一個checkpoint,當指定這個參數時,maxConcurrentCheckpoints的值爲1
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//設置同一個時間是否能夠有多個checkpoint執行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
指定運行中的checkpoint最多能夠有多少個

env.getCheckpointConfig.setFailOnCheckpointingErrors(true)
用於指定在checkpoint發生異常的時候,是否應該fail該task,默認是true,若是設置爲false,則task會拒絕checkpoint而後繼續運行

Flink的重啓策略

Flink支持不一樣的重啓策略,這些重啓策略控制着job失敗後如何重啓。集羣能夠經過默認的重啓策略來重啓,這個默認的重啓策略一般在未指定重啓策略的狀況下使用,而若是Job提交的時候指定了重啓策略,這個重啓策略就會覆蓋掉集羣的默認重啓策略。

概覽

默認的重啓策略是經過Flink的 flink-conf.yaml 來指定的,這個配置參數 restart-strategy 定義了哪一種策略會被採用。若是checkpoint未啓動,就會採用 no restart 策略,若是啓動了checkpoint機制,可是未指定重啓策略的話,就會採用 fixed-delay 策略,重試 Integer.MAX_VALUE 次。請參考下面的可用重啓策略來了解哪些值是支持的。

每一個重啓策略都有本身的參數來控制它的行爲,這些值也能夠在配置文件中設置,每一個重啓策略的描述都包含着各自的配置值信息。

除了定義一個默認的重啓策略以外,你還能夠爲每個Job指定它本身的重啓策略,這個重啓策略能夠在 ExecutionEnvironment 中調用 setRestartStrategy() 方法來程序化地調用,注意這種方式一樣適用於 StreamExecutionEnvironment

下面的例子展現瞭如何爲Job設置一個固定延遲重啓策略,一旦有失敗,系統就會嘗試每10秒重啓一次,重啓3次。

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3// 重啓次數
  Time.of(10, TimeUnit.SECONDS) // 延遲時間間隔
))

固定延遲重啓策略(Fixed Delay Restart Strategy)

固定延遲重啓策略會嘗試一個給定的次數來重啓Job,若是超過了最大的重啓次數,Job最終將失敗。在連續的兩次重啓嘗試之間,重啓策略會等待一個固定的時間。

重啓策略能夠配置flink-conf.yaml的下面配置參數來啓用,做爲默認的重啓策略:

restart-strategy: fixed-delay

例子:

restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

固定延遲重啓也能夠在程序中設置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
  3// 重啓次數
  Time.of(10, TimeUnit.SECONDS) // 重啓時間間隔
))

失敗率重啓策略

失敗率重啓策略在Job失敗後會重啓,可是超過失敗率後,Job會最終被認定失敗。在兩個連續的重啓嘗試之間,重啓策略會等待一個固定的時間。

失敗率重啓策略能夠在flink-conf.yaml中設置下面的配置參數來啓用:

restart-strategy:failure-rate

例子:

restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s

失敗率重啓策略也能夠在程序中設置:

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3// 每一個測量時間間隔最大失敗次數
  Time.of(5, TimeUnit.MINUTES), //失敗率測量的時間間隔
  Time.of(10, TimeUnit.SECONDS) // 兩次連續重啓嘗試的時間間隔
))

無重啓策略

Job直接失敗,不會嘗試進行重啓

restart-strategy: none

無重啓策略也能夠在程序中設置

val env = ExecutionEnvironment.getExecutionEnvironment()
env.setRestartStrategy(RestartStrategies.noRestart())

搜索公衆號:五分鐘學大數據,發送 祕籍,便可獲取大數據學習祕籍大禮包,深刻鑽研大數據技術!

相關文章
相關標籤/搜索