Apache Flink 零基礎入門教程(六):狀態管理及容錯機制

做者:孫夢瑤 整理:韓非java

本文主要分享內容以下:git

  • 狀態管理的基本概念;
  • 狀態的類型與使用示例;
  • 容錯機制與故障恢復;

一.狀態管理的基本概念

1.什麼是狀態

1

首先舉一個無狀態計算的例子:消費延遲計算。假設如今有一個消息隊列,消息隊列中有一個生產者持續往消費隊列寫入消息,多個消費者分別從消息隊列中讀取消息。從圖上能夠看出,生產者已經寫入 16 條消息,Offset 停留在 15 ;有 3 個消費者,有的消費快,而有的消費慢。消費快的已經消費了 13 條數據,消費者慢的才消費了 七、8 條數據。github

如何實時統計每一個消費者落後多少條數據,如圖給出了輸入輸出的示例。能夠了解到輸入的時間點有一個時間戳,生產者將消息寫到了某個時間點的位置,每一個消費者同一時間點分別讀到了什麼位置。剛纔也提到了生產者寫入了 15 條,消費者分別讀取了 十、七、12 條。那麼問題來了,怎麼將生產者、消費者的進度轉換爲右側示意圖信息呢?apache

consumer 0 落後了 5 條,consumer 1 落後了 8 條,consumer 2 落後了 3 條,根據 Flink 的原理,此處需進行 Map 操做。Map 首先把消息讀取進來,而後分別相減,便可知道每一個 consumer 分別落後了幾條。Map 一直往下發,則會得出最終結果。api

你們會發現,在這種模式的計算中,不管這條輸入進來多少次,輸出的結果都是同樣的,由於單條輸入中已經包含了所需的全部信息。消費落後等於生產者減去消費者。生產者的消費在單條數據中能夠獲得,消費者的數據也能夠在單條數據中獲得,因此相同輸入能夠獲得相同輸出,這就是一個無狀態的計算。數組

2

相應的什麼是有狀態的計算?性能優化

以訪問日誌統計量的例子進行說明,好比當前拿到一個 Nginx 訪問日誌,一條日誌表示一個請求,記錄該請求從哪裏來,訪問的哪一個地址,須要實時統計每一個地址總共被訪問了多少次,也即每一個 API 被調用了多少次。能夠看到下面簡化的輸入和輸出,輸入第一條是在某個時間點請求 GET 了 /api/a;第二條日誌記錄了某個時間點 Post /api/b ;第三條是在某個時間點 GET了一個 /api/a,總共有 3 個 Nginx 日誌。從這 3 條 Nginx 日誌能夠看出,第一條進來輸出 /api/a 被訪問了一次,第二條進來輸出 /api/b 被訪問了一次,緊接着又進來一條訪問 api/a,因此 api/a 被訪問了 2 次。不一樣的是,兩條 /api/a 的 Nginx 日誌進來的數據是同樣的,但輸出的時候結果可能不一樣,第一次輸出 count=1 ,第二次輸出 count=2,說明相同輸入可能獲得不一樣輸出。輸出的結果取決於當前請求的 API 地址以前累計被訪問過多少次。第一條過來累計是 0 次,count = 1,第二條過來 API 的訪問已經有一次了,因此 /api/a 訪問累計次數 count=2。單條數據其實僅包含當前此次訪問的信息,而不包含全部的信息。要獲得這個結果,還須要依賴 API 累計訪問的量,即狀態。微信

這個計算模式是將數據輸入算子中,用來進行各類複雜的計算並輸出數據。這個過程當中算子會去訪問以前存儲在裏面的狀態。另一方面,它還會把如今的數據對狀態的影響實時更新,若是輸入 200 條數據,最後輸出就是 200 條結果。網絡

3

什麼場景會用到狀態呢?下面列舉了常見的 4 種:數據結構

  • 去重:好比上游的系統數據可能會有重複,落到下游系統時但願把重複的數據都去掉。去重須要先了解哪些數據來過,哪些數據尚未來,也就是把全部的主鍵都記錄下來,當一條數據到來後,可以看到在主鍵當中是否存在。
  • 窗口計算:好比統計每分鐘 Nginx 日誌 API 被訪問了多少次。窗口是一分鐘計算一次,在窗口觸發前,如 08:00 ~ 08:01 這個窗口,前59秒的數據來了須要先放入內存,即須要把這個窗口以內的數據先保留下來,等到 8:01 時一分鐘後,再將整個窗口內觸發的數據輸出。未觸發的窗口數據也是一種狀態。
  • 機器學習/深度學習:如訓練的模型以及當前模型的參數也是一種狀態,機器學習可能每次都用有一個數據集,須要在數據集上進行學習,對模型進行一個反饋。
  • 訪問歷史數據:好比與昨天的數據進行對比,須要訪問一些歷史數據。若是每次從外部去讀,對資源的消耗可能比較大,因此也但願把這些歷史數據也放入狀態中作對比。

