在Flink中的每一個函數和運算符都是有狀態的。在處理過程當中能夠用狀態來存儲數據,這樣能夠利用狀態來構建複雜操做。爲了讓狀態容錯,Flink須要設置checkpoint狀態。Flink程序是經過checkpoint來保證容錯,經過checkpoint機制,Flink可恢復做業的狀態和計算位置。html
Flink的checkpoin機制須要與流和狀態的持久化存儲交互,通常它要求:redis
默認狀況,Flink是禁用檢查點。要啓用檢查點,調用數據庫
// 啓用檢查點// 單位:毫秒
env.enableCheckpointing(1000);
在啓用檢查點時,還能夠配置檢查點的其餘參數。apache
public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
參考配置:後端
// -------- // 配置checkpoint // 啓用檢查點 env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
Flink的checkpoint機制能夠存儲計時器和有狀態operation的全部快照,包括:鏈接器、窗口或者用戶自定義狀態。具體checkpoint存儲在哪兒(例如:是JobManager內存、文件系統或者數據庫),依賴於狀態後端的配置。數據結構
默認狀況,狀態保存在TaskManager的內存中,檢查點存儲在TM的內存中。爲了適當地保存大狀態,Flink支持其餘的存儲。咱們能夠經過:併發
StreamExecutionEnvironment.setStateBackend(…)機器學習
來指定存儲方式分佈式
狀態的應用場景:函數
Flink狀態能夠保存在堆內、或者是堆外。Flink也能夠管理應用程序的狀態,必要時也能夠溢出到磁盤,若是應用要保持很是大的狀態,能夠不修改程序邏輯狀況下配置狀態後端存儲。
Flink中有兩種基本的狀態:
Keyed State
Keyed State一般和key相關,僅僅在KeyedStream的方法和算子中使用。能夠把 Keyed State看做是分區,並且每個key僅出如今一個分區內。邏輯上每一個 keyed-state和惟一元組<算子併發實例, key>綁定,因爲每一個key僅屬於算子的一個併發,所以能夠簡化爲<算子, key>
Operator State
對於 Operator State來講,每一個Operator State和一個併發實例綁定。Kafka connector是Flink中使用operator state的一個很好的示例。每一個Kafka消費者的併發在Operator State中維護一個 topic partition到offset的映射關係。
Operator state在Flink做業的併發改變後,會從新分發狀態,分發的策略和keyed stated不同。
Raw State與Managed State
Keyed Stated和Operator State分別有兩種形式:managed 和 raw
Managed State是由Flink運行時管理的數據結構來表示的,例如:內部的Hash Table或者RocksDB。例如:ValueState、ListState等。Flink運行時會對這些狀態進行編碼並寫入Checkpoint。
Raw State則保存在本身的數據結構中。checkpoint的時候,Flink並不知道狀態裏面具體的內容,僅僅寫入一串字節序列到checkpoint中。
全部的DataStream的function均可以使用managed state,但raw state只能在實現算子時使用。因爲Flink能夠在修改併發時更好的分發狀態數據,而且可以更好的管理內存,由於講義使用 managed state.
Managed keyed state接口提供不一樣類型的狀態訪問接口,這些狀態都做用在當前輸入數據的key下。這些狀態僅可在KeyedStream上使用,能夠經過 stream.keyBy(…)獲得KeyedStream。
全部支持的狀態類型以下:
注意:
- 這些狀態對象僅用於狀態交互。狀態自己不必定存儲在內存中,還有可能保存在磁盤或者其餘位置
- 從狀態中獲取的值取決於輸入元素說表明的key,所以,在不一樣key上調用同一個接口,可能獲得不一樣的值
能夠經過實現 CheckpointedFunction 或者 ListCheckpointed<T extends Serialized>接口來使用Managed Operator State。
CheckpointedFunction接口:
void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;
在Flink進行checkpoint時,會調用snapshotstate(),用戶自定義函數初始化時會調用 initializeState。初始化包括第一次自定義函數初始化和從以前的 checkpoint 回覆。所以,initializeState 中應該也包括狀態恢復的邏輯。
Managed Operator State以list的形式存在,這些狀態是一個可序列化對象的集合List,彼此獨立,方便在改變併發後進行狀態的從新分派。換句話說,這些對象是從新分配 non-keyed state的最細粒度。根據狀態的不一樣訪問方式,有如下兩種分配模式:
ListCheckpointed接口:
ListCheckpointed接口是CheckpointedFunction接口的精簡版,僅支持 even-split redistribution的list state
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;void restoreState(List<T> state) throws Exception;
snapshotState()須要返回一個將寫入到checkpoint的對象列表, restoreState則須要處理恢復回來的對象列表。
參考文獻:
Flink官方文檔:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/ops/state/checkpoints.html
https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/state.html