flink系列-九、flink的狀態與容錯

一、理解State(狀態)

1.一、State

  • 對象的狀態
    • Flink中的狀態:通常指一個具體的task/operator某時刻在內存中的狀態(例如某屬性的值)
    • 注意:State和Checkpointing 不要搞混 
    • checkpoint則表示了一個Flink Job,在一個特定時刻的一份全狀態快照,即包含一個job下全部task/operator 某時刻的狀態 
  • 狀態的做用
    • 增量計算 
    • 聚合操做 
    • 機器學習訓練模式 
    • 等等 
    • 容錯 
    • Job故障重啓 
    • 升級

1.二、狀態的分類

一、Operator Statejava

  • 綁定到特定operator並行實例,每一個operator的並行實例維護一個狀態
  • 與key無關
  • 例如:一個並行度爲3的source,若是隻考慮一個算子須要一個邏輯狀態的情形,那麼他就有3operator個狀態
  • 支持的數據類型
    • ListState

二、Keyed Statenode

  • 基於KeyedStream之上的狀態,dataStream.keyBy(),只能在做用於KeyedStrem上的function/Operator里使用
  • KeyBy以後的Operator State,可理解爲分區過的Operator State
  • 每一個並行keyed Operator的每一個實例的每一個key有一個Keyed State:即就是 一個惟一的狀態,因爲每一個key屬於一個keyed operator的並行實例,所以咱們能夠將其簡單地理解爲 <operator,key>

  

  • 支持的數據結構 
    • ValueState:保留一個能夠更新和檢索的值 
      • update(T) 
      • value() 
    • ListState<T>:保存一個元素列表 
      • add(T) 
      • addAll(List) 
      • get(T) 
      • clear() 
    • ReducingState<T>:保存一個值,該值表示添加到該狀態全部值的聚合。 
      • add(T) 
    • AggregatingState<IN,OUT><in,out>:保存一個值,該值表示添加到該狀態的全部值的聚合。(與ReducingState 相反,聚合類型添加到該狀態的元素能夠有不同類型) 
      • add(T) 
    • FoldingState<T,ACC><t,acc>:不推薦使用 
      • add(T) 
    • MapState<UK,UV><uk,uv>:保存一個映射列表 
      • put(UK,UV) 
      • putAll(Map<uk,uv>) 
      • get(UK) 

 三、狀態的表現形式web

  • Keyed State和Operator State,能夠以兩種形式存在:原始狀態和託管狀態。
    • managed(託管狀態):
      • 託管狀態是指Flink框架管理的狀態,如ValueState,ListState,MapState等。 
      • 經過框架提供的接口來更新和管理狀態的值 
      • 不須要序列化 
    • raw(原始狀態) 原始狀態是由用戶自行管理的具體的數據結構,Flink在作checkpoint的時候,使用byte[]來讀寫狀態內 容,對其內部數據結構一無所知
      • 須要序列化
      • 一般在DataStream上的狀態推薦使用託管的狀態,當用戶自定義operator時,會使用到原始狀態。
      • 大多數都是託管狀態,除非自定義實現。

四、Operator State與Keyed State的Redistribute(從新分配)數據庫

1)、Operator State Redistribute Redistributeapache

  • 當Operator改變併發度的時候(Rescale),會觸發狀態的Redistribute,即Operator State里的 數據會從新分配到Operator的Task實例
  • 例如:某Operator的並行度由3改成2

  • 不同數據結構的動態擴展方式不同樣:
    • ListState:併發度在改變的時候,會將併發上的每一個List都取出,而後把這些List合併到一個新的List,然 後根據元素的個數在均勻分配給新的Task
    • UnionListState:相比於ListState更加靈活,把劃分的方式交給用戶去作,當改變併發的時候,會將原來 的List拼接起來。而後不作劃分,直接交給用戶(每一個Task給全量的狀態,用戶本身劃分)
    • BroadcastState:如大表和小表作Join時,小表能夠直接廣播給大表的分區,在每一個併發上的數據都是完 全一致的。作的更新也相同,當改變併發的時候,把這些數據COPY到新的Task便可。
    • 以上是Flink Operator States提供的3種擴展方式,用戶能夠根據本身的需求作選擇。