2.爲何要管理狀態

4

管理狀態最直接的方式就是將數據都放到內存中,這也是很常見的作法。好比在作 WordCount 時,Word 做爲輸入,Count 做爲輸出。在計算的過程當中把輸入不斷累加到 Count。

但對於流式做業有如下要求:

  • 7*24小時運行,高可靠;
  • 數據不丟不重,剛好計算一次;
  • 數據實時產出,不延遲;

基於以上要求,內存的管理就會出現一些問題。因爲內存的容量是有限制的。若是要作 24 小時的窗口計算,將 24 小時的數據都放到內存,可能會出現內存不足;另外,做業是 7*24,須要保障高可用,機器若出現故障或者宕機,須要考慮如何備份及從備份中去恢復,保證運行的做業不受影響;此外,考慮橫向擴展,假如網站的訪問量不高,統計每一個 API 訪問次數的程序能夠用單線程去運行,但若是網站訪問量忽然增長,單節點沒法處理所有訪問數據,此時須要增長几個節點進行橫向擴展,這時數據的狀態如何平均分配到新增長的節點也問題之一。所以,將數據都放到內存中,並非最合適的一種狀態管理方式。

3.理想的狀態管理

5

最理想的狀態管理須要知足易用、高效、可靠三點需求:

  • 易用,Flink 提供了豐富的數據結構、多樣的狀態組織形式以及簡潔的擴展接口,讓狀態管理更加易用;
  • 高效,實時做業通常須要更低的延遲,一旦出現故障,恢復速度也須要更快;當處理能力不夠時,能夠橫向擴展,同時在處理備份時,不影響做業自己處理性能;
  • 可靠,Flink 提供了狀態持久化,包括不丟不重的語義以及具有自動的容錯能力,好比 HA,當節點掛掉後會自動拉起,不須要人工介入。

二.Flink 狀態的類型與使用示例

1.Managed State & Raw State

6

Managed State 是 Flink 自動管理的 State,而 Raw State 是原生態 State,二者的區別以下:

  • 從狀態管理方式的方式來講,Managed State 由 Flink Runtime 管理,自動存儲,自動恢復,在內存管理上有優化;而 Raw State 須要用戶本身管理,須要本身序列化,Flink 不知道 State 中存入的數據是什麼結構,只有用戶本身知道,須要最終序列化爲可存儲的數據結構。
  • 從狀態數據結構來講,Managed State 支持已知的數據結構,如 Value、List、Map 等。而 Raw State只支持字節數組 ,全部狀態都要轉換爲二進制字節數組才能夠。
  • 從推薦使用場景來講,Managed State 大多數狀況下都可使用,而 Raw State 是當 Managed State 不夠用時,好比須要自定義 Operator 時,推薦使用 Raw State。

2.Keyed State & Operator State

7

Managed State 分爲兩種,一種是 Keyed State;另一種是 Operator State。在Flink Stream模型中,Datastream 通過 keyBy 的操做能夠變爲 KeyedStream 。 每一個 Key 對應一個 State,即一個 Operator 實例處理多個 Key,訪問相應的多個 State,並由此就衍生了 Keyed State。Keyed State 只能用在 KeyedStream 的算子中,即在整個程序中沒有 keyBy 的過程就沒有辦法使用 KeyedStream。

相比較而言,Operator State 能夠用於全部算子,相對於數據源有一個更好的匹配方式,經常使用於 Source,例如 FlinkKafkaConsumer。相比 Keyed State,一個 Operator 實例對應一個 State,隨着併發的改變,Keyed State 中,State 隨着 Key 在實例間遷移,好比原來有 1 個併發,對應的 API 請求過來,/api/a 和 /api/b 都存放在這個實例當中;若是請求量變大,須要擴容,就會把 /api/a 的狀態和 /api/b 的狀態分別放在不一樣的節點。因爲 Operator State 沒有 Key,併發改變時須要選擇狀態如何從新分配。其中內置了 2 種分配方式:一種是均勻分配,另一種是將全部 State 合併爲全量 State 再分發給每一個實例。

