Apache Flink 端到端(end-to-end)Exactly-Once特性概覽 (翻譯)

Apache Flink 端到端(end-to-end)Exactly-Once特性概覽

本文是flink博文的翻譯,原文連接https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.htmlhtml

2017年12月份發佈的Apache Flink 1.4版本,引進了一個重要的特性:TwoPhaseCommitSinkFunction (關聯Jirahttps://issues.apache.org/jira/browse/FLINK-7210) ,它抽取了兩階段提交協議的公共部分,使得構建端到端Excatly-Once的Flink程序變爲了可能。這些外部系統包括Kafka0.11及以上的版本,以及一些其餘的數據輸入(data sources)和數據接收(data sink)。它提供了一個抽象層,須要用戶本身手動去實現Exactly-Once語義。java

若是僅僅是使用,能夠查看這個文檔https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html算法

若是想要了解更多,這篇文章咱們會深刻了解這個特性,以及Flink背後作的工做。apache

縱覽全篇,有如下幾點:api

  • 描述Flink checkpoints的做用,以及它是如何保障Flink程序Exactly-Once的語義的。
  • 展示Flink如何與兩階段提交協議與輸入輸出(data sources and data sinks)交互,藉此傳遞端到端的Exactly-Once語義保證。
  • 經過一個簡單的例子來展示如何使用TwoPhaseCommitSinkFunction,來實現Exactly-Once的文件輸出(file sink)。

Apache Flink程序的Exactly-Once語義

當咱們在討論Exactly-Once語義的時候,咱們指的是每個到來的事件僅會影響最終結果一次。就算機器宕機或者軟件崩潰,即沒有數據重複,也沒有數據丟失。網絡

Flink好久以前就提供了Exactly-Once語義。在過去的幾年時間裏,咱們對Flink的checkpoint作了深刻的描述 ,這個是Flink可以提供Exactly-Once語義的核心。Flink文檔也對這個特性作了深刻的介紹異步

在咱們繼續以前,有一個關於checkpoint算法的簡要介紹,這對於瞭解更廣的主題來講是十分必要的。分佈式

一個checkpoint是Flink的一致性快照,它包括:性能

  1. 程序當前的狀態
  2. 輸入流的位置

Flink經過一個可配置的時間,週期性的生成checkpoint,將它寫入到存儲中,例如S3或者HDFS。寫入到存儲的過程是異步的,意味着Flink程序在checkpoint運行的同時還能夠處理數據。翻譯

在機器或者程序遇到錯誤重啓的時候,Flink程序會使用最新的checkpoint進行恢復。Flink會恢復程序的狀態,將輸入流回滾到checkpoint保存的位置,而後從新開始運行。這意味着Flink能夠像沒有發生錯誤同樣計算結果。

在Flink 1.4.0版本以前,Flink僅保證Flink程序內部的Exactly-Once語義,沒有擴展到在Flink數據處理完成後存儲的外部系統。

Flink程序能夠和不一樣的接收器(sink)交互,開發者須要有能力在一個組件的上下文中維持Exactly-Once語義。

爲了提供端到端Exactly-Once語義,除了Flink應用程序自己的狀態,Flink寫入的外部存儲也須要知足這個語義。也就是說,這些外部系統必須提供提交或者回滾的方法,而後經過Flink的checkpoint來協調。

在分佈式系統中,協調提交和回滾的通用作法是兩階段提交。接下來,咱們討論Flink的TwoPhaseCommitSinkFunction如何使用兩階段提交協議來保證端到端的Exactly-Once語義。

Flink程序端到端的Exactly-Once語義

咱們簡略的看一下兩階段提交協議,以及它如何在一個讀寫Kafka的Flink實例程序中提供端到端的Exactly-Once語義。Kafka是一個流行的消息中間件,常常被拿來和Flink一塊兒使用,Kafka 在最近的0.11版本中添加了對事務的支持。這意味着如今Flink讀寫Kafka有了必要的支持,使之能提供端到端的Exactly-Once語義。

Flink對端到端的Exactly-Once語義不只僅侷限在Kafka,你可使用任一輸入輸出源(source、sink),只要他們提供了必要的協調機制。例如Pravega ,來自DELL/EMC的流數據存儲系統,經過Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once語義。

在這個示例程序中,咱們有:

  • 從Kafka讀取數據的data source(KafkaConsumer,在Flink中)
  • 窗口聚合
  • 將數據寫回到Kafka的data sink(KafkaProducer,在Flink中)

在data sink中要保證Exactly-Once語義,它必須將全部的寫入數據經過一個事務提交到Kafka。在兩個checkpoint之間,一個提交綁定了全部要寫入的數據。

這保證了當出錯的時候,寫入的數據能夠被回滾。

然而在分佈式系統中,一般擁有多個並行執行的寫入任務,簡單的提交和回滾是效率低下的。爲了保證一致性,全部的組件必須先達成一致,才能進行提交或者回滾。Flink使用了兩階段提交協議以及預提交階段來解決這個問題。

在checkpoint開始的時候,即兩階段提交中的預提交階段。首先,Flink的JobManager在數據流中注入一個checkpoint屏障(它將數據流中的記錄分割開,一些進入到當前的checkpoint,另外一些進入下一個checkpoint)。

屏障經過operator傳遞。對於每個operator,它將觸發operator的狀態快照寫入到state backend。

data source保存了Kafka的offset,以後把checkpoint屏障傳遞到後續的operator。

這種方式僅適用於operator有他的內部狀態。內部狀態是指,Flink state backends保存和管理的內容-舉例來講,第二個operator中window聚合算出來的sum。當一個進程有它的內部狀態的時候,除了在checkpoint以前將須要將數據更改寫入到state backend,不須要在預提交階段作其餘的動做。在checkpoint成功的時候,Flink會正確的提交這些寫入,在checkpoint失敗的時候會終止提交。

然而,當一個進程有外部狀態的時候,須要用一種不一樣的方式來處理。外部狀態一般由須要寫入的外部系統引入,例如Kafka。所以,爲了提供Exactly-Once保證,外部系統必須提供事務支持,藉此和兩階段提交協議交互。

咱們知道在咱們的例子中,因爲須要將數據寫到Kafka,data sink有外部的狀態。所以,在預提交階段,除了將狀態寫入到state backend以外,data sink必須預提交本身的外部事務。

當checkpoint屏障在全部operator中都傳遞了一遍,以及它觸發的快照寫入完成,預提交階段結束。這個時候,快照成功結束,整個程序的狀態,包括預提交的外部狀態是一致的。萬一出錯的時候,咱們能夠經過checkpoint從新初始化。

下一步是通知全部operator,checkpoint已經成功了。這時兩階段提交中的提交階段,Jobmanager爲程序中的每個operator發起checkpoint已經完成的回調。data source和window operator沒有外部的狀態,在提交階段中,這些operator不會執行任何動做。data sink擁有外部狀態,因此經過事務提交外部寫入。

讓咱們對上述的知識點彙總一下:

  • 一旦全部的operator完成預提交,就提交一個commit。
  • 若是至少有一個預提交失敗,其餘的都會失敗,這時回滾到上一個checkpoint保存的位置。
  • 預提交成功後,提交的commit也須要保障最終成功-operator和外部系統須要提供這個保障。若是commit失敗了(好比網絡中斷引發的故障),整個flink程序也所以失敗,它會根據用戶的重啓策略重啓,可能還會有一個嘗試性的提交。這個過程很是嚴苛,由於若是提交沒有最終生效,會致使數據丟失。

所以,咱們能夠肯定全部的operator贊成checkpoint的最終結果:要麼都贊成提交數據,要麼提交被終止而後回滾。

在Flink程序中實現兩階段提交

完整的實現兩階段提交協議可能會有一點複雜,所以Flink將通用邏輯提取到一個abstract的類TwoPhaseCommitSinkFunction。

讓咱們經過一個簡單的文件操做例子來講明如何使用TwoPhaseCommitSinkFunction。咱們只須要實現四個method,並使sink呈現Exactly-Once語義。

  1. beginTransaction - 在事務開始前,咱們在目標文件系統上面的臨時目錄上建立一個臨時文件。隨後,咱們在程序處理的時候能夠將數據寫入到這個文件。
  2. preCommit - 在預提交階段,咱們刷新文件到磁盤,關閉文件,不要從新打開寫入。咱們也會爲下一個checkpoint的文件寫入開啓一個新的事務。
  3. commit - 在提交階段,咱們原子性的將預提交階段的文件移動到真正的目標目錄。須要注意的是,這增長了輸出數據的可見性的延遲。
  4. abort - 在終止階段,咱們刪除臨時文件。

咱們知道,若是步驟中有任何錯誤,Flink會經過最新的checkpoint來恢復程序狀態。在一個罕見的場景中,預提交成功了,在通知到達operator以前失敗了。這時候,Flink將operator的狀態恢復到預提交階段,即還未真正提交的時候。

爲了能在重啓的時候可以正確的終止或者提交事務,咱們須要在預提交階段將足夠的信息保存到checkpoint中。在這個例子中,這些信息是臨時文件以及目標目錄的地址。

TwoPhaseCommitSinkFunction 已經把這個場景考慮進去了,在從checkpoint恢復的時候,它會優先提交一個commit。咱們的任務是將commit實現成一個冪等的操做。通常的,這不是難題。在這個例子中,咱們能夠發現這種狀況:臨時文件不在臨時目錄中,可是已經移動到目標目錄了。

在TwoPhaseCommitSinkFunction中,有一些其餘的邊界條件也考慮在內了。經過Flink文檔查看更多。

總結

若是你看到了這麼後面,很感謝你通讀這個詳細的帖子。如下是咱們主要覆蓋的關鍵點:

  • Flink的checkpoint系統是它支撐兩階段協議和保障Exactly-Once語義的基礎設施,
  • 這種實現方案的優勢是,Flink不像其餘系統那樣,經過網絡傳輸存儲數據 - 它不須要像大部分批處理程序那樣,將每個計算結果保存到磁盤。
  • Flink的TwoPhaseCommitSinkFunction提取了兩階段提交協議的通用部分,經過這個方法結合Flink以及支持事務的外部系統,能夠構建端到端的Exactly-Once程序。
  • Flink 1.4.0開始,Pravega和Kafka 0.11 producer提供了Exactly-Once語義;經過Kafka在0.11版本第一次引入的事務,爲在Flink中使用Kafka producer提供Exactly-Once語義提供了可能性。
  • Kafka 0.11版本的producer 是在TwoPhaseCommitSinkFunction基礎上實現的,它at-least-once的producer的基礎上增長了很小的開銷。

咱們爲這個特性能提供的功能感到很興奮,從此但願能找到更多支持 TwoPhaseCommitSinkFunction的producer。

相關文章
相關標籤/搜索