Flink(四)Flink的State

1、State概述

Flink裏面有state的概念
State:通常指一個具體的task/operator的狀態。State能夠被記錄,在失敗的狀況下數據還能夠恢復,Flink中有兩種基本類型的State:Keyed State,Operator State,他們兩種均可以以兩種形式存在:原始狀態(raw state)和託管狀態(managed state)
託管狀態:由Flink框架管理的狀態,咱們一般使用的就是這種。
原始狀態:由用戶自行管理狀態具體的數據結構,框架在作checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部數據結構一無所知。一般在DataStream上的狀態推薦使用託管的狀態,當實現一個用戶自定義的operator時,會使用到原始狀態。可是咱們工做中通常不經常使用,因此咱們不考慮他。
Flink(四)Flink的Statenode

2、State類型

一、Operator State類型

裏面沒有shuffle的操做,或者說裏面沒有key by的操做。
Flink(四)Flink的State數據庫

(1)operator state是task級別的state,說白了就是每一個task對應一個state數據結構

(2)Kafka Connector source中的每一個分區(task)都須要記錄消費的topic的partition和offset等信息。
(3)operator state 只有一種託管狀態:ValueState
(4)operator state能夠定義爲source和sink的狀態進行管理框架

二、Keyed State類型

Flink(四)Flink的State

(1)keyed state記錄的是每一個key的狀態
(2)Keyed state託管狀態有六種類型分佈式

(2.1)ValueState
             /**
             *  ValueState<T> :這個狀態爲每個 key 保存一個值
             *      value() 獲取狀態值
             *      update() 更新狀態值
             *      clear() 清除狀態
             */
     (2.2)ListState
             /**
             *  ListState<T> :這個狀態爲每個 key 保存集合的值
             *      get() 獲取狀態值
             *      add() / addAll() 更新狀態值,將數據放到狀態中
             *      clear() 清除狀態
             */
     (2.3)MapState
             /**
             *  MapState<K, V> :這個狀態爲每個 key 保存一個 Map 集合
             *      put() 將對應的 key 的鍵值對放到狀態中
             *      values() 拿到 MapState 中全部的 value
             *      clear() 清除狀態
             */
     (2.4)ReducingState
            /**
             *  ReducingState<T> :這個狀態爲每個 key 保存一個聚合以後的值
             *      get() 獲取狀態值
             *      add()  更新狀態值,將數據放到狀態中
             *      clear() 清除狀態
             */
     (2.5)AggregatingState

        (2.6)FoldingState
(3) flink的keyed的state經過繼承RichFlatMapFunction,重寫open和flatMap,定義狀態,實現狀態的管理和自定義

3、State backend

一、概述

Flink支持的StateBackend:
MemoryStateBackend
FsStateBackend
RocksDBStateBackendide

(1)、MemoryStateBackend

Flink(四)Flink的State
默認狀況下,狀態信息是存儲在 TaskManager 的堆內存中的,checkpoint 的時候將狀態保存到JobManager 的堆內存中。
缺點:
只能保存數據量小的狀態
狀態數據有可能會丟失
優勢:
開發測試很方便測試

(2)、 FSStateBackend

Flink(四)Flink的State

狀態信息存儲在TaskManager 的堆內存中的,checkpoint 的時候將狀態保存到指定的文件中 (HDFS等文件系統)
缺點:
狀態大小受TaskManager內存限制(默認支持5M)
優勢:
狀態訪問速度很快
狀態信息不會丟失
用於: 生產,也可存儲狀態數據量大的狀況3d

(3)、 RocksDBStateBackend

Flink(四)Flink的State
狀態信息存儲在 RocksDB 數據庫 (key-value 的數據存儲服務), 最終保存在本地文件中
checkpoint 的時候將狀態保存到指定的文件中 (HDFS 等文件系統)
缺點:
狀態訪問速度有所降低
優勢:
能夠存儲超大量的狀態信息
狀態信息不會丟失
用於: 生產,能夠存儲超大量的狀態信息rest

二、 StateBackend配置方式

(1)單任務調整

修改當前任務代碼 env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints")); 或者new MemoryStateBackend() 或者new RocksDBStateBackend(filebackend, true);【須要添加第三方依賴】code

(2)全局調整

修改flink-conf.yaml state.backend: filesystem state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints 注意:state.backend的值能夠是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

4、checkpoint(容錯)

一、checkpoint概述

(1)爲了保證state的容錯性,Flink須要對state進行checkpoint。 (2)Checkpoint是Flink實現容錯機制最核心的功能,它可以根據配置週期性地基於Stream中各個
Operator/task的狀態來生成快照,從而將這些狀態數據按期持久化存儲下來,當Flink程序一旦意外崩
潰時,從新運行程序時能夠有選擇地從這些快照進行恢復,從而修正由於故障帶來的程序數據異常
(3)Flink的checkpoint機制能夠與(stream和state)的持久化存儲交互的前提:
持久化的source,它須要支持在必定時間內重放事件。這種sources的典型例子是持久化的消息隊列
(好比Apache Kafka,RabbitMQ等)或文件系統(好比HDFS,S3,GFS等)
用於state的持久化存儲,例如分佈式文件系統(好比HDFS,S3,GFS等)
生成快照:5秒
Flink(四)Flink的State