在訪問上,Keyed State 經過 RuntimeContext 訪問,這須要 Operator 是一個Rich Function。Operator State 須要本身實現 CheckpointedFunction 或 ListCheckpointed 接口。在數據結構上,Keyed State 支持的數據結構,好比 ValueState、ListState、ReducingState、AggregatingState 和 MapState;而 Operator State 支持的數據結構相對較少,如 ListState。

3.Keyed State 使用示例

8

Keyed State 有不少種,如圖爲幾種 Keyed State 之間的關係。首先 State 的子類中一級子類有 ValueState、MapState、AppendingState。AppendingState 又有一個子類 MergingState。MergingState 又分爲 3 個子類分別是ListState、ReducingState、AggregatingState。這個繼承關係使它們的訪問方式、數據結構也存在差別。

9

幾種 Keyed State 的差別具體體如今:

  • ValueState 存儲單個值,好比 Wordcount,用 Word 當 Key,State 就是它的 Count。這裏面的單個值多是數值或者字符串,做爲單個值,訪問接口可能有兩種,get 和 set。在 State 上體現的是 update(T) / T value()。
  • MapState 的狀態數據類型是 Map,在 State 上有 put、remove等。須要注意的是在 MapState 中的 key 和 Keyed state 中的 key 不是同一個。
  • ListState 狀態數據類型是 List,訪問接口如 add、update 等。
  • ReducingState 和 AggregatingState 與 ListState 都是同一個父類,但狀態數據類型上是單個值,緣由在於其中的 add 方法不是把當前的元素追加到列表中,而是把當前元素直接更新進了 Reducing 的結果中。
  • AggregatingState 的區別是在訪問接口,ReducingState 中 add(T)和 T get() 進去和出來的元素都是同一個類型,但在 AggregatingState 輸入的 IN,輸出的是 OUT。

10

下面以 ValueState 爲例,來闡述一下具體如何使用,以狀態機的案例來說解 。

源代碼地址:github.com/apache/flin…

感興趣的同窗可直接查看完整源代碼,在此截取部分。如圖爲 Flink 做業的主方法與主函數中的內容,前面的輸入、後面的輸出以及一些個性化的配置項都已去掉,僅保留了主幹。

首先 events 是一個 DataStream,經過 env.addSource 加載數據進來,接下來有一個 DataStream 叫 alerts,先 keyby 一個 sourceAddress,而後在 flatMap 一個StateMachineMapper。StateMachineMapper 就是一個狀態機,狀態機指有不一樣的狀態與狀態間有不一樣的轉換關係的結合,以買東西的過程簡單舉例。首先下訂單,訂單生成後狀態爲待付款,當再來一個事件狀態付款成功,則事件的狀態將會從待付款變爲已付款,待發貨。已付款,待發貨的狀態再來一個事件發貨,訂單狀態將會變爲配送中,配送中的狀態再來一個事件簽收,則該訂單的狀態就變爲已簽收。在整個過程當中,隨時均可以來一個事件,取消訂單,不管哪一個狀態,一旦觸發了取消訂單事件最終就會將狀態轉移到已取消,至此狀態就結束了。

Flink 寫狀態機是如何實現的?首先這是一個 RichFlatMapFunction,要用 Keyed State getRuntimeContext,getRuntimeContext 的過程當中須要 RichFunction,因此須要在 open 方法中獲取 currentState ,而後 getState,currentState 保存的是當前狀態機上的狀態。

若是剛下訂單,那麼 currentState 就是待付款狀態,初始化後,currentState 就表明訂單完成。訂單來了後,就會走 flatMap 這個方法,在 flatMap 方法中,首先定義一個 State,從 currentState 取出,即 Value,Value 取值後先判斷值是否爲空,若是 sourceAddress state 是空,則說明沒有被使用過,那麼此狀態應該爲剛建立訂單的初始狀態,即待付款。而後賦值 state = State.Initial,注意此處的 State 是本地的變量,而不是 Flink 中管理的狀態,將它的值從狀態中取出。接下來在本地又會來一個變量,而後 transition,將事件對它的影響加上,剛纔待付款的訂單收到付款成功的事件,就會變成已付款,待發貨,而後 nextState 便可算出。此外,還須要判斷 State 是否合法,好比一個已簽收的訂單,又來一個狀態叫取消訂單,會發現已簽收訂單不能被取消,此時這個狀態就會下發,訂單狀態爲非法狀態。

