這裏將介紹Flink對有狀態計算的支持,其中包括狀態計算和無狀態計算的區別,以及在Flink中支持的不一樣狀態類型,分別有 Keyed State 和 Operator State 。另外針對狀態數據的持久化,以及整個 Flink 任務的數據一致性保證,Flink 提供了 Checkpoint 機制處理和持久化狀態結果數據,隨後對狀態數據 Flink 提供了不一樣的狀態管理器來管理狀態數據,例如: MemoryStateBackend 等。
java
在Flink架構體系中,有狀態計算能夠說是Flink很是重要的特徵之一。有狀態計算是指在程序計算過程當中,在Flink程序內部,存儲計算產生的中間結果,並提供給Functions 或 孫子計算結果使用。如圖所示:
狀態數據能夠維繫在本地存儲中,這裏的存儲能夠是 Flink 的堆內存或者堆外內存,也能夠藉助第三方的存儲介質,例如:Flink中已經實現的RocksDB,固然用戶也能夠本身實現相應的緩存系統去存儲狀態信息,以完成更加複雜的計算邏輯。和狀態計算不一樣的是,無狀態計算不會存儲計算過程當中產生的結果,也不會將結果用於下一步計算過程當中,程序只會在當前的計算流程中實行計算,計算完成就輸出結果,而後下一條數據接入,而後處理。
無狀態計算實現的複雜度相對較低,實現起來比較容易,可是沒法完成提到的比較複雜的業務場景,例如:redis
在 Flink 中根據數據集是否根據 Key 進行分區,將狀態分爲 Keyed State 和 Operator State(Non-Keyed State) 兩種類型。shell
Keyed State
表示和key相關的一種state ,只能用於 KeyedStream 類型數據集對應的Functions和Operators之上。Keyed State 是 Operator State 的特例,區別在於 Keyed State 事先按照 key 對數據集進行了分區,每一個 Key State 僅對應一個 Operator 和 Key 的組合。 Keyed State 能夠經過 Key Group 進行管理,主要用於當算子並行度發生變化時,自動從新分佈 Keyed State 數據。後端
Operator State
與 Keyed State 不一樣的是,Operator State 只和並行的算子實例綁定,和數據元素中的 Key 無關,每一個算子實例中持有全部數據元素中的一部分狀態數據。 Operator State 支持當算子實例並行度發生變化時自動從新分配狀態數據。緩存
同時在Flink中 Keyed State 和 Operator State 均具備兩種形式,其中一種爲託管狀態(Managered State)形式,由Flink Runtime 中控制和管理狀態數據,並將狀態數據轉換稱爲內存Hash tables 或 Recks DB 的對象存儲,而後將這些狀態數據經過內部接口持久化到 Checkpoints 中,任務異常時能夠經過這些狀態數據恢復任務。另一種是原生狀態(Row State)形式,由算子本身管理數據結構,當觸發 Checkpoints 過程當中,Flink並不知道狀態數據內部的數據結構,只是將數據轉換成 bytes 數據存儲在 Checkpoints 中,當從 Checkpoints 恢復任務時,算子本身在反序列化出狀態的數據結構。
Notes: Flink中推薦用戶使用 Managered State 管理狀態數據,主要緣由是:Manager State 可以更好的支持狀態數據的重平衡以及更加完善的內存管理。數據結構
Flink 有如下Managered Keyed State 類型可使用,每種狀態都有相應的的使用場景,用戶能夠根據實際需求選擇使用。架構
ValueState[T]
: 與 Key 對應單個值的狀態,例如統計 user_id 對應的交易次數,每次用戶交易都會在 count 狀態值上進行更新。 ValueState 對應的更新方法是 update(T)
, 取值是 T value()
;ListState[T]
: 與 Key 對應元素列表的狀態,狀態中存放元素的 List 列表。例如定義 ListValue存儲用戶常常訪問的 IP 地址。在 ListState 中添加元素使用 add(T) , addAll(List[T])
兩個方法。獲取元素使用 Iterable<T> get()
方法,更新元素使用 update(List[T])
方法;ReducingState[T]
: 定義與 Key 相關的數據元素單個聚合值的狀態,用戶存儲通過指定 ReduceFunction 計算以後的指標,所以,ReduceState 須要指定ReduceFunction 完成狀態數據的聚合。ReducingState 添加元素使用 add(T)
方法,獲取元素使用 T get()
;AggregeateState[IN,OUT]
: 定義 與key相關的數據元素單個聚合值的狀態,用於維護數據通過指定 AggregateFunction 計算以後的指標。和ReducingState相比,AggregeateState 的輸入輸出類型不必定相同,但ReducingState 輸入/出 類型必須保持一致。和ListState類似,AggregatingState 須要指定AggregateFunction完成狀態數據的聚合操做。AggregatringState添加元素使用 add(IN)
方法, 獲取元素使用 OUT get()
方法;MapState<UK, UV>
:這會保留一個映射列表。您能夠將鍵值對放入狀態並檢索Iterable全部當前存儲的映射。使用put(UK, UV)
或 添加映射putAll(Map[UK,UV])
(Map<UK, UV>)。可使用來檢索與用戶鍵關聯的值get(UK)
。對於映射,鍵和值可迭代視圖可使用被檢索entries()
,keys()
並values()
分別。Stateful Function定義
示例:
在RichFlatMapFunction 中定義 ValueState,已完成最小值的獲取:框架
inputStream.keyBy(_._1).flatMap( // (String,Long,Int) 輸入類型 // (String,Long,Long) 輸出類型 new RichFlatMapFunction[(Int,Long) , (Int,Long,Long)] { private var leastValueState:ValueState[Long] = _ // 定義狀態名稱 private var leastValueStateDesc:ValueStateDescriptor[Long] = _ override def open(parameters: Configuration): Unit = { // 指定狀態類型 leastValueStateDesc = new ValueStateDescriptor[Long]("leastValueState" , classOf[Long]) // 經過 getRuntimeContext.getState 拿到狀態 leastValueState = getRuntimeContext.getState(leastValueStateDesc) } override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = { // 經過 value 拿到最小值 val leastValue: Long = leastValueState.value() // 若是前一個指標大於最小值,則直接輸出數據元素和最小值 if ( leastValue != 0L && value._2 > leastValue){ out.collect((value._1 , value._2 , leastValue)) }else{ // 若是當前指標小於最小值,則更新狀態中的最小值 leastValueState.update(value._2) // 將當前數據中的指標做爲最小值輸出 out.collect(value._1 , value._2 , value._2) } } }).print()
State生命週期
對於任何類型 Keyed State 均可以設定狀態生命週期(TTL),以確保可以在規定時間內即時清理狀態數據。狀態生命週期功能可經過 StateTtlConfig 配置而後將 StateTtlConfig 配置傳入StateDescriptor 中的 enableTimeToLive 方法中便可。Keyed State 配置實例以下所示:運維
val config: StateTtlConfig = StateTtlConfig // 指定TTL時長爲 5s .newBuilder(Time.seconds(5)) // 指定TTL 刷新只對建立和寫入操做有效 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 指定狀態可見性不返回過時數據 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build() leastValueStateDesc.enableTimeToLive(config)
在StateTtlConfig中除了經過 newBuilder() 方法中設定過時時間的參數是必須的以外,其餘的參數都是可選的或使用默認值。其中 setUpdateType方法中傳入的類型有兩種:機器學習
另外,能夠經過 setStateVisibility 方法設定狀態的可見性,根據過時數據是否被清理來肯定是否返回狀態數據:
Scala DataStream API中使用狀態
直接上代碼片斷:
inputStream.keyBy(_._1) // 指定輸入參數類型和狀態參數類型 .mapWithState((in:(Int,Long) , count : Option[Int]) => // 判斷count 類型是否非空 count match { // 輸出 key , count 並在原來 count 數據上累加 case Some(c) => ((in._1 , c) , Some(c + in._2)) // 若是狀態爲空,則將指標填入 case None => ((in._1 , 0) , Some(in._2)) } )
Operator State 是一種 non-keyed-state ,與並行的操做算子實例相關聯,例如在 Kafka Connector 中,每一個 Kafka 消費端算子實例都對應到 Kafka 的一個分區中,維護Topic分區和 Offsets 偏移量做爲算子的 Operator State. 在Flink中能夠實現 CheckpointedFunction
或者 ListCheckpoint<T extends Serializable>
兩個接口來定義操做 Managered Operator State 的函數。
經過 CheckpointedFunction 接口操做Operator State
CheckpointedFunction 接口定義如圖:
@PublicEvolving @SuppressWarnings("deprecation") public interface CheckpointedFunction { /** * This method is called when a snapshot for a checkpoint is requested. This acts as a hook to the function to * ensure that all state is exposed by means previously offered through {@link FunctionInitializationContext} when * the Function was initialized, or offered now by {@link FunctionSnapshotContext} itself. * * @param context the context for drawing a snapshot of the operator * @throws Exception */ void snapshotState(FunctionSnapshotContext context) throws Exception; /** * This method is called when the parallel function instance is created during distributed * execution. Functions typically set up their state storing data structures in this method. * * @param context the context for initializing the operator * @throws Exception */ void initializeState(FunctionInitializationContext context) throws Exception; }
在每一個獨立的算子中,Managered Operator State 都是以 List 形式存儲的,算子和算子之間的狀態數據相互獨立,List存儲比較適合於狀態數據的從新分佈,Flink目前支持Manager Operator State 兩種重要分佈策略,分別是 Event-split Redistribution 和 Union Redistribution。
/** * @title CheckpointCount * @description 實現 CheckpointFunction 接口利用Operator State 統計輸入到算子的數據量 * @author Mr.Sun * @version v.1.0 * @date 2019/12/24 9:16 */ class CheckpointCount(val numElements: Int) extends FlatMapFunction[(Int, Long), (Int, Long, Long)] with CheckpointedFunction { // 定義算子實例本地變量,存儲Operator數據數量 private var operatorCount: Long = _ // 定義 keyedState ,存儲和 key 相關的狀態值 private var keyedState: ValueState[Long] = _ // 定義 operatorState , 存儲算子的狀態值 private var operatorState: ListState[Long] = _ override def flatMap(value: (Int, Long), out: Collector[(Int, Long, Long)]): Unit = { val keyedCount: Long = keyedState.value() // 更新 keyedState 數量 keyedState.update(keyedCount) // 更新本地的算子 operatorCount operatorCount = operatorCount + 1 // 輸出結果,包括 id , id 對應的的數量統計 keyedCount ,算子輸入數據的數量統計 operatorCount out.collect(value._1, keyedCount, operatorCount) } // 當發生了 snapshotState , 將 operatorCount 添加到 operatorState 中 override def snapshotState(context: FunctionSnapshotContext): Unit = { operatorState.clear() operatorState.add(operatorCount) } // 初始化狀態數據 override def initializeState(context: FunctionInitializationContext): Unit = { // 定義並獲取 keyedState keyedState = context.getKeyedStateStore.getState(new ValueStateDescriptor[Long]("keye-state", classOf[Long])) // 定義並獲取 operatorState operatorState = context.getOperatorStateStore.getListState(new ListStateDescriptor[Long]("operator-state", classOf[Long])) // 定義在 Restored 過程當中, 從 operatorState 中恢復數據的邏輯 if (context.isRestored){ val value: util.Iterator[Long] = operatorState.get().iterator() while (value.hasNext){ operatorCount += value.next() } } } }
經過 ListCheckpointed接口定義 Operator State
/** * @title NumberRecordsCount * @description 實現 ListCheckpoint接口利用Operator State 統計算子輸入數據數量 * @author Mr.Sun * @version v.1.0 * @date 2019/12/24 10:14 */ class NumberRecordsCount extends FlatMapFunction[(String, Long), (String, Long)] with ListCheckpointed[Long] { // 定義算子中接入的 numberRecords 數量 private var numberRecords: Long = 0L override def flatMap(value: (String, Long), out: Collector[(String, Long)]): Unit = { // 介入一條計算規則進行統計,並輸出 numberRecords += 1 out.collect(value._1, numberRecords) } override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] = { Collections.singletonList(numberRecords) } override def restoreState(state: util.List[Long]): Unit = { numberRecords = 0L for (count <- state) { // 從恢復狀態中 恢復 numberRecords numberRecords += count } } }
Flink 中基於異步輕量級的分佈式快照技術提供了 Checkpoints 容錯機制,分佈式快找能夠將同一時間點 Task / Operator 的狀態數據全局統一快照處理,包括前面提到的Keyed State 和 Operator State . Flink 會在輸入的數據集上間隔性的生成checkpoint barrier ,經過柵欄(barrier)將間隔時間段內的數據劃分到相應的checkpoint 中,當應用出現異常時,Operator 就可以從上一次快照中恢復全部算子以前的狀態,從而保證數據的一致性。
舉個栗子:在 KafkaConsumer 算子維護 Offset 狀態,當系統出現問題沒法從 Kafka 中消費數據時,能夠將 Offset 記錄在狀態中,當系統出現問題,沒法從Kafka消費數據時,能夠將 Offset 記錄在狀態中,當任務從新恢復時就可以指定偏移量消費數據。
Checkpoint 過程當中狀態數據通常會被保存在一個可配置的環境中,一般在 JobManager節點或者HDFS上。
開啓檢查點而且指定檢查點時間間隔爲 1000ms ,根據實際狀況自行選擇,若是狀態比較大,則建議適當增長該值;
environment.enableCheckpointing(1000)
能夠選擇 exactly-once 語義保證整個應用內 端到端 的數據一致性,這種狀況比較適合數據要求高,不容許出現數據丟失或重複,與此同時,Flink 的性能也相對較弱,而 at-least-once 語義更適合於時延和吞吐要求很是高但對數據一致性要求不高的場景。
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
超時時間制定了每次Checkpoint 執行過程當中的上限時間範圍,一旦 Checkpoint 執行時間超過該閾值,Flink 將會中斷Checkpoint 過程,並按照超時處理。該指標能夠經過 setCheckpointTimeout 方法設定,默認 10 分鐘
environment.getCheckpointConfig.setCheckpointTimeout(60000)
該參數主要目的是設定兩個Checkpoint 之間最小時間間隔,防止出現例如狀態數據過大致使Checkpoint 執行時間過長,致使 Checkpoint 積壓過多,最終Flink 應用密集地觸發 Checkpoint 操做,會佔用大量計算資源而影響到整個應用的性能
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
經過 setMaxCurrentCheckpoint()方法設定可以最大同時執行的 Checkpoint 數量。在默認狀況下只有一個檢查點能夠運行,根據用戶指定的數量能夠同時觸發多個Checkpoint,進而提高Checkpoint總體的效率.
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
設定週期性的外部檢查點,而後將狀態數據持久化到外部系統中,使用這種方式不會在任務正常中止的過程當中清理檢查點數據,而是會一直保持在外部系統介質中,另外也能夠經過從外部檢查點中對任務進行恢復.
environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
ailOnCheckpointingErrors 參數決定了當Checkpoint執行過程當中若是出現失敗或者錯誤時,任務是否同時被關閉,默認值爲True
environment.getCheckpointConfig.setFailOnCheckpointingErrors(false) // 上述的方式已經被棄用了,使用下面的方式 val number: Int = environment.getCheckpointConfig.getTolerableCheckpointFailureNumber environment.getCheckpointConfig.setTolerableCheckpointFailureNumber(number)
Savepoints 是檢查點的一種特殊實現,底層實現其實也是使用Checkpoints的機制。Savepoints是用戶以手工命令的方式觸發Checkpoint,並將結果持久化到指定的存儲路徑中,其主要目的是幫助用戶在升級和維護集羣過程當中保存系統中的狀態數據,避免由於停機運維或者升級應用等正常終止應用的操做而致使系統沒法恢復到原有的計算狀態的狀況,從而沒法實現從端到端的 Excatly-Once 語義保證。
當使用 Savepoints 對整個集羣進行升級或運維操做的時候,須要中止整個 Flink 應用程序,此時用戶可能會對應用的代碼邏輯進行修改,即時 Flink 可以經過 Savepoint 將應用中的狀態數據同步到磁盤而後恢復任務,但因爲代碼邏輯發生了變化,在升級過程當中有可能致使算子的狀態沒法經過 Savepoints 中的數據恢復的狀況,在這種狀況下就須要經過惟一的 ID 標記算子。在Flink中默認支持自動生成 Operator ID, 可是這種方式不利於對代碼層面的維護和升級,建議用戶儘量使用手工方式對算子進行惟一 ID 標記, ID 的應用範圍在每一個算子內部,具體的使用方式以下:
environment.addSource(new SourceFunction[] {}) .uid("source-id") .shuffle() .map(new MapFunction[] {}) .uid("map-id") .print()
Savepoint 操做能夠經過命令行的方式進行觸發,命令行提供了取消任務,從Savepoints中恢復任務,撤銷 Savepoints 等操做,在 Flink1.2 中之後也能夠經過FlinkWeb頁面從 Savepoints中恢復應用。
手動觸發 Savepoints
bin/flink savepoint :jobId [:targetDirectory] bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
取消任務並處觸發Savepoints
bin/flink cancel -s [:targetDirectory] :jobId
經過Savepoints中恢復任務
bin/flink run -s :savepointPath [:runArgs]
釋放Savepoints數據
bin/flink savepoint -d :savepointPath
經過 --dispose (-d) 命令釋放已經存儲的 Savepoint 數據,這樣存儲在指定路徑中的 savepointPath 將會被清理掉
TargetDirectory配置
state.savepoints.dir: hdfs:///flink/savepoints
# 查看 TargetDirectory 文件目錄 hdfs dfs -ls /flink/flink-savepoints/savepoint-11bbc5-bd967f90709b
在Flink 中提供了 StateBackend 來存儲和管理 Checkpoints 過程當中的狀態數據。
Flink中一共實現了三種類型的狀態管理器,包括基於內存的MemoryStateBackend
、基於文件系統的 FsStateBackend
, 以及基於 RockDB 做爲存儲介質的 RockDBStateBackend
.
基於內存的狀態管理器將狀態數據所有存儲在JVM堆內存中,包括用戶在使用 DataStream API 中建立 Key/Value State,窗口中緩存的狀態數據,以及觸發器等數據基於內存的狀態管理器具備很是快速和高校的特色,但也有很是多的限制,最主要的就是內存的容量限制,一旦存儲的狀態數據過多就會致使系統內存溢出,從而影響整個應用的正常運行。同時若是機器出現問題,整個主機內存中的狀態數據都會丟失,進而沒法恢復任務中得玩狀態數據。所以這個玩意,避免使用。
Flink 將MemoryStateBackend 做爲默認的狀態後端管理器,也能夠經過以下參數配置初始化 MemoryStateBackend , 其中 "MAX_MEN_STATE_SIZE" 指定每一個狀態值的內存使用大小。
new MemoryStateBackend(MAX_MEN_STATE_SIZE , false)
在Flink 中 MemoryStateBackend 具備以下特色:
important MemoryStateBackend 比較適合測試環境,並用於本地調試和驗證,不建議在生產環境中使用。
與MemoryStateBackend 有所不一樣,FsStateBackend 是基於文件系統的一種狀態管理器在,這裏的文件系統能夠是本地文件系統,也能夠是HDFS分佈式文件系統。
new FsStateBackend(path , false)
FsStateBackend 的 Boolean 參數類型指定是否以同步的方式記錄狀態數據,默認採用異步方式。異步方式能夠儘量避免在Checkpoint過程當中影響流式計算任務
RockDBStateBackend 是Flink 中內置的第三方狀態管理器,和前面的狀態管理器不一樣,RocksDBStateBackend 須要單獨引入相關的依賴包到工程中,經過初始化 RockDBStateBackend 類,使能夠獲得 RockDBStateBackend 實例類。 RocksDBStateBackend 採用異步的方式進行狀態數據的 Snapshot ,任務中的狀態數據首先被寫入 RockDB中,而後再異步的將狀態數據寫入文件系統中,這樣RockDB僅會存儲在正在進行的計算的數據,對於長時間才更新的數據則寫入磁盤中進行存儲,而對於體量比較小的元數據狀態,則存儲在 JobManager 內存中。 與 FsStateBackend 相比,RockDBStateBackend性能更高,主要是由於藉助了 RockDB 存儲了最新最熱的數據,而後經過異步的方式在同步到文件系統中。