Flink裏面有state的概念
State:通常指一個具體的task/operator的狀態。State能夠被記錄,在失敗的狀況下數據還能夠恢復,Flink中有兩種基本類型的State:Keyed State,Operator State,他們兩種均可以以兩種形式存在:原始狀態(raw state)和託管狀態(managed state)
託管狀態:由Flink框架管理的狀態,咱們一般使用的就是這種。
原始狀態:由用戶自行管理狀態具體的數據結構,框架在作checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部數據結構一無所知。一般在DataStream上的狀態推薦使用託管的狀態,當實現一個用戶自定義的operator時,會使用到原始狀態。可是咱們工做中通常不經常使用,因此咱們不考慮他。
node
裏面沒有shuffle的操做,或者說裏面沒有key by的操做。
數據庫
(1)operator state是task級別的state,說白了就是每一個task對應一個state數據結構
(2)Kafka Connector source中的每一個分區(task)都須要記錄消費的topic的partition和offset等信息。
(3)operator state 只有一種託管狀態:ValueState
(4)operator state能夠定義爲source和sink的狀態進行管理框架
(1)keyed state記錄的是每一個key的狀態
(2)Keyed state託管狀態有六種類型分佈式
(2.1)ValueState /** * ValueState<T> :這個狀態爲每個 key 保存一個值 * value() 獲取狀態值 * update() 更新狀態值 * clear() 清除狀態 */ (2.2)ListState /** * ListState<T> :這個狀態爲每個 key 保存集合的值 * get() 獲取狀態值 * add() / addAll() 更新狀態值,將數據放到狀態中 * clear() 清除狀態 */ (2.3)MapState /** * MapState<K, V> :這個狀態爲每個 key 保存一個 Map 集合 * put() 將對應的 key 的鍵值對放到狀態中 * values() 拿到 MapState 中全部的 value * clear() 清除狀態 */ (2.4)ReducingState /** * ReducingState<T> :這個狀態爲每個 key 保存一個聚合以後的值 * get() 獲取狀態值 * add() 更新狀態值,將數據放到狀態中 * clear() 清除狀態 */ (2.5)AggregatingState (2.6)FoldingState (3) flink的keyed的state經過繼承RichFlatMapFunction,重寫open和flatMap,定義狀態,實現狀態的管理和自定義
Flink支持的StateBackend:
MemoryStateBackend
FsStateBackend
RocksDBStateBackendide
默認狀況下,狀態信息是存儲在 TaskManager 的堆內存中的,checkpoint 的時候將狀態保存到JobManager 的堆內存中。
缺點:
只能保存數據量小的狀態
狀態數據有可能會丟失
優勢:
開發測試很方便測試
狀態信息存儲在TaskManager 的堆內存中的,checkpoint 的時候將狀態保存到指定的文件中 (HDFS等文件系統)
缺點:
狀態大小受TaskManager內存限制(默認支持5M)
優勢:
狀態訪問速度很快
狀態信息不會丟失
用於: 生產,也可存儲狀態數據量大的狀況3d
狀態信息存儲在 RocksDB 數據庫 (key-value 的數據存儲服務), 最終保存在本地文件中
checkpoint 的時候將狀態保存到指定的文件中 (HDFS 等文件系統)
缺點:
狀態訪問速度有所降低
優勢:
能夠存儲超大量的狀態信息
狀態信息不會丟失
用於: 生產,能夠存儲超大量的狀態信息rest
修改當前任務代碼 env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints")); 或者new MemoryStateBackend() 或者new RocksDBStateBackend(filebackend, true);【須要添加第三方依賴】
code
修改flink-conf.yaml state.backend: filesystem state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints 注意:state.backend的值能夠是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
(1)爲了保證state的容錯性,Flink須要對state進行checkpoint。 (2)Checkpoint是Flink實現容錯機制最核心的功能,它可以根據配置週期性地基於Stream中各個
Operator/task的狀態來生成快照,從而將這些狀態數據按期持久化存儲下來,當Flink程序一旦意外崩
潰時,從新運行程序時能夠有選擇地從這些快照進行恢復,從而修正由於故障帶來的程序數據異常
(3)Flink的checkpoint機制能夠與(stream和state)的持久化存儲交互的前提:
持久化的source,它須要支持在必定時間內重放事件。這種sources的典型例子是持久化的消息隊列
(好比Apache Kafka,RabbitMQ等)或文件系統(好比HDFS,S3,GFS等)
用於state的持久化存儲,例如分佈式文件系統(好比HDFS,S3,GFS等)
生成快照:5秒
恢復快照:
默認checkpoint功能是disabled的,想要使用的時候須要先啓用,checkpoint開啓以後,
checkPointMode有兩種,Exactly-once和At-least-once,默認的checkPointMode是Exactly-once,
Exactly-once對於大多數應用來講是最合適的。At-least-once可能用在某些延遲超低的應用程序(始終延遲爲幾毫秒)。
默認checkpoint功能是disabled的,想要使用的時候須要先啓用 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每隔1000 ms進行啓動一個檢查點【設置checkpoint的週期】 env.enableCheckpointing(1000); // 高級選項: // 設置模式爲exactly-once (這是默認值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只容許進行一個檢查點 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink處理程序被cancel後,會保留Checkpoint數據,以便根據實際須要恢復到指定的 Checkpoint【詳細解釋見備註】 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCl eanup.RETAIN_ON_CANCELLATION);
Flink支持不一樣的重啓策略,以在故障發生時控制做業如何重啓,集羣在啓動時會伴隨一個默認的重啓策略,在沒有定義具體重啓策略時會使用該默認策略。 若是在工做提交時指定了一個重啓策略,該策略會覆蓋集羣的默認策略,默認的重啓策略能夠經過 Flink 的配置文件 flink-conf.yaml 指定。配置參數restart-strategy 定義了哪一個策略被使用。
經常使用的重啓策略
(1.1)固定間隔 (Fixed delay)
(1.2)失敗率 (Failure rate)
(1.3)無重啓 (No restart)
若是沒有啓用 checkpointing,則使用無重啓 (no restart) 策略。
若是啓用了 checkpointing,但沒有配置重啓策略,則使用固定間隔 (fixed-delay) 策略, 嘗試重啓次數默認值是:Integer.MAX_VALUE,重啓策略能夠在flink-conf.yaml中配置,表示全局的配置。也能夠在應用代碼中動態指定,會覆蓋全局配置。
(2.1)固定間隔 (Fixed delay)
第一種:全局配置 flink-conf.yaml restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10 s 第二種:應用代碼設置 env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 嘗試重啓的次數 Time.of(10, TimeUnit.SECONDS) // 間隔 ));
(2.2)失敗率 (Failure rate)
第一種:全局配置 flink-conf.yaml restart-strategy: failure-rate restart-strategy.failure-rate.max-failures-per-interval: 3 restart-strategy.failure-rate.failure-rate-interval: 5 min restart-strategy.failure-rate.delay: 10 s 第二種:應用代碼設置 env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // 一個時間段內的最大失敗次數 Time.of(5, TimeUnit.MINUTES), // 衡量失敗次數的是時間段 Time.of(10, TimeUnit.SECONDS) // 間隔 ));
(2..3)無重啓 (No restart)
第一種:全局配置 flink-conf.yaml restart-strategy: none 第二種:應用代碼設置 env.setRestartStrategy(RestartStrategies.noRestart());
默認狀況下,若是設置了Checkpoint選項,則Flink只保留最近成功生成的1個Checkpoint,而當Flink程序失敗時,能夠從最近的這個Checkpoint來進行恢復。可是,若是咱們但願保留多個Checkpoint,並可以根據實際須要選擇其中一個進行恢復,這樣會更加靈活,好比,咱們發現最近4個小時數據記錄處理有問題,但願將整個狀態還原到4小時以前Flink能夠支持保留多個Checkpoint,須要在Flink的配置文件conf/flink-conf.yaml中,添加以下配置,指定最多須要保存Checkpoint的個數:
state.checkpoints.num-retained: 20
這樣設置之後就查看對應的Checkpoint在HDFS上存儲的文件目錄hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
若是但願回退到某個Checkpoint點,只須要指定對應的某個Checkpoint路徑便可實現
若是Flink程序異常失敗,或者最近一段時間內數據處理錯誤,咱們能夠將程序從某一個Checkpoint點進行恢復
bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk- 56/_metadata flink-job.jar ### ``` ----- 程序正常運行後,還會按照Checkpoint配置進行運行,繼續生成Checkpoint數據。 固然恢復數據的方式還能夠在本身的代碼裏面指定checkpoint目錄,這樣下一次啓動的時候即便代碼發生了改變就自動恢復數據了。