本文是flink博文的翻譯,原文連接https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.htmlhtml
2017年12月份發佈的Apache Flink 1.4版本,引進了一個重要的特性:TwoPhaseCommitSinkFunction (關聯Jirahttps://issues.apache.org/jira/browse/FLINK-7210) ,它抽取了兩階段提交協議的公共部分,使得構建端到端Excatly-Once的Flink程序變爲了可能。這些外部系統包括Kafka0.11及以上的版本,以及一些其餘的數據輸入(data sources)和數據接收(data sink)。它提供了一個抽象層,須要用戶本身手動去實現Exactly-Once語義。java
若是僅僅是使用,能夠查看這個文檔https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html。算法
若是想要了解更多,這篇文章咱們會深刻了解這個特性,以及Flink背後作的工做。apache
縱覽全篇,有如下幾點:api
當咱們在討論Exactly-Once語義的時候,咱們指的是每個到來的事件僅會影響最終結果一次。就算機器宕機或者軟件崩潰,即沒有數據重複,也沒有數據丟失。網絡
Flink好久以前就提供了Exactly-Once語義。在過去的幾年時間裏,咱們對Flink的checkpoint作了深刻的描述 ,這個是Flink可以提供Exactly-Once語義的核心。Flink文檔也對這個特性作了深刻的介紹 。異步
在咱們繼續以前,有一個關於checkpoint算法的簡要介紹,這對於瞭解更廣的主題來講是十分必要的。分佈式
一個checkpoint是Flink的一致性快照,它包括:性能
Flink經過一個可配置的時間,週期性的生成checkpoint,將它寫入到存儲中,例如S3或者HDFS。寫入到存儲的過程是異步的,意味着Flink程序在checkpoint運行的同時還能夠處理數據。翻譯
在機器或者程序遇到錯誤重啓的時候,Flink程序會使用最新的checkpoint進行恢復。Flink會恢復程序的狀態,將輸入流回滾到checkpoint保存的位置,而後從新開始運行。這意味着Flink能夠像沒有發生錯誤同樣計算結果。
在Flink 1.4.0版本以前,Flink僅保證Flink程序內部的Exactly-Once語義,沒有擴展到在Flink數據處理完成後存儲的外部系統。
Flink程序能夠和不一樣的接收器(sink)交互,開發者須要有能力在一個組件的上下文中維持Exactly-Once語義。
爲了提供端到端Exactly-Once語義,除了Flink應用程序自己的狀態,Flink寫入的外部存儲也須要知足這個語義。也就是說,這些外部系統必須提供提交或者回滾的方法,而後經過Flink的checkpoint來協調。
在分佈式系統中,協調提交和回滾的通用作法是兩階段提交。接下來,咱們討論Flink的TwoPhaseCommitSinkFunction如何使用兩階段提交協議來保證端到端的Exactly-Once語義。
咱們簡略的看一下兩階段提交協議,以及它如何在一個讀寫Kafka的Flink實例程序中提供端到端的Exactly-Once語義。Kafka是一個流行的消息中間件,常常被拿來和Flink一塊兒使用,Kafka 在最近的0.11版本中添加了對事務的支持。這意味着如今Flink讀寫Kafka有了必要的支持,使之能提供端到端的Exactly-Once語義。
Flink對端到端的Exactly-Once語義不只僅侷限在Kafka,你可使用任一輸入輸出源(source、sink),只要他們提供了必要的協調機制。例如Pravega ,來自DELL/EMC的流數據存儲系統,經過Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once語義。
在這個示例程序中,咱們有:
在data sink中要保證Exactly-Once語義,它必須將全部的寫入數據經過一個事務提交到Kafka。在兩個checkpoint之間,一個提交綁定了全部要寫入的數據。
這保證了當出錯的時候,寫入的數據能夠被回滾。
然而在分佈式系統中,一般擁有多個並行執行的寫入任務,簡單的提交和回滾是效率低下的。爲了保證一致性,全部的組件必須先達成一致,才能進行提交或者回滾。Flink使用了兩階段提交協議以及預提交階段來解決這個問題。
在checkpoint開始的時候,即兩階段提交中的預提交階段。首先,Flink的JobManager在數據流中注入一個checkpoint屏障(它將數據流中的記錄分割開,一些進入到當前的checkpoint,另外一些進入下一個checkpoint)。
屏障經過operator傳遞。對於每個operator,它將觸發operator的狀態快照寫入到state backend。
data source保存了Kafka的offset,以後把checkpoint屏障傳遞到後續的operator。
這種方式僅適用於operator有他的內部狀態。內部狀態是指,Flink state backends保存和管理的內容-舉例來講,第二個operator中window聚合算出來的sum。當一個進程有它的內部狀態的時候,除了在checkpoint以前將須要將數據更改寫入到state backend,不須要在預提交階段作其餘的動做。在checkpoint成功的時候,Flink會正確的提交這些寫入,在checkpoint失敗的時候會終止提交。
然而,當一個進程有外部狀態的時候,須要用一種不一樣的方式來處理。外部狀態一般由須要寫入的外部系統引入,例如Kafka。所以,爲了提供Exactly-Once保證,外部系統必須提供事務支持,藉此和兩階段提交協議交互。
咱們知道在咱們的例子中,因爲須要將數據寫到Kafka,data sink有外部的狀態。所以,在預提交階段,除了將狀態寫入到state backend以外,data sink必須預提交本身的外部事務。
當checkpoint屏障在全部operator中都傳遞了一遍,以及它觸發的快照寫入完成,預提交階段結束。這個時候,快照成功結束,整個程序的狀態,包括預提交的外部狀態是一致的。萬一出錯的時候,咱們能夠經過checkpoint從新初始化。
下一步是通知全部operator,checkpoint已經成功了。這時兩階段提交中的提交階段,Jobmanager爲程序中的每個operator發起checkpoint已經完成的回調。data source和window operator沒有外部的狀態,在提交階段中,這些operator不會執行任何動做。data sink擁有外部狀態,因此經過事務提交外部寫入。
讓咱們對上述的知識點彙總一下:
所以,咱們能夠肯定全部的operator贊成checkpoint的最終結果:要麼都贊成提交數據,要麼提交被終止而後回滾。
完整的實現兩階段提交協議可能會有一點複雜,所以Flink將通用邏輯提取到一個abstract的類TwoPhaseCommitSinkFunction。
讓咱們經過一個簡單的文件操做例子來講明如何使用TwoPhaseCommitSinkFunction。咱們只須要實現四個method,並使sink呈現Exactly-Once語義。
咱們知道,若是步驟中有任何錯誤,Flink會經過最新的checkpoint來恢復程序狀態。在一個罕見的場景中,預提交成功了,在通知到達operator以前失敗了。這時候,Flink將operator的狀態恢復到預提交階段,即還未真正提交的時候。
爲了能在重啓的時候可以正確的終止或者提交事務,咱們須要在預提交階段將足夠的信息保存到checkpoint中。在這個例子中,這些信息是臨時文件以及目標目錄的地址。
TwoPhaseCommitSinkFunction 已經把這個場景考慮進去了,在從checkpoint恢復的時候,它會優先提交一個commit。咱們的任務是將commit實現成一個冪等的操做。通常的,這不是難題。在這個例子中,咱們能夠發現這種狀況:臨時文件不在臨時目錄中,可是已經移動到目標目錄了。
在TwoPhaseCommitSinkFunction中,有一些其餘的邊界條件也考慮在內了。經過Flink文檔查看更多。
若是你看到了這麼後面,很感謝你通讀這個詳細的帖子。如下是咱們主要覆蓋的關鍵點:
咱們爲這個特性能提供的功能感到很興奮,從此但願能找到更多支持 TwoPhaseCommitSinkFunction的producer。