若是不是非法的狀態,還要看該狀態是否已經沒法轉換,好比這個狀態變爲已取消時,就不會在有其餘的狀態再發生了,此時就會從 state 中 clear。clear 是全部的 Flink 管理 keyed state 都有的公共方法,意味着將信息刪除,若是既不是一個非法狀態也不是一個結束狀態,後面可能還會有更多的轉換,此時須要將訂單的當前狀態 update ,這樣就完成了 ValueState 的初始化、取值、更新以及清零,在整個過程當中狀態機的做用就是將非法的狀態進行下發,方便下游進行處理。其餘的狀態也是相似的使用方式。

三.容錯機制與故障恢復

1.狀態如何保存及恢復

11

Flink 狀態保存主要依靠 Checkpoint 機制,Checkpoint 會定時製做分佈式快照,對程序中的狀態進行備份。分佈式快照是如何實現的能夠參考【第二課時】的內容,這裏就不在闡述分佈式快照具體是如何實現的。分佈式快照 Checkpoint 完成後,看成業發生故障瞭如何去恢復?假如做業分佈跑在 3 臺機器上,其中一臺掛了。這個時候須要把進程或者線程移到 active 的 2 臺機器上,此時還須要將整個做業的全部 Task 都回滾到最後一次成功 Checkpoint 中的狀態,而後從該點開始繼續處理。

若是要從 Checkpoint 恢復,必要條件是數據源須要支持數據從新發送。Checkpoint恢復後, Flink 提供兩種一致性語義,一種是剛好一次,一種是至少一次。在作 Checkpoint時,可根據 Barries 對齊來判斷是剛好一次仍是至少一次,若是對齊,則爲剛好一次,不然沒有對齊即爲至少一次。若是做業是單線程處理,也就是說 Barries 是不須要對齊的;若是隻有一個 Checkpoint 在作,無論何時從 Checkpoint 恢復,都會恢復到剛纔的狀態;若是有多個節點,假如一個數據的 Barries 到了,另外一個 Barries 尚未來,內存中的狀態若是已經存儲。那麼這 2 個流是不對齊的,恢復的時候其中一個流可能會有重複。

Checkpoint 經過代碼的實現方法以下:

  • 首先從做業的運行環境 env.enableCheckpointing 傳入 1000,意思是作 2 個 Checkpoint 的事件間隔爲 1 秒。Checkpoint 作的越頻繁,恢復時追數據就會相對減小,同時 Checkpoint 相應的也會有一些 IO 消耗。
  • 接下來是設置 Checkpoint 的 model,即設置了 Exactly_Once 語義,而且須要 Barries 對齊,這樣能夠保證消息不會丟失也不會重複。
  • setMinPauseBetweenCheckpoints 是 2 個 Checkpoint 之間最少是要等 500ms,也就是剛作完一個 Checkpoint。好比某個 Checkpoint 作了700ms,按照原則過 300ms 應該是作下一個 Checkpoint,由於設置了 1000ms 作一次 Checkpoint 的,可是中間的等待時間比較短,不足 500ms 了,須要多等 200ms,所以以這樣的方式防止 Checkpoint 太過於頻繁而致使業務處理的速度降低。
  • setCheckpointTimeout 表示作 Checkpoint 多久超時,若是 Checkpoint 在 1min 以內還沒有完成,說明 Checkpoint 超時失敗。 setMaxConcurrentCheckpoints 表示同時有多少個 Checkpoint 在作快照,這個能夠根據具體需求去作設置。
  • enableExternalizedCheckpoints 表示下 Cancel 時是否須要保留當前的 Checkpoint,默認 Checkpoint 會在整個做業 Cancel 時被刪除。Checkpoint 是做業級別的保存點。

上面講過,除了故障恢復以外,還須要能夠手動去調整併發從新分配這些狀態。手動調整併發,必需要重啓做業並會提示 Checkpoint 已經不存在,那麼做業如何恢復數據?

一方面 Flink 在 Cancel 時容許在外部介質保留 Checkpoint ;另外一方面,Flink 還有另一個機制是 SavePoint。

12