2)、Keyed State的Redistributewindows

  • Keyed State Redistribute
    • Key被Redistribute哪一個task,他對應的Keyed State就被Redistribute到哪一個Task
    • Keyed State Redistribute是基於Key Group來作分配的:
      • 將key分爲group
      • 每一個key分配到惟一的group 
      • 將group分配給task實例 
      • KeyGroup由最大並行度的大小所決定的 
      • Keyed State最終分配到哪一個Task:group ID和taskID是從0開始算的 
        • hash=hash(key) 
        • KG=hash % numOfKeyGroups 
        • Subtask=KG* taskNum / numOfKeyGroups

2、CheckPoint

 2.一、狀態容錯

  • 有了狀態天然須要狀態容錯,不然狀態就失去意義了
  • Flink狀態容錯的機制就是checkpoint

概念後端

  • 所謂checkpoint,就是在某一時刻,將全部task的狀態作一個快照(snapshot),而後存儲到State Backend (有全量 和 增量)
  • 一種連續性繪製數據流狀態的機制(週期性的),該機制確保即便出現故障,程序的狀態最終也將爲數據流中的每一條記錄提供exactly once(只處理一次)的語意保證(只能保證flink系統內,對於sink和source須要依賴的外部的組件一同保證)
  • 全局快照,持久化保存全部的task / operator的State

特色: api

  • 輕量級容錯機制
  • 可異步
  • 全量 vs 增量
  • Barrier機制(保證exactly-once 語義)
  • 失敗狀況可回滾至最近一次成功的checkpoint(自動)
  • 週期性(無需人工干預)

基本原理:數據結構

  • 經過往source 注入barrier
  • barrier做爲checkpoint的標誌
  • barrier
    • 全局異步化是snapshot的核心機制 併發

    • Flink分佈式快照的核心概念之一就是數據柵欄(barrier)。這些barrier被插入到數據流中,做爲數據流的一部分和數據一塊兒向下流動。Barrier不會干擾正常數據,數據嚴格有序。一個barrier把數據流分割成兩部分:一部 分進入到當前快照,另外一部分進入下一個快照。每個barrier都帶有快照ID,而且barrier以前的數據都進入了 此快照。Barrier不會干擾數據流處理,因此很是輕量。多個不同快照的多個barrier會在流中同時出現,即多個 快照可能同時建立。

使用Checkpointing的前提條件:

  • 在必定時間內可回溯的datasource(故障時能夠回溯數據),常見的:
    • 通常是可持久化的消息隊列:例如Kafka、RabbitMQ、Amazon Kinesis、Google PubSub
    • 也能夠是文件系統:HDFS、S三、GFS、NFS、Ceph
  • 可持久化存儲State的存儲系統,一般使用分佈式文件系統(Checkpointing就是把job的全部狀態都週期性持 久化到存儲里)
    • 通常是HDFS、S三、GFS、NFS、Ceph

2.二、狀態容錯示意圖

checkpoint:

 

Restore:

  • 恢復全部狀態
  • 設置source的位置(例如:Kafka的offset)

 2.三、使用CheckPoint

 一、開啓checkPoint

  • checkPoint默認是禁用的
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//start a checkpoint every 1000 ms   1000-checkpoint時間間隔
env.enableCheckpointing(1000);
//advanced options: checkpoint保證形式
//set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//make sure 500 ms of progress happen between checkpoints 兩次間隔最小時間,若是上次沒有完成會等待完成在執行下一次
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//checkpoints have to complete within one minute,or are discarded ;超時時間
env.getCheckpointConfig().setCheckpointTimeout(60000);
//allow only one checkpoint to be in progress at the same time; checkpoint 並行度
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//enable externalized checkpoints which are retained after job cancellation;任務結束,checkpoint是否保留
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

