【譯】Apache Flink 容錯機制

原文地址:flink-release-1.2 Data Streaming Fault Tolerancehtml

Introduce

Apache Flink 提供了能夠恢復數據流應用到一致狀態的容錯機制。確保在發生故障時,程序的每條記錄只會做用於狀態一次(exactly-once),固然也能夠降級爲至少一次(at-least-once)。算法

容錯機制經過持續建立分佈式數據流的快照來實現。對於狀態佔用空間小的流應用,這些快照很是輕量,能夠高頻率建立而對性能影響很小。流計算應用的狀態保存在一個可配置的環境,如:master 節點或者 HDFS上。apache

在遇到程序故障時(如機器、網絡、軟件等故障),Flink 中止分佈式數據流。系統重啓全部 operator ,重置其到最近成功的 checkpoint。輸入重置到相應的狀態快照位置。保證被重啓的並行數據流中處理的任何一個 record 都不是 checkpoint 狀態以前的一部分。後端

注意:爲了容錯機制生效,數據源(例如 queue 或者 broker)須要能重放數據流。Apache Kafka 有這個特性,Flink 中 Kafka 的 connector 利用了這個功能。緩存

注意:因爲 Flink 的 checkpoint 是經過分佈式快照實現的,接下來咱們將 snapshot 和 checkpoint 這兩個詞交替使用。網絡

Checkpointing

Flink 容錯機制的核心就是持續建立分佈式數據流及其狀態的一致快照。這些快照在系統遇到故障時,充當能夠回退的一致性檢查點(checkpoint)。Lightweight Asynchronous Snapshots for Distributed Dataflows 描述了Flink 建立快照的機制。此論文是受分佈式快照算法 Chandy-Lamport 啓發,並針對 Flink 執行模型量身定製。數據結構

Barriers

Flink 分佈式快照的核心概念之一就是數據柵欄(barrier)。這些 barrier 被插入到數據流中,做爲數據流的一部分和數據一塊兒向下流動。Barrier 不會干擾正常數據,數據流嚴格有序。一個 barrier 把數據流分割成兩部分:一部分進入到當前快照,另外一部分進入下一個快照。每個 barrier 都帶有快照 ID,而且 barrier 以前的數據都進入了此快照。Barrier 不會干擾數據流處理,因此很是輕量。多個不一樣快照的多個 barrier 會在流中同時出現,即多個快照可能同時建立。異步

steam barriers

Barrier 在數據源端插入,當 snapshot n 的 barrier 插入後,系統會記錄當前 snapshot 位置值 n (用Sn表示)。例如,在 Apache Kafka 中,這個變量表示某個分區中最後一條數據的偏移量。這個位置值 Sn 會被髮送到一個稱爲 checkpoint coordinator 的模塊。(即 Flink 的 JobManager).分佈式

而後 barrier 繼續往下流動,當一個 operator 從其輸入流接收到全部標識 snapshot n 的 barrier 時,它會向其全部輸出流插入一個標識 snapshot n 的 barrier。當 sink operator (DAG 流的終點)從其輸入流接收到全部 barrier n 時,它向 the checkpoint coordinator 確認 snapshot n 已完成。當全部 sink 都確認了這個快照,快照就被標識爲完成。函數

stream aligning

接收超過一個輸入流的 operator 須要基於 barrier 對齊(align)輸入。參見上圖:

  • operator 只要一接收到某個輸入流的 barrier n,它就不能繼續處理此數據流後續的數據,直到 operator 接收到其他流的 barrier n。不然會將屬於 snapshot n 的數據和 snapshot n+1的搞混

  • barrier n 所屬的數據流先不處理,從這些數據流中接收到的數據被放入接收緩存裏(input buffer)

  • 當從最後一個流中提取到 barrier n 時,operator 會發射出全部等待向後發送的數據,而後發射snapshot n 所屬的 barrier

  • 通過以上步驟,operator 恢復全部輸入流數據的處理,優先處理輸入緩存中的數據

State

