本文由Flink 博客 翻譯而來,爲了敘述的可讀性和流暢性,筆者作了少許的修改。html
Apache Flink是爲了「有狀態」的處理流式數據創建的。那麼,在流式計算程序中,狀態的含義是什麼? 我在前面的博客中作了「狀態」以及 「有狀態的流式處理」的定義。這裏回顧一下,狀態指的是,在程序中,Operator將過去處理過的event信息保存在內存中, 這樣能夠在以後的處理中使用。git
「狀態」是一個基礎的功能,使得在流式計算中複雜的用戶使用場景成爲可能。在Flink 文檔中列舉 了一些例子。github
可是,只有「狀態」擁有容錯能力,這樣才能在生產環境使用。「容錯性」意味着,即便有軟件或者機器的故障,最終的計算結果也是精確的,沒有數據丟失也沒有重複處理。apache
Flink的容錯特性很是強大,它不只對軟件和機器的負載很小,而且也提供了「端到端僅一次」的消息傳遞保證。緩存
Flink程序容錯機制的核心是檢查點。Flink的檢查點是一個全局的、異步的程序快照,它週期性的生成並送到持久化存儲(通常使用分佈式系統)。 當發生故障時,Flink使用最新的檢查點進行重啓。一些Flink的用戶在程序「狀態」中保存了GB甚至TB的數據。這些用戶反饋在大量 的狀態下,建立檢查點一般很慢而且耗資源,這也是爲何Flink在 1.3版本開始引入「增量式的檢查點」。併發
在引入「增量式的檢查點」以前,每個Flink的檢查點都保存了程序完整的狀態。後來咱們意識到在大部分狀況下這是沒必要要的,由於上一次和此次的檢查點以前 ,狀態發生了很大的變化,因此咱們建立了「增量式的檢查點」。增量式的檢查點僅保存過去和如今狀態的差別部分。異步
增量式的檢查點能夠爲擁有大量狀態的程序帶來很大的提高。在早期的測試中,一個擁有TB級別「狀態」程序將生成檢查點的耗時從3分鐘以上下降 到了30秒左右。由於增量式的檢查點不須要每次把完整的狀態發送到存儲中。分佈式
如今只能經過RocksDB state back-end來獲取增量式檢查點的功能,Flink使用RocksDB內置的備份機制來合併檢查點數據。這樣, Flink 增量式檢查點的數據不會無限制的增大,它會自動合併老的檢查點數據並清理掉。性能
想要在程序中使用增量式的檢查點,我建議詳細的閱讀Flink 檢查點的官方文檔 總的來講,要啓用這個機制,能夠以下設置:測試
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new RocksDBStateBackend(filebackend, true));//第二個參數爲true
默認的,Flink保留一個完整的檢查點,若是你須要保留更多,能夠經過以下的配置設置:
state.checkpoints.num-retained
Flink 增量式的檢查點以「RocksDB」爲基礎,RocksDB是一個基於 LSM樹的KV存儲,新的數據保存在內存中,稱爲memtable。若是Key相同,後到的數據將覆蓋以前的數據,一旦memtable寫滿了,RocksDB將數據壓縮並寫入到磁盤。memtable的數據持久化到磁盤後,他們就變成了不可變的sstable。
RocksDB會在後臺執行compaction,合併sstable並刪除其中重複的數據。以後RocksDB刪除原來的sstable,替換成新合成的ssttable,這個sstable包含了以前的sstable中的信息。
在這個基礎之上,Flink跟蹤前一個checkpoint建立和刪除的RocksDB sstable文件,由於sstable是不可變的,Flink能夠所以計算出 狀態有哪些改變。爲了達到這個目標,Flink在RocksDB上觸發了一個刷新操做,強制將memtable刷新到磁盤上。這個操做在Flink中是同步的,其餘的操做是異步的,不會阻塞數據處理。
Flink 的checkpoint會將新的sstable發送到持久化存儲(例如HDFS,S3)中,同時保留引用。Flink不會發送全部的sstable, 一些數據在以前的checkpoint存在而且寫入到持久化存儲中了,這樣只須要增長引用次數就能夠了。由於compaction的做用,一些sstable會合併成一個sstable並刪除這些sstable,這也是爲何Flink能夠減小checkpoint的歷史文件。
爲了分析checkpoint的數據變動,而上傳整理過的sstable是多餘的(下文會有描述,這裏的意思是以前已經上傳過的,不須要再次上傳)。Flink處理這種狀況,僅帶來一點點開銷。這個過程很重要,由於在任務須要重啓的時候,Flink只須要保留較少的歷史文件。
假設有一個子任務,擁有一個keyed state的operator,checkpoint最多保留2個。上面的圖片描述了每一個checkpoint對應的RocksDB 的狀態,它引用到的文件,以及在checkpoint完成後共享狀態中的count值。
checkpoint ‘CP2’,本地的RocksDB目錄有兩個sstable文件,這些文件是新生成的,因而Flink將它們傳到了checkpoint 對應的存儲目錄。當checkpoint完成後,Flink在共享狀態中建立兩個實體,並將count設爲1。在這個共享狀態中,這個key 由operator、subtask,原始的sstable名字組成,value爲sstable實際存儲目錄。
checkpoint‘CP2’,RocksDB有2個老的sstable文件,又建立了2個新的sstable文件。Flink將這兩個新的sstable傳到 持久化存儲中,而後引用他們。當checkpoint完成後,Flink將全部的引用的相應計數加1。
checkpoint‘CP3’,RocksDB的compaction將sstable-(1), sstable-(2), sstable-(3) 合併成 sstable-(1,2,3),而後刪除 原始的sstable。這個合併後的文件包含了和以前源文件同樣的信息,而且清理掉了重複的部分。sstable-(4)還保留着,而後有一個 新生成的sstable-(5)。Flink將新的 sstable-(1,2,3)以及 sstable-(5)傳到持久化存儲中, sstable-(4)仍被‘CP2’引用,因此 將計數增長1。如今有了3個checkpoint,'CP1','CP2','CP3',超過了預設的保留數目2,因此CP1被刪除。做爲刪除的一部分, CP1對應的文件(sstable-(1)、sstable-(2)) 的引用計數減1。
checkpoint‘CP4’,RocksDB將sstable-(4), sstable-(5), 新的 sstable-(6) 合併成 sstable-(4,5,6)。Flink將新合併 的 sstable-(4,5,6)發送到持久化存儲中,sstable-(1,2,3)、sstable-(4,5,6) 的引用計數增長1。因爲再次到達了checkpoint的 保留數目,‘CP2’將被刪除,‘CP2’對應的文件(sstable-(1)、sstable-(2)、sstable(3) )的引用計數減1。因爲‘CP2’對應 的文件的引用計數達到0,這些文件將被刪除。
因爲Flink能夠並行的執行多個checkpoint,有時候前面的checkpoint尚未完成,後面的新的checkpoint就啓動了。所以,在 使用增量式的checkpoint的時候,你須要考慮使用哪個checkpoint啓動。Flink在使用checkpoint以前須要checkpoint協調器的確認, 因此不會使用那些被刪除的checkpoint。
若是使用增量式的checkpoint,那麼在錯誤恢復的時候,不須要考慮不少的配置項。一旦發生了錯誤,Flink的JobManager會告訴 task須要從最新的checkpoint中恢復,它能夠是全量的或者是增量的。以後TaskManager從分佈式系統中下載checkpoint文件, 而後從中恢復狀態。
增量式的checkpoint能爲擁有大量狀態的程序帶來較大的提高,但還有一些trade-off須要考慮。總的來講,增量式減小了checkpoint操做的時間,可是相對的,從checkpoint中恢復可能更耗時,具體狀況須要根據應用程序包含的狀態大小而定。相對的,若是程序只是部分失敗,Flink TaskManager須要從多個checkpoint中讀取數據,這時候使用全量的checkpoint來恢復數據可能更加耗時。同時,因爲新的checkpoint可能引用到老的checkpoint,這樣老的checkpoint就不能被刪除,這樣下去,歷史的版本數據會愈來愈大。須要考慮使用分佈式來存儲checkpoint,另外還須要考慮讀取帶來的帶寬消耗。
還有一些便利性和性能的trade-off,能夠經過閱讀Flink 文檔 瞭解更多。