1. Operator State
2. Keyed State
3. Forms of State
4. Redistribution of Operator State and Keyed State
1) Operator State Redistribute
2) Keyed State Redistribute
Concept
Characteristics:
Basic principle:
Global asynchrony is the core mechanism of snapshots.
One of the core concepts of Flink's distributed snapshots is the stream barrier. Barriers are injected into the data stream and flow downstream with the records as part of the stream. A barrier does not interfere with normal records, and records remain strictly ordered. A barrier splits the stream into two parts: records before it belong to the current snapshot, records after it belong to the next snapshot. Each barrier carries a snapshot ID, and all records that precede the barrier are included in that snapshot. Barriers do not interrupt the flow of records, so they are very lightweight. Barriers for several different snapshots can be in the stream at the same time, which means several snapshots may be in progress concurrently.
Prerequisites for using checkpointing:
Checkpoint:
Restore:
1. Enabling checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms (checkpoint interval)
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints; if the previous
// checkpoint has not finished, the next one waits for it to complete
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded (timeout)
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time (checkpoint concurrency)
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
2. CheckpointConfig settings
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
By default, checkpoints are not retained; they are used only to recover a job from failures. You can enable externalized checkpoints that are persisted, and specify a retention policy at the same time:
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: retain the checkpoint when the job is cancelled. Note that in this case you must clean up the checkpoint state manually after cancellation.
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the checkpoint when the job is cancelled. The checkpoint state is then only available if the job fails.
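A minimal sketch of setting the retention policy in code, assuming a StreamExecutionEnvironment named env as in the snippets above:

import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(1000)
// keep the checkpoint when the job is cancelled; it must be cleaned up manually
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// alternative: delete it on cancel, keeping it only for failure recovery
// env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)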
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// fail the task (and, subject to the restart strategy, the job) if a checkpoint fails; true is the default
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
3. Choosing a State Backend
Flink currently ships with three state backends out of the box: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend.
MemoryStateBackend
MemoryStateBackend keeps in-flight state on the TaskManager's Java heap and stores checkpoint snapshots in the JobManager's memory.
// per-state limit of 10 MB; false = synchronous snapshots
StateBackend backend = new MemoryStateBackend(10 * 1024 * 1024, false);
env.setStateBackend(backend);
Limitations
A single state is limited to 5 MB by default; the limit can be raised via the MemoryStateBackend constructor (see the sketch after this list).
No matter how it is configured, a state can never be larger than akka.framesize (the maximum size of a message between the JobManager and the TaskManager, 10 MB by default).
The JobManager must have enough memory to hold the aggregated state.
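As referenced above, a minimal sketch of raising the per-state limit through the constructor; the first argument is the maximum state size in bytes, the second toggles asynchronous snapshots:

import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// raise the default 5 MB per-state limit to 10 MB; still bounded by akka.framesize
env.setStateBackend(new MemoryStateBackend(10 * 1024 * 1024, true))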
FsStateBackend
FsStateBackend is configured with a file system URL, e.g. "hdfs://namenode:40010/flink/checkpoint".
FsStateBackend holds in-flight data in the TaskManager's memory. On checkpoint, it writes the state snapshot into files under the configured file system directory. Metadata such as the file paths is passed to the JobManager and kept in its memory.
FsStateBackend can snapshot asynchronously (enabled by default); asynchronous snapshots are recommended to avoid blocking processing. If you do not want asynchronous snapshots, pass false to the constructor (it can also be set in the global configuration file), as follows:
// false = synchronous snapshots
StateBackend backend = new FsStateBackend("hdfs://namenode:40010/flink/checkpoints", false);
env.setStateBackend(backend);
RocksDBStateBackend
RocksDBStateBackend is likewise configured with a file system URL, e.g. "hdfs://namenode:40010/flink/checkpoint".
RocksDBStateBackend keeps in-flight data in a RocksDB database, which is stored (by default) in the TaskManager's data directories. On checkpoint, the whole RocksDB database is checkpointed to the configured file system and directory. Metadata such as the file paths is passed to the JobManager and kept in its memory.
RocksDBStateBackend always performs asynchronous snapshots.
Limitations
The RocksDB JNI API is based on byte[], so the maximum supported size per key and per value is 2^31 bytes (2 GB). RocksDB itself also has issues with very large values.
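A short sketch of enabling RocksDB with incremental checkpoints, assuming the flink-statebackend-rocksdb dependency is on the classpath; the boolean constructor argument enables incremental checkpointing:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// in-flight state lives in local RocksDB; checkpoints are written to HDFS
// true = incremental checkpoints: only changed SST files are uploaded
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints", true))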
# The backend that will be used to store operator state checkpoints
state.backend: filesystem

# Directory for storing checkpoints
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
4. Configuring the restart strategy
Flink supports different restart strategies, which control how and whether a job is restarted after a failure.
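A brief sketch of the three built-in strategies (pick one per job):

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// fixed delay: at most 3 restart attempts, 10 seconds apart
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))
// failure rate: at most 3 failures within a 5-minute window, 10 seconds between attempts
// env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
// no restart: the job fails on the first error
// env.setRestartStrategy(RestartStrategies.noRestart())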
1. Checkpoint fault-tolerance example with Operator State:
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
 * Checkpoint-based fault tolerance and recovery for Operator State.
 * Goal: between two "xxd" events, count how many other events occurred and what they were.
 * Event stream: xxd a a a a a f d d xxd ad d s s d xxd…
 * The letter "e" in the stream triggers a failure.
 * Output:
 * (8,a a a a a f d d)
 * (6,ad d s s d)
 */
object OperatorStateRecovery {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    // build the configuration object
    val config = new Configuration()
    // start the local Flink web UI
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // log file for the web UI; otherwise logs go to the console
    config.setString("web.log.path", "/tmp/logs/flink_log")
    // log file for the TaskManager; otherwise logs go to the console
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log")
    // number of slots per TaskManager
    config.setString("taskmanager.numberOfTaskSlots", "4")
    // get a local execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    // global parallelism of 1 so that all data goes to one task, which makes testing easier
    env.setParallelism(1)
    // checkpoint interval in milliseconds
    env.enableCheckpointing(1000L)
    val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
    // EXACTLY_ONCE guarantees
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // minimum pause between checkpoints; checkpoints will not overlap
    checkpointConfig.setMinPauseBetweenCheckpoints(2000L)
    // timeout for each checkpoint (ms)
    checkpointConfig.setCheckpointTimeout(10000L)
    // whether the job fails when a checkpoint fails
    checkpointConfig.setFailOnCheckpointingErrors(true)
    // whether checkpoint data is deleted when the job is cancelled
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    // state backend used to store checkpoints (true = asynchronous snapshots)
    val stateBackend = new FsStateBackend("file:/tmp/flink/checkpoints", true)
    // val stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false)
    // val stateBackend = new RocksDBStateBackend("hdfs://ns1/flink/checkpoints", true)
    env.setStateBackend(stateBackend)
    // restart strategy: 3 attempts with a 0-second delay
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(0, TimeUnit.SECONDS) // delay
      ))
    val input: DataStream[String] = env.socketTextStream("localhost", 6666)
    input
      .flatMap(new OperatorStateRecoveryRichFunction)
      .print()
    env.execute()
  }
}

// Because a raw local state is used, snapshotState of the CheckpointedFunction
// interface must copy the local state into managed state at checkpoint time
class OperatorStateRecoveryRichFunction extends RichFlatMapFunction[String, (Int, String)] with CheckpointedFunction {

  // managed state
  @transient private var checkPointCountList: ListState[String] = _
  // raw (local) state
  private var list: ListBuffer[String] = new ListBuffer[String]

  // flatMap processing logic
  override def flatMap(value: String, out: Collector[(Int, String)]): Unit = {
    if (value == "xxd") {
      if (list.size > 0) {
        val outString: String = list.foldLeft("")(_ + " " + _)
        out.collect((list.size, outString))
        list.clear()
      }
    } else if (value == "e") {
      1 / 0
    } else {
      list += value
    }
  }

  // called at checkpoint time: save the raw state currently being processed into managed state
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkPointCountList.clear()
    list.foreach(f => checkPointCountList.add(f))
    println(s"snapshotState:${list}, Time=${System.currentTimeMillis()}")
  }

  // restore the managed state saved in the state backend and copy it back into the raw state;
  // called once per recovery, as many times as setRestartStrategy allows
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val lsd: ListStateDescriptor[String] =
      new ListStateDescriptor[String]("xxdListState", TypeInformation.of(new TypeHint[String] {}))
    checkPointCountList = context.getOperatorStateStore.getListState(lsd)
    if (context.isRestored) { // restoring after a failure
      import scala.collection.convert.wrapAll._
      for (e <- checkPointCountList.get()) {
        list += e
      }
    }
    println(s"initializeState:${list},Time=${System.currentTimeMillis()}")
  }
}
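To try the example, open a listening socket first (e.g. nc -lk 6666) and type events line by line. Sending e throws the ArithmeticException; the job then restarts under the fixed-delay strategy, context.isRestored is true in initializeState, and the elements buffered since the last xxd are recovered from the checkpoint.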
2. Implementing fault tolerance with Keyed State
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
Refresh policy (default: OnCreateAndWrite): configures when the last access timestamp of keyed state is updated
StateTtlConfig.UpdateType.Disabled - TTL is disabled; the state never expires
StateTtlConfig.UpdateType.OnCreateAndWrite - the last access timestamp is updated on every write (create and update)
StateTtlConfig.UpdateType.OnReadAndWrite - the last access timestamp is updated on every read and write (a sketch follows this list)
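A minimal sketch of a TTL configuration that also refreshes the timer on reads (the values here are illustrative):

import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time

// entries expire 10 minutes after the last read or write
val ttlConfig = StateTtlConfig
  .newBuilder(Time.minutes(10))
  .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .build()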
Checkpoint-based fault tolerance and recovery for Keyed State:
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

/**
 * Checkpoint-based fault tolerance and recovery for Keyed State.
 * Converts input of the form "word number" into a (word, number) tuple.
 * Event stream: xxd 666
 * Any line of the form "word 888" triggers a failure.
 * Output:
 * (xxd,666)
 */
object KeyedStateRecovery {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    // build the configuration object
    val config = new Configuration()
    // start the local Flink web UI
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // log file for the web UI; otherwise logs go to the console
    config.setString("web.log.path", "/tmp/logs/flink_log")
    // log file for the TaskManager; otherwise logs go to the console
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log")
    // number of slots per TaskManager
    config.setString("taskmanager.numberOfTaskSlots", "4")
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    // parallelism of 1 so that all keys go to one task, which makes testing easier
    env.setParallelism(1)
    // checkpoint interval in milliseconds
    env.enableCheckpointing(1000L)
    val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
    // EXACTLY_ONCE guarantees
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // minimum pause between checkpoints; checkpoints will not overlap
    checkpointConfig.setMinPauseBetweenCheckpoints(2000L)
    // timeout for each checkpoint (ms)
    checkpointConfig.setCheckpointTimeout(10000L)
    // whether the job fails when a checkpoint fails
    checkpointConfig.setFailOnCheckpointingErrors(true)
    // whether checkpoint data is deleted when the job is cancelled
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    // state backend used to store checkpoints (true = asynchronous snapshots)
    val stateBackend = new FsStateBackend("file:/tmp/flink/checkpoints", true)
    // val stateBackend = new MemoryStateBackend(10 * 1024 * 1024, false)
    // val stateBackend = new RocksDBStateBackend("hdfs://ns1/flink/checkpoints", true)
    env.setStateBackend(stateBackend)
    // restart strategy: 3 attempts with a 3-second delay
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(3, TimeUnit.SECONDS) // delay
      ))
    val input: DataStream[String] = env.socketTextStream("localhost", 6666)
    // KeyedStateRecoveryRichFunctionString uses keyed state, so it must come after the keyBy operator
    input
      .map(f => {
        val strings: Array[String] = f.split(" ")
        (strings(0), strings(1).toInt)
      })
      .keyBy(0)
      .flatMap(new KeyedStateRecoveryRichFunctionString)
      .print()
    env.execute()
  }
}

// No raw local state is used here, so the CheckpointedFunction interface is not needed
class KeyedStateRecoveryRichFunctionString extends RichFlatMapFunction[(String, Int), (String, Int)] {

  // ValueState is a per-key state type; it can only be used in operators on a KeyedStream
  @transient private var sum: ValueState[(String, Int)] = null

  override def flatMap(value: (String, Int), out: Collector[(String, Int)]): Unit = {
    println(s"state value:${sum.value()}")
    // a value of 888 triggers the exception
    if (value._2 != 888) {
      sum.clear()
      sum.update(value)
      out.collect(value)
    } else {
      1 / 0
    }
  }

  // executed once when the operator starts;
  // executed again each time the operator is recovered after a failure
  override def open(parameters: Configuration): Unit = {
    // TTL policy for the keyed state
    val ttlConfig = StateTtlConfig
      // the state expires after 10 seconds
      .newBuilder(Time.seconds(10))
      // reset the expiration timer on create and update
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      // never return the value of expired state
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      // alternative: return expired state if it has not been cleaned up yet
      // .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
      // TTL currently only supports the ProcessingTime time characteristic
      .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
      .build
    // obtain the state saved at checkpoint time from the runtime context
    val descriptor = new ValueStateDescriptor[(String, Int)]("xxdValueState",
      TypeInformation.of(new TypeHint[(String, Int)] {}))
    descriptor.enableTimeToLive(ttlConfig)
    sum = getRuntimeContext.getState(descriptor)
  }
}
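As with the previous example, start nc -lk 6666 and send lines such as xxd 666. A line like xxd 888 triggers the division by zero; after the automatic restart, the state value: printout shows the tuple restored for that key from the last checkpoint, until the 10-second TTL expires it.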
Concept: a Savepoint is a consistent snapshot of a streaming job's state that is triggered manually by the user and stored externally; it is built on the same mechanism as checkpoints.
A Savepoint consists of two parts: a directory of (typically large) binary files on stable storage holding the actual state snapshot, and a (relatively small) metadata file pointing to those files.
Differences from Checkpoints: checkpoints are created automatically and periodically by Flink for failure recovery and may be discarded once superseded, whereas savepoints are triggered manually by the user and are never removed automatically.
Purpose: planned operations such as upgrading the job or the Flink version, rescaling, migrating to another cluster, or pausing a job and resuming it later from a consistent point.
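In practice, savepoints are managed through the CLI: flink savepoint <jobId> [targetDirectory] triggers one for a running job, flink cancel -s [targetDirectory] <jobId> takes a savepoint and then cancels the job, and flink run -s <savepointPath> ... resumes a job from it.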