「Flink」Flink的狀態管理與容錯

在Flink中的每一個函數和運算符都是有狀態的。在處理過程當中能夠用狀態來存儲數據,這樣能夠利用狀態來構建複雜操做。爲了讓狀態容錯,Flink須要設置checkpoint狀態。Flink程序是經過checkpoint來保證容錯,經過checkpoint機制,Flink可恢復做業的狀態和計算位置。html

checkpoint檢查點

前提條件

Flink的checkpoin機制須要與流和狀態的持久化存儲交互,通常它要求:redis

  • 一個持久化的數據源
    • 當Flink程序出現問題時,能夠經過checkpoint持久化存儲中恢復,而後從出錯的地方開始從新消費數據
    • 該數據源能夠在必定時間內重跑數據,例如:Kafka、RabbitMQ或者文件系統HDFS、S三、…
  • 狀態的持久存儲
    • 狀態須要永久的保存下來,一般是分佈式文件系統(例如:HDFS、S三、GFS、…)

啓用和配置檢查點

默認狀況,Flink是禁用檢查點。要啓用檢查點,調用數據庫

// 啓用檢查點
// 單位:毫秒
env.enableCheckpointing(1000);

在啓用檢查點時,還能夠配置檢查點的其餘參數。apache

  • exactly-one or at-least-once(僅一次或者至少一次)
    • 大多數程序都是設置爲exactly-once,只有在某些超低延遲的應用(例如:始終要求是毫秒級的應用)
    • 經過查看源碼,咱們看到,Flink默認是 exactly-once
      • public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
  • checkpoint timeout(檢查點超時時間)
    • 檢查點超過規定的時間就會自動終止
  • minimum time between checkpoints
    • 檢查點之間的最小時間
    • 下一個檢查點將在上一個檢查點完成後5秒鐘啓動
    • 檢查點最小間隔時間不會受檢查點間隔更容易配置
  • number of concureent checkpoint
    • 檢查點的併發數目。默認狀況一個檢查點在運行時不會觸發另外一個檢查點,這樣能夠確保Flink不會花太多時間在checkpoint上,並確保流能夠有效進行。
    • 能夠設置多個重疊的checkpoint,這對允許有必定延遲,並但願較頻繁的檢查(100ms)來從新處理故障是有用的
  • externalized checkpoint
    • 外部檢查點
    • 能夠將檢查點設置爲外部持久化,這樣檢查點的元數據將寫入持久存儲,而且但做業運行失敗是不會自動清理
    • 這樣能夠作雙重保險
  • fail/continue task on checkpoint errors
    • 檢查點執行發生錯誤,是否執行任務。
    • 默認狀況,若是checkpoint失敗,任務也將失敗
  • perfer checkpoint for recovery
    • 即時最近有更多的savepoint可用於恢復,flink依然會選擇使用最近一次的checkpoint來進行錯誤恢復

參考配置:後端

        // --------
        // 配置checkpoint
        // 啓用檢查點
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

選擇狀態後端存儲

Flink的checkpoint機制能夠存儲計時器和有狀態operation的全部快照,包括:鏈接器、窗口或者用戶自定義狀態。具體checkpoint存儲在哪兒(例如:是JobManager內存、文件系統或者數據庫),依賴於狀態後端的配置。數據結構

默認狀況,狀態保存在TaskManager的內存中,檢查點存儲在TM的內存中。爲了適當地保存大狀態,Flink支持其餘的存儲。咱們能夠經過:併發

StreamExecutionEnvironment.setStateBackend(…)機器學習

來指定存儲方式分佈式

Flink狀態管理

狀態的應用場景:函數

  • 當應用程序想要按照某種模式搜索某些事件時,狀態能夠保存迄今全部的事件序列
  • 當每分鐘/小時/天須要對流數據進行聚合,狀態能夠保存掛起的聚合
  • 當在數據流上訓練機器學習模型時,狀態能夠用來保存某一類參數的版本
  • 當須要管理歷史數據時,狀態容許訪問過去歷史數據

Flink狀態能夠保存在堆內、或者是堆外。Flink也能夠管理應用程序的狀態,必要時也能夠溢出到磁盤,若是應用要保持很是大的狀態,能夠不修改程序邏輯狀況下配置狀態後端存儲。

Flink狀態分類

Flink中有兩種基本的狀態:

  • Keyed State
  • Operator State

Keyed State

Keyed State一般和key相關,僅僅在KeyedStream的方法和算子中使用。能夠把 Keyed State看做是分區,並且每個key僅出如今一個分區內。邏輯上每一個 keyed-state和惟一元組<算子併發實例, key>綁定,因爲每一個key僅屬於算子的一個併發,所以能夠簡化爲<算子, key>