Savepoint 與 Checkpoint 相似,一樣是把狀態存儲到外部介質。看成業失敗時,能夠從外部恢復。Savepoint 與 Checkpoint 有什麼區別呢?

  • 從觸發管理方式來說,Checkpoint 由 Flink 自動觸發並管理,而 Savepoint 由用戶手動觸發並人肉管理;
  • 從用途來說,Checkpoint 在 Task 發生異常時快速恢復,例如網絡抖動或超時異常,而 Savepoint 有計劃地進行備份,使做業能中止後再恢復,例如修改代碼、調整併發;
  • 最後從特色來說,Checkpoint 比較輕量級,做業出現問題會自動從故障中恢復,在做業中止後默認清除;而 Savepoint 比較持久,以標準格式存儲,容許代碼或配置發生改變,恢復須要啓動做業手動指定一個路徑恢復。

2.可選的狀態存儲方式

13

Checkpoint 的存儲,第一種是內存存儲,即 MemoryStateBackend,構造方法是設置最大的StateSize,選擇是否作異步快照,這種存儲狀態自己存儲在 TaskManager 節點也就是執行節點內存中的,由於內存有容量限制,因此單個 State maxStateSize 默認 5 M,且須要注意 maxStateSize <= akka.framesize 默認 10 M。Checkpoint 存儲在 JobManager 內存中,所以總大小不超過 JobManager 的內存。推薦使用的場景爲:本地測試、幾乎無狀態的做業,好比 ETL、JobManager 不容易掛,或掛掉影響不大的狀況。不推薦在生產場景使用。

14

另外一種就是在文件系統上的 FsStateBackend ,構建方法是須要傳一個文件路徑和是否異步快照。State 依然在 TaskManager 內存中,但不會像 MemoryStateBackend 有 5 M 的設置上限,Checkpoint 存儲在外部文件系統(本地或 HDFS),打破了總大小 Jobmanager 內存的限制。容量限制上,單 TaskManager 上 State 總量不超過它的內存,總大小不超過配置的文件系統容量。推薦使用的場景、常規使用狀態的做業、例如分鐘級窗口聚合或 join、須要開啓HA的做業。

15

還有一種存儲爲 RocksDBStateBackend ,RocksDB 是一個 key/value 的內存存儲系統,和其餘的 key/value 同樣,先將狀態放到內存中,若是內存快滿時,則寫入到磁盤中,但須要注意 RocksDB 不支持同步的 Checkpoint,構造方法中沒有同步快照這個選項。不過 RocksDB 支持增量的 Checkpoint,也是目前惟一增量 Checkpoint 的 Backend,意味着每次用戶不須要將全部狀態都寫進去,將增量的改變的狀態寫進去便可。它的 Checkpoint 存儲在外部文件系統(本地或HDFS),其容量限制只要單個 TaskManager 上 State 總量不超過它的內存+磁盤,單 Key最大 2G,總大小不超過配置的文件系統容量便可。推薦使用的場景爲:超大狀態的做業,例如天級窗口聚合、須要開啓 HA 的做業、最好是對狀態讀寫性能要求不高的做業。

四.總結

1.爲何要使用狀態?

前面提到有狀態的做業要有有狀態的邏輯,有狀態的邏輯是由於數據之間存在關聯,單條數據是沒有辦法把全部的信息給表現出來。因此須要經過狀態來知足業務邏輯。

2.爲何要管理狀態?

使用了狀態,爲何要管理狀態?由於實時做業須要7*24不間斷的運行,須要應對不可靠的因素而帶來的影響。

3.如何選擇狀態的類型和存儲方式?

那如何選擇狀態的類型和存儲方式?結合前面的內容,能夠看到,首先是要分析清楚業務場景;好比想要作什麼,狀態到底大不大。比較各個方案的利弊,選擇根據需求合適的狀態類型和存儲方式便可。

視頻回顧:www.bilibili.com/video/av497…


▼ Apache Flink 社區推薦 ▼

Apache Flink 及大數據領域頂級盛會 Flink Forward Asia 2019 重磅開啓,目前正在徵集議題,限量早鳥票優惠ing。瞭解 Flink Forward Asia 2019 的更多信息,請查看:

developer.aliyun.com/special/ffa…

首屆 Apache Flink 極客挑戰賽重磅開啓,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點擊:

tianchi.aliyun.com/markets/tia…

關注 Flink 官方社區微信公衆號,瞭解更多 Flink 資訊!

相關文章
相關標籤/搜索