做者:孫夢瑤 整理:韓非java
本文主要分享內容以下:git
首先舉一個無狀態計算的例子:消費延遲計算。假設如今有一個消息隊列,消息隊列中有一個生產者持續往消費隊列寫入消息,多個消費者分別從消息隊列中讀取消息。從圖上能夠看出,生產者已經寫入 16 條消息,Offset 停留在 15 ;有 3 個消費者,有的消費快,而有的消費慢。消費快的已經消費了 13 條數據,消費者慢的才消費了 七、8 條數據。github
如何實時統計每一個消費者落後多少條數據,如圖給出了輸入輸出的示例。能夠了解到輸入的時間點有一個時間戳,生產者將消息寫到了某個時間點的位置,每一個消費者同一時間點分別讀到了什麼位置。剛纔也提到了生產者寫入了 15 條,消費者分別讀取了 十、七、12 條。那麼問題來了,怎麼將生產者、消費者的進度轉換爲右側示意圖信息呢?apache
consumer 0 落後了 5 條,consumer 1 落後了 8 條,consumer 2 落後了 3 條,根據 Flink 的原理,此處需進行 Map 操做。Map 首先把消息讀取進來,而後分別相減,便可知道每一個 consumer 分別落後了幾條。Map 一直往下發,則會得出最終結果。api
你們會發現,在這種模式的計算中,不管這條輸入進來多少次,輸出的結果都是同樣的,由於單條輸入中已經包含了所需的全部信息。消費落後等於生產者減去消費者。生產者的消費在單條數據中能夠獲得,消費者的數據也能夠在單條數據中獲得,因此相同輸入能夠獲得相同輸出,這就是一個無狀態的計算。數組
相應的什麼是有狀態的計算?性能優化
以訪問日誌統計量的例子進行說明,好比當前拿到一個 Nginx 訪問日誌,一條日誌表示一個請求,記錄該請求從哪裏來,訪問的哪一個地址,須要實時統計每一個地址總共被訪問了多少次,也即每一個 API 被調用了多少次。能夠看到下面簡化的輸入和輸出,輸入第一條是在某個時間點請求 GET 了 /api/a;第二條日誌記錄了某個時間點 Post /api/b ;第三條是在某個時間點 GET了一個 /api/a,總共有 3 個 Nginx 日誌。從這 3 條 Nginx 日誌能夠看出,第一條進來輸出 /api/a 被訪問了一次,第二條進來輸出 /api/b 被訪問了一次,緊接着又進來一條訪問 api/a,因此 api/a 被訪問了 2 次。不一樣的是,兩條 /api/a 的 Nginx 日誌進來的數據是同樣的,但輸出的時候結果可能不一樣,第一次輸出 count=1 ,第二次輸出 count=2,說明相同輸入可能獲得不一樣輸出。輸出的結果取決於當前請求的 API 地址以前累計被訪問過多少次。第一條過來累計是 0 次,count = 1,第二條過來 API 的訪問已經有一次了,因此 /api/a 訪問累計次數 count=2。單條數據其實僅包含當前此次訪問的信息,而不包含全部的信息。要獲得這個結果,還須要依賴 API 累計訪問的量,即狀態。微信
這個計算模式是將數據輸入算子中,用來進行各類複雜的計算並輸出數據。這個過程當中算子會去訪問以前存儲在裏面的狀態。另一方面,它還會把如今的數據對狀態的影響實時更新,若是輸入 200 條數據,最後輸出就是 200 條結果。網絡
什麼場景會用到狀態呢?下面列舉了常見的 4 種:數據結構
管理狀態最直接的方式就是將數據都放到內存中,這也是很常見的作法。好比在作 WordCount 時,Word 做爲輸入,Count 做爲輸出。在計算的過程當中把輸入不斷累加到 Count。
但對於流式做業有如下要求:
基於以上要求,內存的管理就會出現一些問題。因爲內存的容量是有限制的。若是要作 24 小時的窗口計算,將 24 小時的數據都放到內存,可能會出現內存不足;另外,做業是 7*24,須要保障高可用,機器若出現故障或者宕機,須要考慮如何備份及從備份中去恢復,保證運行的做業不受影響;此外,考慮橫向擴展,假如網站的訪問量不高,統計每一個 API 訪問次數的程序能夠用單線程去運行,但若是網站訪問量忽然增長,單節點沒法處理所有訪問數據,此時須要增長几個節點進行橫向擴展,這時數據的狀態如何平均分配到新增長的節點也問題之一。所以,將數據都放到內存中,並非最合適的一種狀態管理方式。
最理想的狀態管理須要知足易用、高效、可靠三點需求:
Managed State 是 Flink 自動管理的 State,而 Raw State 是原生態 State,二者的區別以下:
Managed State 分爲兩種,一種是 Keyed State;另一種是 Operator State。在Flink Stream模型中,Datastream 通過 keyBy 的操做能夠變爲 KeyedStream 。 每一個 Key 對應一個 State,即一個 Operator 實例處理多個 Key,訪問相應的多個 State,並由此就衍生了 Keyed State。Keyed State 只能用在 KeyedStream 的算子中,即在整個程序中沒有 keyBy 的過程就沒有辦法使用 KeyedStream。
相比較而言,Operator State 能夠用於全部算子,相對於數據源有一個更好的匹配方式,經常使用於 Source,例如 FlinkKafkaConsumer。相比 Keyed State,一個 Operator 實例對應一個 State,隨着併發的改變,Keyed State 中,State 隨着 Key 在實例間遷移,好比原來有 1 個併發,對應的 API 請求過來,/api/a 和 /api/b 都存放在這個實例當中;若是請求量變大,須要擴容,就會把 /api/a 的狀態和 /api/b 的狀態分別放在不一樣的節點。因爲 Operator State 沒有 Key,併發改變時須要選擇狀態如何從新分配。其中內置了 2 種分配方式:一種是均勻分配,另一種是將全部 State 合併爲全量 State 再分發給每一個實例。
在訪問上,Keyed State 經過 RuntimeContext 訪問,這須要 Operator 是一個Rich Function。Operator State 須要本身實現 CheckpointedFunction 或 ListCheckpointed 接口。在數據結構上,Keyed State 支持的數據結構,好比 ValueState、ListState、ReducingState、AggregatingState 和 MapState;而 Operator State 支持的數據結構相對較少,如 ListState。
Keyed State 有不少種,如圖爲幾種 Keyed State 之間的關係。首先 State 的子類中一級子類有 ValueState、MapState、AppendingState。AppendingState 又有一個子類 MergingState。MergingState 又分爲 3 個子類分別是ListState、ReducingState、AggregatingState。這個繼承關係使它們的訪問方式、數據結構也存在差別。
幾種 Keyed State 的差別具體體如今:
下面以 ValueState 爲例,來闡述一下具體如何使用,以狀態機的案例來說解 。
源代碼地址:github.com/apache/flin…
感興趣的同窗可直接查看完整源代碼,在此截取部分。如圖爲 Flink 做業的主方法與主函數中的內容,前面的輸入、後面的輸出以及一些個性化的配置項都已去掉,僅保留了主幹。
首先 events 是一個 DataStream,經過 env.addSource 加載數據進來,接下來有一個 DataStream 叫 alerts,先 keyby 一個 sourceAddress,而後在 flatMap 一個StateMachineMapper。StateMachineMapper 就是一個狀態機,狀態機指有不一樣的狀態與狀態間有不一樣的轉換關係的結合,以買東西的過程簡單舉例。首先下訂單,訂單生成後狀態爲待付款,當再來一個事件狀態付款成功,則事件的狀態將會從待付款變爲已付款,待發貨。已付款,待發貨的狀態再來一個事件發貨,訂單狀態將會變爲配送中,配送中的狀態再來一個事件簽收,則該訂單的狀態就變爲已簽收。在整個過程當中,隨時均可以來一個事件,取消訂單,不管哪一個狀態,一旦觸發了取消訂單事件最終就會將狀態轉移到已取消,至此狀態就結束了。
Flink 寫狀態機是如何實現的?首先這是一個 RichFlatMapFunction,要用 Keyed State getRuntimeContext,getRuntimeContext 的過程當中須要 RichFunction,因此須要在 open 方法中獲取 currentState ,而後 getState,currentState 保存的是當前狀態機上的狀態。
若是剛下訂單,那麼 currentState 就是待付款狀態,初始化後,currentState 就表明訂單完成。訂單來了後,就會走 flatMap 這個方法,在 flatMap 方法中,首先定義一個 State,從 currentState 取出,即 Value,Value 取值後先判斷值是否爲空,若是 sourceAddress state 是空,則說明沒有被使用過,那麼此狀態應該爲剛建立訂單的初始狀態,即待付款。而後賦值 state = State.Initial,注意此處的 State 是本地的變量,而不是 Flink 中管理的狀態,將它的值從狀態中取出。接下來在本地又會來一個變量,而後 transition,將事件對它的影響加上,剛纔待付款的訂單收到付款成功的事件,就會變成已付款,待發貨,而後 nextState 便可算出。此外,還須要判斷 State 是否合法,好比一個已簽收的訂單,又來一個狀態叫取消訂單,會發現已簽收訂單不能被取消,此時這個狀態就會下發,訂單狀態爲非法狀態。
若是不是非法的狀態,還要看該狀態是否已經沒法轉換,好比這個狀態變爲已取消時,就不會在有其餘的狀態再發生了,此時就會從 state 中 clear。clear 是全部的 Flink 管理 keyed state 都有的公共方法,意味着將信息刪除,若是既不是一個非法狀態也不是一個結束狀態,後面可能還會有更多的轉換,此時須要將訂單的當前狀態 update ,這樣就完成了 ValueState 的初始化、取值、更新以及清零,在整個過程當中狀態機的做用就是將非法的狀態進行下發,方便下游進行處理。其餘的狀態也是相似的使用方式。
Flink 狀態保存主要依靠 Checkpoint 機制,Checkpoint 會定時製做分佈式快照,對程序中的狀態進行備份。分佈式快照是如何實現的能夠參考【第二課時】的內容,這裏就不在闡述分佈式快照具體是如何實現的。分佈式快照 Checkpoint 完成後,看成業發生故障瞭如何去恢復?假如做業分佈跑在 3 臺機器上,其中一臺掛了。這個時候須要把進程或者線程移到 active 的 2 臺機器上,此時還須要將整個做業的全部 Task 都回滾到最後一次成功 Checkpoint 中的狀態,而後從該點開始繼續處理。
若是要從 Checkpoint 恢復,必要條件是數據源須要支持數據從新發送。Checkpoint恢復後, Flink 提供兩種一致性語義,一種是剛好一次,一種是至少一次。在作 Checkpoint時,可根據 Barries 對齊來判斷是剛好一次仍是至少一次,若是對齊,則爲剛好一次,不然沒有對齊即爲至少一次。若是做業是單線程處理,也就是說 Barries 是不須要對齊的;若是隻有一個 Checkpoint 在作,無論何時從 Checkpoint 恢復,都會恢復到剛纔的狀態;若是有多個節點,假如一個數據的 Barries 到了,另外一個 Barries 尚未來,內存中的狀態若是已經存儲。那麼這 2 個流是不對齊的,恢復的時候其中一個流可能會有重複。
Checkpoint 經過代碼的實現方法以下:
上面講過,除了故障恢復以外,還須要能夠手動去調整併發從新分配這些狀態。手動調整併發,必需要重啓做業並會提示 Checkpoint 已經不存在,那麼做業如何恢復數據?
一方面 Flink 在 Cancel 時容許在外部介質保留 Checkpoint ;另外一方面,Flink 還有另一個機制是 SavePoint。
Savepoint 與 Checkpoint 相似,一樣是把狀態存儲到外部介質。看成業失敗時,能夠從外部恢復。Savepoint 與 Checkpoint 有什麼區別呢?
Checkpoint 的存儲,第一種是內存存儲,即 MemoryStateBackend,構造方法是設置最大的StateSize,選擇是否作異步快照,這種存儲狀態自己存儲在 TaskManager 節點也就是執行節點內存中的,由於內存有容量限制,因此單個 State maxStateSize 默認 5 M,且須要注意 maxStateSize <= akka.framesize 默認 10 M。Checkpoint 存儲在 JobManager 內存中,所以總大小不超過 JobManager 的內存。推薦使用的場景爲:本地測試、幾乎無狀態的做業,好比 ETL、JobManager 不容易掛,或掛掉影響不大的狀況。不推薦在生產場景使用。
另外一種就是在文件系統上的 FsStateBackend ,構建方法是須要傳一個文件路徑和是否異步快照。State 依然在 TaskManager 內存中,但不會像 MemoryStateBackend 有 5 M 的設置上限,Checkpoint 存儲在外部文件系統(本地或 HDFS),打破了總大小 Jobmanager 內存的限制。容量限制上,單 TaskManager 上 State 總量不超過它的內存,總大小不超過配置的文件系統容量。推薦使用的場景、常規使用狀態的做業、例如分鐘級窗口聚合或 join、須要開啓HA的做業。
還有一種存儲爲 RocksDBStateBackend ,RocksDB 是一個 key/value 的內存存儲系統,和其餘的 key/value 同樣,先將狀態放到內存中,若是內存快滿時,則寫入到磁盤中,但須要注意 RocksDB 不支持同步的 Checkpoint,構造方法中沒有同步快照這個選項。不過 RocksDB 支持增量的 Checkpoint,也是目前惟一增量 Checkpoint 的 Backend,意味着每次用戶不須要將全部狀態都寫進去,將增量的改變的狀態寫進去便可。它的 Checkpoint 存儲在外部文件系統(本地或HDFS),其容量限制只要單個 TaskManager 上 State 總量不超過它的內存+磁盤,單 Key最大 2G,總大小不超過配置的文件系統容量便可。推薦使用的場景爲:超大狀態的做業,例如天級窗口聚合、須要開啓 HA 的做業、最好是對狀態讀寫性能要求不高的做業。
前面提到有狀態的做業要有有狀態的邏輯,有狀態的邏輯是由於數據之間存在關聯,單條數據是沒有辦法把全部的信息給表現出來。因此須要經過狀態來知足業務邏輯。
使用了狀態,爲何要管理狀態?由於實時做業須要7*24不間斷的運行,須要應對不可靠的因素而帶來的影響。
那如何選擇狀態的類型和存儲方式?結合前面的內容,能夠看到,首先是要分析清楚業務場景;好比想要作什麼,狀態到底大不大。比較各個方案的利弊,選擇根據需求合適的狀態類型和存儲方式便可。
視頻回顧:www.bilibili.com/video/av497…
▼ Apache Flink 社區推薦 ▼
Apache Flink 及大數據領域頂級盛會 Flink Forward Asia 2019 重磅開啓,目前正在徵集議題,限量早鳥票優惠ing。瞭解 Flink Forward Asia 2019 的更多信息,請查看:
developer.aliyun.com/special/ffa…
首屆 Apache Flink 極客挑戰賽重磅開啓,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點擊:
tianchi.aliyun.com/markets/tia…
關注 Flink 官方社區微信公衆號,瞭解更多 Flink 資訊!