Flink State和容錯機制

1. Flink Barriers

     Flink分佈式快照的核心元素是流barriers。 這些barriers被注入數據流並與記錄一塊兒做爲數據流的一部分流動。 barriers永遠不會超過記錄,流量嚴格符合要求。 barriers將數據流中的記錄分爲進入當前快照的記錄集和進入下一個快照的記錄。 每一個barriers都攜帶快照的ID,該快照的記錄barriers前面推送的數據。 barriers不會中斷流的流動,所以很是輕量級。 來自不一樣快照的多個障礙能夠同時在流中,這意味着能夠同時發生各類快照。html

Checkpoint barriers in data streams

2. Flink Checkpoint的過程

    Aligning data streams at operators with multiple inputs

2.1 Barries 對齊過程

(1).   一旦operator從輸入流接收到快照barrier n,它就不能處理來自該流的任何其餘記錄,直到它從其餘輸入接收到barrier n爲止。 不然,它會混合屬於快照n的記錄和屬於快照n + 1的記錄。java

(2).  包含barrier n的流數據暫時被Operator擱置。 從這些流接收的記錄不會被處理,而是放入輸入緩衝區。apache

(3).  一旦最後一個流接收到屏障n,Operator就會向下一個Operator發出全部掛起的流數據,而後本身發出快照n個屏障。app

(4).  以後,它將繼續處理來自全部輸入流的記錄,在處理來自流的記錄以前,會優先處理來自輸入緩衝區的記錄。分佈式

2.2 Checkpoint 過程

2.2.1 State數據

  • 用戶定義的狀態:這是由轉換函數(如map()或filter())直接建立和修改的狀態。   函數

  • 系統狀態:此狀態是指做爲運算符計算一部分的數據緩衝區。 此狀態的典型示例是窗口緩衝區,系統在其中收集(和聚合)窗口記錄,直到窗口被評估和逐出。

2.2.2 快照數據

  • 對於每一個並行流數據源,啓動快照時流中的偏移/位置工具

  • 對於每一個運算符,state數據會被做爲快照的一部分

 

Illustration of the Checkpointing Mechanism

2.2.3 State存儲配置

   https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html this

   Flink使用state: 分爲 Keyed State和 Operator Statespa

    https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.htmlcode

   Keyed State和 Operator State的區別如圖

   Flink状态管理和容错机制介绍

2.2.4 Checkpoint代碼設置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2.2.5 SavePoint

Savepoint:是一種特殊的checkpoint,只不過不像checkpoint按期的從系統中去觸發的,它是用戶經過命令觸發,存儲格式和checkpoint也是不相同的,會將數據按照一個標準的格式存儲,無論配置什麼樣,Flink都會從這個checkpoint恢復,是用來作版本升級一個很是好的工具;

https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html

 a. 中止flink任務,而且savepoint

bin/flink cancel -s [:targetDirectory] :jobId

b. 從savepoint中恢復任務

bin/flink run -s :savepointPath [:runArgs]

 

2.3 Exactly Once 和 At Least Once的區別

      因爲存在Barriers 對齊的步驟,因此會存在毫秒級別的延遲。若是對實時性要求很高的程序,能夠在checkpoint 期間跳過barriers對齊,一旦operator看到每一個輸入的barrier,就會繪製檢查點快照。當跳過對齊時,即便在檢查點n的某些檢查點barriers到達以後,operator也會繼續處理全部輸入。這樣,在獲取檢查點n的狀態快照以前,operator還處理屬於檢查點n + 1的元素。在還原時,這些記錄將做爲重複記錄出現,由於它們都包含在檢查點n的狀態快照中,並將在檢查點n以後做爲數據的一部分進行重放。

       barriers 對齊僅適用於具備多個前驅(鏈接)的運算符以及具備多個發送方的運算符(在流repartitioning/shuffle)。所以,數據流在並行流操做(map(),flatMap(),filter(),...)實際上即便在At Least Once模式下也能提供Exactly Once。

// exactly_once 模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// at least once模式
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
相關文章
相關標籤/搜索