本文是博主閱讀Flink官方文檔以及《Flink基礎教程》後結合本身理解所寫,如有表達有誤的地方歡迎大夥留言指出。html
流式計算分爲有狀態和無狀態兩種狀況,所謂狀態就是計算過程當中的中間值。對於無狀態計算,會獨立觀察每一個獨立事件,並根據最後一個事件輸出結果。什麼意思?大白話舉例:對於一個流式系統,接受到一系列的數字,當數字大於N則輸出,這時候在此以前的數字的值、和等狀況,壓根不關心,只和最後這個大於N的數字相關,這就是無狀態計算。什麼是有狀態計算了?想求過去一分鐘內全部數字的和或者平均數等,這種須要保存中間結果的狀況是有狀態的計算。java
當分佈式計系統中引入狀態計算時,就無可避免一致性的問題。爲何了?由於如果計算過程當中出現故障,中間數據咋辦了?如果不保存,那就只能從新從頭計算了,否則怎麼保證計算結果的正確性。這就是要求系統具備容錯性了。算法
談到容錯性,就無法避免一致性這個概念。所謂一致性就是:成功處理故障並恢復以後獲得的結果與沒有發生任何故障是獲得的結果相比,前者的正確性。換句大白話,就是故障的發生是否影響獲得的結果。在流處理過程,一致性分爲3個級別[1]:apache
Flink的容錯機制保證了exactly-once,也能夠選擇at-least-once。Flink的容錯機制是經過對數據流不停的作快照(snapshot)實現的。針對FLink的容錯機制須要注意的是:要徹底保證exactly-once,Flink的數據源系統須要有「重放」功能,什麼意思了?且聽下面慢慢道來。緩存
Flink作快照的過程是基於「輕量級異步快照」的算法,其核心思想就是在計算過程當中保存中間狀態和在數據流中對應的位置,至於如何實現的會後續的博客中會詳細說明。這些保存的信息(快照)就至關因而系統的檢查點(checkpoint)(相似於window系統發生死機等問題時恢復系統到某個時間點的恢復點),作snapshot也是作一個checkpoint。在系統故障恢復時,系統會從最新的一個checkpoint開始從新計算,對應的數據源也會在對應的位置「重放「。這裏的「重放」可能會致使數據的二次輸出,這點的處理也在後續的博客中說明。數據結構
在Flink作分佈式快照過程當中核心一個元素Barriers的使用。這些Barriers是在數據接入到Flink之初就注入到數據流中,並隨着數據流向每一個算子(operator,這裏所說的算子不是指相似map()等具體意義上個的,指在JobGraph中優化後的「頂點」),這裏須要說明的有兩點:異步
以下圖所示,Barriers將將數據流分紅了一個個數據集。值得提醒的是,當barriers流經算子時,會觸發與checkpoint相關的行爲,保存的barriers的位置和狀態(中間計算結果)。async
Update:checkpoint是由JobManager中的CheckpointCoordinator週期性觸發,而後在Task側生成barrier,具體爲:在Source task(TaskManager中)中barrier會根據命令週期性的在原始數據中注入barrier,而對非source task則是遇到barrier作checkpoint,即非source task其作checkpoint的時間間隔也許不是週期的,影響因素較多。此外,每一個算子作checkpoint的方式也許不一樣。分佈式
能夠打個比方,在河上有個大壩(至關於算子),接上級通知(Flink中的JobManager)要統計水流量等信息,因此有人在上游按期(source task)放一根木頭(barrier)到河中,當第一木頭遇到大壩時,大壩就記下經過大壩木頭的位置、水流量等相關狀況,即作checkpoint(實際生活中不太可能),當第二木頭遇到大壩時記下第一個木頭和第二根木頭之間的水流量等狀況,不須要重開始計算。這裏先無論故障了,否則就很差解釋相同的水的「重放」問題了。 優化
當一個算子有多個數據源時,又如何作checkpoint了?
以下圖,從左往右一共4副圖。當算子收到其中一個數據源的barriers,而未收到另外一個數據源的barriers時(如左1圖),會將先到barriers的數據源中的數據先緩衝起來,等待另外一個barriers(如左2圖),當收到兩個barriers(如左3圖)即接收到所有數據源的barrier時,會作checkpoint,保存barriers位置和狀態,發射緩衝中的數據,釋放一個對應的barriers。這裏須要注意是,當緩存中數據沒有被髮射完時,是不會處理後續數據的,這樣是爲了保證數據的有序性。
這裏其實有一點須要注意的是,由於系統設置checkpoint的方式是經過時間間隔的形式(enableCheckpointing(intervalTime)
),因此會致使一個問題:當一個checkpoint所需時間遠大於兩次checkpoint之間的時間間隔時,就頗有可能會致使後續的checkpoint會失敗,如果這樣狀況比較嚴重時會致使任務失敗,這樣Flink系統的容錯性的優點就等不到保證了,因此須要合理設計checkpoint間隔時間。
以下圖所示,在一次snapshot中,算子會在接受到其數據源的全部barriers的之後snapshot它們的狀態,而後在發射barriers到輸出流中,直到最後全部的sink算子都完成snapshot纔算完成一次snapshot。其中,在準備發射的barriers造成以前,state 形式是能夠改變的,以後就不能夠了。state的存貯方式是能夠配置的,如HDFS,默認是在JobManager的內存中。
上述描述中,須要等待算子接收到全部barriers後,開始作snapshot,存儲對應的狀態後,再進行下一次snapshot,其狀態的存儲是同步的,這樣可能會形成因snapshot引發較大延時。可讓算子在存儲快照時繼續處理數據,讓快照存儲異步在後臺運行。爲此,算子必須能生成一個 state 對象,保證後續狀態的修改不會改變這個 state 對象。例如 RocksDB 中使用的 copy-on-write(寫時複製)類型的數據結構,即異步狀態快照。對異步狀態快照,其可讓算子接受到barriers後開始在後臺異步拷貝其狀態,而沒必要等待全部的barriers的到來。一旦後臺的拷貝完成,將會通知JobManager。只有當全部的sink接收到這個barriers,和全部的有狀態的算子都確認完成狀態的備份時,一次snapshot纔算完成。如何實現的,這點後續博客將仔細分析。
Ref:
[1]《Flink基礎教程》
[2]https://ci.apache.org/projects/flink/flink-docs-release-1.6/internals/stream_checkpointing.html