二、CheckpointConfig設置說明

  • checkpointMode 
    • //set mode to exactly-once (this is the default)
      env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
  • 保留策略 
    • 默認狀況下,檢查點不被保留,僅用於從故障中恢復做業。能夠啓用外部持久化檢查點,同時指定保留策略

    • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:在做業取消時保留檢查點。注意,在這種狀況下,必須在取消後手動清理檢查點狀態。

    • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION看成業被cancel時,刪除檢查點。檢查點狀態僅在做業失敗時可用。

  • checkpointing的超時時間:超過期間沒有完成則會被終止
    • //checkpoints have to complete within one minute, or are discarded
      env.getCheckpointConfig().setCheckpointTimeout(60000);
  • checkpointing最小間隔:用於指定上一個checkpoint完成以後最小等多久能夠出發另外一個checkpoint,當指 定這個參數時,maxConcurrentCheckpoints的值爲1
    • //make sure 500 ms of progress happen between checkpoints
      env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
  • maxConcurrentCheckpoints:指定運行中的checkpoint最多能夠有多少個(設定checkpointing最小間隔時本 參數即爲1) 
    • //allow only one checkpoint to be in progress at the same time
      env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
  • failOnCheckpointingErrors用於指定在checkpoint發生異常的時候,是否應該fail該task,默認爲true,若是設 置爲false,則task會拒絕checkpoint而後繼續運行
    • env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

三、選擇State Backend 

  • State Backend就是用來保存快照的地方
  • 用來在Checkpointing機制中持久化全部狀態的一致性快照,這些狀態包括:
    • 非用戶定義的狀態:例如,timers、非用戶自定義的stateful operators(connectors,windows)
    • 用戶定義的狀態:就是前面講的用戶自定義的stateful operato所使用的Keyed State and Operator State

目前Flink自帶三個開箱即用State Backend: 

  • MemoryStateBackend(默認)
    • MemoryStateBackend在Java堆上維護狀態。Key/value狀態和窗口運算符使用哈希表存儲值和計時器等
    • Checkpoint時,MemoryStateBackend對State作一次快照,並在向JobManager發送Checkpoint確認完 成的消息中帶上此快照數據,而後快照就會存儲在JobManager的堆內存中 
    • MemoryStateBackend可使用異步的方式進行快照(默認開啓),推薦使用異步的方式避免阻塞。若是 不但願異步,能夠在構造的時候傳入false(也能夠經過全局配置文件指定),以下
      • StateBackend backend = new MemoryStateBackend(10*1024*1024,false);
        env.setStateBackend(backend);
    • 限制

      • 單個State的大小默認限制爲5MB,能夠在MemoryStateBackend的構造函數中增長

      • 不論如何配置,State大小都沒法大於akka.framesize(JobManager和TaskManager之間發送的最大消息 的大小默認是10MB)

      • JobManager必須有足夠的內存大小

    •  適用場景 
      • 本地開發和調試 
      • 小狀態job,如只使用Map、FlatMap、Filter...或Kafka Consumer
  • FsStateBackend

    • FsStateBackend須要配置一個文件系統的URL, 如 "hdfs://namenode:40010/flink/checkpoint"。

    • FsStateBackend在TaskManager的內存中持有正在處理的數據。Checkpoint時將state snapshot 寫入文件系 統目錄下的文件中。文件的路徑等元數據會傳遞給JobManager,存在其內存中。

    • FsStateBackend可使用異步的方式進行快照(默認開啓),推薦使用異步的方式避免阻塞。若是不但願異 步能夠在構造的時候傳入false(也能夠經過全局配置文件指定),以下:

      • StateBackend backend = new FsStateBackend("hdfs://namenode:40010/flink/checkpoints",false);
        env.setStateBackend(backend);
    • 適用場景 
      • 大狀態、長窗口、大鍵/值狀態的job 
      • 全部高可用性的狀況 
  • RocksDBStateBackend

    • RocksDBStateBackend須要配置一個文件系統的URL來, 如"hdfs://namenode:40010/flink/checkpoint"

    • RocksDBStateBackend將運行中的數據保存在RocksDB數據庫中,(默認狀況下)存儲在TaskManager數據 目錄中,在Checkpoint時,整個RocksDB數據庫將被Checkpointed到配置的文件系統和目錄中。文件的路徑 等元數據會傳遞給JobManager,存在其內存中。

    • RocksDBStateBackend老是執行異步快照

    • 限制

      • RocksDB JNI API是基於byte[],所以key和value最大支持大小爲2^31個字節(2GB)。RocksDB自身在 支持較大value時候有問題

    • 適用場景
      • 超大狀態,超長窗口、大鍵/值狀態的job 
      • 全部高可用性的狀況 
    • 與前兩種狀態後端對比: 
      • 目前只有RocksDBStateBackend支持增量checkpoint(默認全量) 
      • 狀態保存在數據庫中,即便用RockDB能夠保存的狀態量僅受可用磁盤空間量的限制,相比其餘的狀態後 端可保存更大的狀態,但開銷更大(讀/寫須要反序列化/序列化去檢索/存儲狀態),吞吐受到限制 
  • 三種StateBackend總結以下:

  • 配置StateBackend 
    • 全局配置(配置文件conf/flink-conf.yaml),設置集羣保存checkpoint類型和存儲路徑
    • # The backend that will be used to store operator state checkpoints
      state.backend: filesystem
      #Directory.for storing checkpoints
      state.checkpoints.dir: hdfs:namenode:40010/flink/checkpoints
    • 每一個job單獨配置State Backend(可覆蓋全局配置) ,設置計算任務的
    • StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

