Flink分佈式快照的核心元素是流barriers。 這些barriers被注入數據流並與記錄一塊兒做爲數據流的一部分流動。 barriers永遠不會超過記錄,流量嚴格符合要求。 barriers將數據流中的記錄分爲進入當前快照的記錄集和進入下一個快照的記錄。 每一個barriers都攜帶快照的ID,該快照的記錄barriers前面推送的數據。 barriers不會中斷流的流動,所以很是輕量級。 來自不一樣快照的多個障礙能夠同時在流中,這意味着能夠同時發生各類快照。html
(1). 一旦operator從輸入流接收到快照barrier n,它就不能處理來自該流的任何其餘記錄,直到它從其餘輸入接收到barrier n爲止。 不然,它會混合屬於快照n的記錄和屬於快照n + 1的記錄。java
(2). 包含barrier n的流數據暫時被Operator擱置。 從這些流接收的記錄不會被處理,而是放入輸入緩衝區。apache
(3). 一旦最後一個流接收到屏障n,Operator就會向下一個Operator發出全部掛起的流數據,而後本身發出快照n個屏障。app
(4). 以後,它將繼續處理來自全部輸入流的記錄,在處理來自流的記錄以前,會優先處理來自輸入緩衝區的記錄。分佈式
用戶定義的狀態:這是由轉換函數(如map()或filter())直接建立和修改的狀態。 函數
對於每一個並行流數據源,啓動快照時流中的偏移/位置工具
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/state/state_backends.html this
Flink使用state: 分爲 Keyed State和 Operator Statespa
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.htmlcode
Keyed State和 Operator State的區別如圖
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // start a checkpoint every 1000 ms env.enableCheckpointing(1000); // advanced options: // set mode to exactly-once (this is the default) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // make sure 500 ms of progress happen between checkpoints env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // checkpoints have to complete within one minute, or are discarded env.getCheckpointConfig().setCheckpointTimeout(60000); // allow only one checkpoint to be in progress at the same time env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // enable externalized checkpoints which are retained after job cancellation env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Savepoint:是一種特殊的checkpoint,只不過不像checkpoint按期的從系統中去觸發的,它是用戶經過命令觸發,存儲格式和checkpoint也是不相同的,會將數據按照一個標準的格式存儲,無論配置什麼樣,Flink都會從這個checkpoint恢復,是用來作版本升級一個很是好的工具;
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html
a. 中止flink任務,而且savepoint
bin/flink cancel -s [:targetDirectory] :jobId
b. 從savepoint中恢復任務
bin/flink run -s :savepointPath [:runArgs]
因爲存在Barriers 對齊的步驟,因此會存在毫秒級別的延遲。若是對實時性要求很高的程序,能夠在checkpoint 期間跳過barriers對齊,一旦operator看到每一個輸入的barrier,就會繪製檢查點快照。當跳過對齊時,即便在檢查點n的某些檢查點barriers到達以後,operator也會繼續處理全部輸入。這樣,在獲取檢查點n的狀態快照以前,operator還處理屬於檢查點n + 1的元素。在還原時,這些記錄將做爲重複記錄出現,由於它們都包含在檢查點n的狀態快照中,並將在檢查點n以後做爲數據的一部分進行重放。
barriers 對齊僅適用於具備多個前驅(鏈接)的運算符以及具備多個發送方的運算符(在流repartitioning/shuffle)。所以,數據流在並行流操做(map(),flatMap(),filter(),...)實際上即便在At Least Once模式下也能提供Exactly Once。
// exactly_once 模式 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // at least once模式 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);