恢復快照:
Flink(四)Flink的State

二、checkpoint配置

默認checkpoint功能是disabled的,想要使用的時候須要先啓用,checkpoint開啓以後,
checkPointMode有兩種,Exactly-once和At-least-once,默認的checkPointMode是Exactly-once,
Exactly-once對於大多數應用來講是最合適的。At-least-once可能用在某些延遲超低的應用程序(始終延遲爲幾毫秒)。

默認checkpoint功能是disabled的,想要使用的時候須要先啓用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
// 每隔1000 ms進行啓動一個檢查點【設置checkpoint的週期】 
env.enableCheckpointing(1000); 
// 高級選項: 
// 設置模式爲exactly-once (這是默認值) 
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 
// 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】 
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); 
// 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】 
env.getCheckpointConfig().setCheckpointTimeout(60000); 
// 同一時間只容許進行一個檢查點 
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 
// 表示一旦Flink處理程序被cancel後,會保留Checkpoint數據,以便根據實際須要恢復到指定的 Checkpoint【詳細解釋見備註】 
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCl 
eanup.RETAIN_ON_CANCELLATION);

三、恢復數據(容錯)

(1)重啓策略概述

Flink支持不一樣的重啓策略,以在故障發生時控制做業如何重啓,集羣在啓動時會伴隨一個默認的重啓策略,在沒有定義具體重啓策略時會使用該默認策略。 若是在工做提交時指定了一個重啓策略,該策略會覆蓋集羣的默認策略,默認的重啓策略能夠經過 Flink 的配置文件 flink-conf.yaml 指定。配置參數restart-strategy 定義了哪一個策略被使用。
經常使用的重啓策略
(1.1)固定間隔 (Fixed delay)
(1.2)失敗率 (Failure rate)
(1.3)無重啓 (No restart)
若是沒有啓用 checkpointing,則使用無重啓 (no restart) 策略。
若是啓用了 checkpointing,但沒有配置重啓策略,則使用固定間隔 (fixed-delay) 策略, 嘗試重啓次數默認值是:Integer.MAX_VALUE,重啓策略能夠在flink-conf.yaml中配置,表示全局的配置。也能夠在應用代碼中動態指定,會覆蓋全局配置。

(2)重啓策略

(2.1)固定間隔 (Fixed delay)

第一種:全局配置 flink-conf.yaml 
restart-strategy: fixed-delay 
restart-strategy.fixed-delay.attempts: 3 
restart-strategy.fixed-delay.delay: 10 s 
第二種:應用代碼設置 
env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 嘗試重啓的次數
Time.of(10, TimeUnit.SECONDS) // 間隔 ));

(2.2)失敗率 (Failure rate)

第一種:全局配置 flink-conf.yaml 
restart-strategy: failure-rate 
restart-strategy.failure-rate.max-failures-per-interval: 3 
restart-strategy.failure-rate.failure-rate-interval: 5 min 
restart-strategy.failure-rate.delay: 10 s 
第二種:應用代碼設置 
env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // 一個時間段內的最大失敗次數 
Time.of(5, TimeUnit.MINUTES), // 衡量失敗次數的是時間段 
Time.of(10, TimeUnit.SECONDS) // 間隔 ));

(2..3)無重啓 (No restart)

第一種:全局配置 flink-conf.yaml 
restart-strategy: none 第二種:應用代碼設置
env.setRestartStrategy(RestartStrategies.noRestart());

四、多checkpoint

默認狀況下,若是設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,能夠從最近的這個Checkpoint來進行恢復。可是,若是咱們但願保留多個Checkpoint,並可以根據實際須要選擇其中一個進行恢復,這樣會更加靈活,好比,咱們發現最近4個小時數據記錄處理有問題,但願將整個狀態還原到4小時以前Flink能夠支持保留多個Checkpoint,須要在Flink的配置文件conf/flink-conf.yaml中,添加以下配置,指定最多須要保存Checkpoint的個數:

state.checkpoints.num-retained: 20

這樣設置之後就查看對應的Checkpoint在HDFS上存儲的文件目錄
hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
若是但願回退到某個Checkpoint點,只須要指定對應的某個Checkpoint路徑便可實現

五、從checkpoint恢復數據

若是Flink程序異常失敗,或者最近一段時間內數據處理錯誤,咱們能夠將程序從某一個Checkpoint點進行恢復

bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk- 56/_metadata flink-job.jar
### ```

-----

程序正常運行後,還會按照Checkpoint配置進行運行,繼續生成Checkpoint數據。
固然恢復數據的方式還能夠在本身的代碼裏面指定checkpoint目錄,這樣下一次啓動的時候即便代碼發生了改變就自動恢復數據了。
相關文章
相關標籤/搜索