四、配置恢復策略

Flink支持不同的重啓策略,這些策略控制在出現故障時如何從新啓動job

  • 若是沒有啓用checkpointing,則使用無重啓(no restart)策略。
  • 若是啓用了checkpointing,但沒有配置重啓策略,則使用固定延遲(fixed-delay)策略,其中嘗試重啓 次數是Integer > MAX_VALUE
  • 重啓策略能夠在flink-conf.yaml中配置,表示全局的配置。也能夠在應用代碼中動態指定,會覆蓋全局配 置

 2.四、checkpoint demo

一、operatorState的checkPoint容錯案例:

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
  * OperatorState的checkPoint容錯恢復
  * 想知道兩次事件 xxd 之間,一共發生多少次其餘事件,分別是什麼事件
  * 事件流:xxd a a a a a f d d xxd ad d s s d xxd…
  * 當事件流中出現字母e時觸發容錯
  * 輸出:
  * (8,a a a a a f d d)
  * (6,ad d s s d)
  */
object OperatorStateRecovery {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    //生成配置對象
    val config = new Configuration()
    //開啓spark-webui
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日誌文件,不然打印日誌到控制檯
    config.setString("web.log.path", "/tmp/logs/flink_log")
    //配置taskManager的日誌文件,不然打印日誌到控制檯
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log")
    //配置tm有多少個slot
    config.setString("taskmanager.numberOfTaskSlots", "4")
    // 獲取local運行環境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    //設置全局並行度爲1,好讓全部數據都跑到一個task中,以方便測試
    env.setParallelism(1)
    //隔多長時間執行一次ck  毫秒
    env.enableCheckpointing(1000L)
    val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
    //保存EXACTLY_ONCE
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //每次ck之間的間隔,不會重疊
    checkpointConfig.setMinPauseBetweenCheckpoints(2000L)
    //每次ck的超時時間
    checkpointConfig.setCheckpointTimeout(10L)
    //若是ck執行失敗,程序是否中止
    checkpointConfig.setFailOnCheckpointingErrors(true)
    //job在執行CANCE的時候是否刪除ck數據
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    //指定保存ck的存儲模式
    val stateBackend = new FsStateBackend("file:/tmp/flink/checkpoints", true) //異步同步
    // val stateBackend = new MemoryStateBackend(10 * 1024 * 1024,false)
    // val stateBackend = new RocksDBStateBackend("hdfs://ns1/flink/checkpoints",true)
    env.setStateBackend(stateBackend)
    //恢復策略,恢復三次,間隔0秒
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(0, TimeUnit.SECONDS) // delay
      )
    )
    val input: DataStream[String] = env.socketTextStream("localhost", 6666)
    input
      .flatMap(new OperatorStateRecoveryRichFunction)
      .print()
    env.execute()
  }
}

//因爲使用了本地狀態因此須要checkpoint的snapshotState方法把本地狀態放到託管狀態中
class OperatorStateRecoveryRichFunction extends RichFlatMapFunction[String, (Int, String)] with CheckpointedFunction {

  //託管狀態
  @transient 
  private var checkPointCountList: ListState[String] = _
  //原始狀態
  private var list: ListBuffer[String] = new ListBuffer[String]

  //flatMap函數處理邏輯
  override def flatMap(value: String, out: Collector[(Int, String)]): Unit = {
    if (value == "xxd") {
      if (list.size > 0) {
        val outString: String = list.foldLeft("")(_ + " " + _)
        out.collect((list.size, outString))
        list.clear()
      }
    } else if (value == "e") {
      1 / 0
    } else {
      list += value
    }
  }