Operator State

對於 Operator State來講,每一個Operator State和一個併發實例綁定。Kafka connector是Flink中使用operator state的一個很好的示例。每一個Kafka消費者的併發在Operator State中維護一個 topic partition到offset的映射關係。

Operator state在Flink做業的併發改變後,會從新分發狀態,分發的策略和keyed stated不同。

Raw State與Managed State

Keyed Stated和Operator State分別有兩種形式:managed 和 raw

Managed State是由Flink運行時管理的數據結構來表示的,例如:內部的Hash Table或者RocksDB。例如:ValueState、ListState等。Flink運行時會對這些狀態進行編碼並寫入Checkpoint。

Raw State則保存在本身的數據結構中。checkpoint的時候,Flink並不知道狀態裏面具體的內容,僅僅寫入一串字節序列到checkpoint中。

全部的DataStream的function均可以使用managed state,但raw state只能在實現算子時使用。因爲Flink能夠在修改併發時更好的分發狀態數據,而且可以更好的管理內存,由於講義使用 managed state.

使用Managed Keyed State

Managed keyed state接口提供不一樣類型的狀態訪問接口,這些狀態都做用在當前輸入數據的key下。這些狀態僅可在KeyedStream上使用,能夠經過 stream.keyBy(…)獲得KeyedStream。

全部支持的狀態類型以下:

  • ValueState<T>
    • 保存一個能夠更新和獲取的值,算子接收到的每一個key均可能對應一個值
    • 能夠經過update(T)進行更新,經過value()獲取
  • ListState<T>
    • 保存一個元素的列表,能夠往這個列表中追加數據,並在當前列表上檢索
    • 能夠經過 add(T)或者addAll(List<T>)進行追加元素
    • 經過get()獲取整個列表
    • 經過 update(List<T>)覆蓋當前列表
  • ReducingState<T>
    • 保存一個單值,表示添加到狀態的全部值的聚合。接口與ListState相似
  • AggregatingState<IN, OUT>
    • 保存一個單值,表示添加到狀態的全部值的聚合
    • 與ReducingState相反的是,聚合類型可能與添加到狀態的元素類型不一樣。接口與ListState相似
  • FoldingState<T, ACC>(後續將過時)
    • 保存一個單值,白搜狐添加到狀態的全部值的集合
    • 與ReducingState相反的是,聚合類型可能與添加到狀態的元素類型不一樣。接口與ListState相似
  • MapState<UK, UV>
    • 維護一個映射列表,能夠添加鍵值到狀態中,能夠獲取當前映射的迭代器
    • 使用put、putAll添加映射,使用 get檢索特定key

注意:

  • 這些狀態對象僅用於狀態交互。狀態自己不必定存儲在內存中,還有可能保存在磁盤或者其餘位置
  • 從狀態中獲取的值取決於輸入元素說表明的key,所以,在不一樣key上調用同一個接口,可能獲得不一樣的值

使用Managed Operator State

能夠經過實現 CheckpointedFunction 或者 ListCheckpointed<T extends Serialized>接口來使用Managed Operator State。

CheckpointedFunction接口:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

在Flink進行checkpoint時,會調用snapshotstate(),用戶自定義函數初始化時會調用 initializeState。初始化包括第一次自定義函數初始化和從以前的 checkpoint 回覆。所以,initializeState 中應該也包括狀態恢復的邏輯。

Managed Operator State以list的形式存在,這些狀態是一個可序列化對象的集合List,彼此獨立,方便在改變併發後進行狀態的從新分派。換句話說,這些對象是從新分配 non-keyed state的最細粒度。根據狀態的不一樣訪問方式,有如下兩種分配模式:

  • Even-split redistribution
    • 每一個算子都存儲一個列表形式的狀態集合,整個狀態由全部的列表拼接而成
    • 但做業恢復或者從新分配時,整個狀態按照算子的並行度均勻分配
  • Union redistribution
    • 每一個算子保存一個列表形式的狀態集合,整個狀態由全部的列表拼接而成
    • 但做業恢復或者從新分配時,每一個算子都將得到全部的狀態數據

ListCheckpointed接口:

ListCheckpointed接口是CheckpointedFunction接口的精簡版,僅支持 even-split redistribution的list state

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;

snapshotState()須要返回一個將寫入到checkpoint的對象列表, restoreState則須要處理恢復回來的對象列表。


參考文獻:

Flink官方文檔:

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/checkpointing.html

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/ops/state/checkpoints.html

https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/stream/state/state.html

相關文章
相關標籤/搜索