operator 包含任何形式的狀態,這些狀態都必須包含在快照中。狀態有不少種形式:

  • 用戶自定義狀態:由 transformation 函數例如( map() 或者 filter())直接建立或者修改的狀態。用戶自定義狀態能夠是:轉換函數中的 Java 對象的一個簡單變量或者函數關聯的 key/value 狀態。參見 State in Streaming Applications

  • 系統狀態:這種狀態是指做爲 operator 計算中一部分緩存數據。典型例子就是: 窗口緩存(window buffers),系統收集窗口對應數據到其中,直到窗口計算和發射。

operator 在收到全部輸入數據流中的 barrier 以後,在發射 barrier 到其輸出流以前對其狀態進行快照。此時,在 barrier 以前的數據對狀態的更新已經完成,不會再依賴 barrier 以前數據。因爲快照可能很是大,因此後端存儲系統可配置。默認是存儲到 JobManager 的內存中,可是對於生產系統,須要配置成一個可靠的分佈式存儲系統(例如 HDFS)。狀態存儲完成後,operator 會確認其 checkpoint 完成,發射出 barrier 到後續輸出流。

快照如今包含了:

  • 對於並行輸入數據源:快照建立時數據流中的位置偏移

  • 對於 operator:存儲在快照中的狀態指針

checkpointing

Exactly Once vs. At Least Once

對齊操做可能會對流程序增長延遲。一般,這種額外的延遲在幾毫秒的數量級,可是咱們也遇到過延遲顯著增長的異常狀況。針對那些須要對全部輸入都保持毫秒級的應用,Flink 提供了在 checkpoint 時關閉對齊的方法。當 operator 接收到一個 barrier 時,就會打一個快照,而不會等待其餘 barrier。

跳過對齊操做使得即便在 barrier 到達時,Operator 依然繼續處理輸入。這就是說:operator 在 checkpoint n 建立以前,繼續處理屬於 checkpoint n+1 的數據。因此當異常恢復時,這部分數據就會重複,由於它們被包含在了 checkpoint n 中,同時也會在以後再次被處理。

注意:對齊操做只會發生在擁有多輸入運算(join)或者多個輸出的 operator(重分區、分流)的場景下。因此,對於自由 map(), flatmap(), fliter() 等的並行操做即便在至少一次的模式中仍然會保證嚴格一次。

Asynchronous State Snapshots

咱們注意到上面描述的機制意味着當 operator 向後端存儲快照時,會中止處理輸入的數據。這種同步操做會在每次快照建立時引入延遲。

咱們徹底能夠在存儲快照時,讓 operator 繼續處理數據,讓快照存儲在後臺異步運行。爲了作到這一點,operator 必須可以生成一個後續修改不影響以前狀態的狀態對象。例如 RocksDB 中使用的寫時複製( copy-on-write )類型的數據結構。

接收到輸入的 barrier 時,operator 異步快照複製出的狀態。而後當即發射 barrier 到輸出流,繼續正常的流處理。一旦後臺異步快照完成,它就會向 checkpoint coordinator(JobManager)確認 checkpoint 完成。如今 checkpoint 完成的充分條件是:全部 sink 接收到了 barrier,全部有狀態 operator 都確認完成了狀態備份(可能會比 sink 接收到 barrier 晚)。

更多狀態快照參見:state backends

Recovery

在這種容錯機制下的錯誤回覆很明顯:一旦遇到故障,Flink 選擇最近一個完成的 checkpoint k。系統從新部署整個分佈式數據流,重置全部 operator 的狀態到 checkpoint k。數據源被置爲從 Sk 位置讀取。例如在 Apache Kafka 中,意味着讓消費者從 Sk 處偏移開始讀取。

若是是增量快照,operator 須要從最新的全量快照回覆,而後對此狀態進行一系列增量更新。

Operator Snapshot Implementation

當 operator 快照建立時有兩部分操做:同步操做和異步操做。

operator 和後端存儲將快照以 Java FutureTask 的方式提供。這個 task 包含了同步操做已經完成,異步操做還在等待的狀態(state)。異步操做在後臺線程中被執行。

徹底同步的 operator 返回一個已經完成的 FutureTask 。若是異步操做須要執行,FutureTask 中的 run() 方法會被調用。

爲了釋放流和其餘資源的消耗,能夠取消這些 task。

相關文章
相關標籤/搜索