  //再checkpoint時存儲,把正在處理的原始狀態的數據保存到託管狀態中
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkPointCountList.clear()
    list.foreach(f => checkPointCountList.add(f))
    println(s"snapshotState:${
      list
    }, Time=${System.currentTimeMillis()}")
  }

  //從statebackend中恢復保存的託管狀態,並未來數據放到程序處理的原始狀態中
  // 出錯一次就調用一次這里,能調用幾回是根據setRestartStrategy設置的
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val lsd: ListStateDescriptor[String] = new ListStateDescriptor[String]("xxdListState", TypeInformation.of(new TypeHint[String] {}))
    checkPointCountList = context.getOperatorStateStore.getListState(lsd)
    if (context.isRestored) {// 出錯恢復
      import scala.collection.convert.wrapAll._
      for (e <- checkPointCountList.get()) {
        list += e
      }
    }
    println(s"initializeState:${list},Time=${System.currentTimeMillis()}")
  }
}

二、Keyed State容錯實現方法

  • Keyed State之過時超時策略
    • 因爲Keyed State太多,因此flink提供了針對Keyed State TTL的設置
    • 任何類型的keyed State均可以設置TTL。若是TTL已配置,且狀態已過時,則將以最佳方式處理
    • 全部State collection都支持條目級別的TTL,即list、map中的條目獨立expire
    • 用法
      • StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
        ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class)
        stateDescriptor.enableTimeToLive(ttlConfig);
    • Refresh策略(默認是OnCreateAndWrite):設置如何更新keyedState的最後訪問時間

      • StateTtlConfig.UpdateType.Disabled - 禁用TTL,永不過時

      • StateTtlConfig.UpdateType.OnCreateAndWrite - 每次寫操做均更新State的最後訪問時間(Create、 Update)

      • StateTtlConfig.UpdateType.OnReadAndWrite - 每次讀寫操做均更新State的最後訪問時間

    • 狀態可見性(默認是NeverReturnExpired):設置是否返回過時的值(過時還沒有清理,此時正好被訪問)
      • StateTtlConfig.StateVisibility.NeverReturnExpired - 永不返回過時狀態 
      • StateTtlConfig.StateVisibility.ReturnExpiredlfNotCleanedUp - 能夠返回過時但還沒有清理的狀態值 
    • TTL time等級
      • setTimeCharacteristic(TimeCharacteristic timeCharacteristic) 
      • 目前只支持ProcessingTime
  • Keyed State之過時狀態清理
    • 清理策略
    • 默認:已通過期的數據被顯示讀取時纔會清理(可能會致使狀態愈來愈大) 
    • FULL_STATE_SCAN_SNAPSHOT:在checkpoint時清理full snapshot中的expired state 
      • CleanupFullSnapshot() 
      • 不適用於在RocksDB state backend上的incremental checkpointing

KeyedState的checkPoint容錯恢復 :

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

import scala.collection.mutable


/**
  * KeyedState的checkPoint容錯恢復
  * 將輸入格式爲"字符串 數字"的字符串轉換成(字符串,數字)的元組類型
  * 事件流:xxd 666
  * 當事件流中出現"任意字符串 888"時觸發容錯
  * 輸出:
  * (xxd,666)
  */
object KeyedStateRecovery {
  def main(args: Array[String]): Unit = {
    import org.apache.flink.api.scala._
    //生成配置對象
    val config = new Configuration()
    //開啓spark-webui
    config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    //配置webui的日誌文件,不然打印日誌到控制檯
    config.setString("web.log.path", "/tmp/logs/flink_log")
    //配置taskManager的日誌文件,不然打印日誌到控制檯
    config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/tmp/logs/flink_log")
    //配置tm有多少個slot
    config.setString("taskmanager.numberOfTaskSlots", "4")
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    //並行度設置爲1,是想讓全部的key都跑到一個task中,以方便測試
    env.setParallelism(1)
    //隔多長時間執行一次ck
    env.enableCheckpointing(1000L)
    val checkpointConfig: CheckpointConfig = env.getCheckpointConfig
    //保存EXACTLY_ONCE
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    //每次ck之間的間隔,不會重疊
    checkpointConfig.setMinPauseBetweenCheckpoints(2000L)
    //每次ck的超時時間
    checkpointConfig.setCheckpointTimeout(10L)
    //若是ck執行失敗,程序是否中止
    checkpointConfig.setFailOnCheckpointingErrors(true)
    //job在執行CANCE的時候是否刪除ck數據
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
    //指定保存ck的存儲模式
    val stateBackend = new FsStateBackend("file:/tmp/flink/checkpoints", true)
    // val stateBackend = new MemoryStateBackend(10 * 1024 * 1024,false)
    // val stateBackend = new RocksDBStateBackend("hdfs://ns1/flink/checkpoints",true)
    env.setStateBackend(stateBackend)
    //恢復策略
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(
        3, // number of restart attempts
        Time.of(3, TimeUnit.SECONDS) // delay
      )
    )
    val input: DataStream[String] = env.socketTextStream("localhost", 6666)
    //由於KeyedStateRichFunctionString中使用了keyState,因此它必須在keyBy算子的後面
    input
      .map(f => {
        val strings: mutable.ArrayOps[String] = f.split(" ")
        (strings(0), strings(1).toInt)
      })
      .keyBy(0)
      .flatMap(new KeyedStateRecoveryRichFunctionString)
      .print()
    env.execute()
  }
}

//因爲沒有使用本地的狀態因此不須要實現checkpoint接口
class KeyedStateRecoveryRichFunctionString extends RichFlatMapFunction[(String, Int), (String, Int)] {
  //ValueState是Key的state類型,是隻能存在於KeyedStream的operator中
  @transient private var sum: ValueState[(String, Int)] = null

  override def flatMap(value: (String, Int), out: Collector[(String, Int)]): Unit = {
    println(s"state value:${sum.value()}")
    //當value值爲888時,觸發異常
    if (value._2 != 888) {
      sum.clear()
      sum.update(value)
      out.collect(value)
    } else {
      1 / 0
    }
  }

  //在operator啓動時執行一次
  //若是operator出現異常,在恢復operator時會被再次執行
  override def open(parameters: Configuration): Unit = {
    //keyState的TTL策略
    val ttlConfig = StateTtlConfig
      //keyState的超時時間爲10秒
      .newBuilder(Time.seconds(10))
      //當建立和更新時,從新計時超時時間
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      //失敗時不返回keyState的值
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      //失敗時返回keyState的值
//      .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
      //ttl的時間處理等級目前只支持ProcessingTime
      .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
      .build
    //從runtimeContext中得到ck時保存的狀態
    val descriptor = new ValueStateDescriptor[(String, Int)]("xxdValueState", TypeInformation.of(new TypeHint[(String, Int)] {}))
    descriptor.enableTimeToLive(ttlConfig)
    sum = getRuntimeContext.getState(descriptor)
  }
}

3、SavePoint

概念:

  • savepoint能夠理解爲是一種特殊的checkpoint,savepoint就是指向checkpoint的一個指針,實際上也是 使用經過checkpointing機制建立的streaming job的一致性快照,能夠保存數據源的offset、並行操做狀態 也就是流處理過程當中的狀態歷史版本。須要手動觸發,並且不會過時,不會被覆蓋,除非手動刪除。正常 狀況下的線上環境是不須要設置savepoint的。除非對job或集羣作出重大改動的時候, 須要進行測試運 行。
  • 能夠從應用在過去的任意作了savepoint的時刻開始繼續消費,具備能夠replay的功能

Savepoint由兩部分組成:

  • 數據目錄:穩定存儲上的目錄,里面的二進制文件是streaming job狀態的快照
  • 元數據文件:指向數據目錄中屬於當前Savepoint的數據文件的指針(絕對路徑)

與Checkpoint的區別:

  • Savepoint至關於備份(類比數據庫備份)、Checkpoint至關於recovery log
  • Checkpoint是Flink自動建立的"recovery log"用於故障自動恢復,由Flink建立,不須要用戶交互。用戶 cancel做業時就刪除,除非啓動了保留機制(External Checkpoint)
  • Savepoint由用戶建立,擁有和刪除,保存點在做業終止後仍然存在。

做用:

  • job開發新版本(更改job graph、更改並行度等等),應用從新發布
  • Flink版本的更新
  • 業務遷移,集羣須要遷移,不允許數據丟失
相關文章
相關標籤/搜索