本文是翻譯做品,做者是Piotr Nowojski和Michael Winters。前者是該方案的實現者。html
原文地址是https://data-artisans.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafkajava
2017年12月Apache Flink社區發佈了1.4版本。該版本正式引入了一個里程碑式的功能:兩階段提交Sink,即TwoPhaseCommitSinkFunction。該SinkFunction提取並封裝了兩階段提交協議中的公共邏輯,自此Flink搭配特定source和sink(特別是0.11版本Kafka)搭建精確一次處理語義(下稱EOS)應用成爲了可能。做爲一個抽象類,TwoPhaseCommitSinkFunction提供了一個抽象層供用戶自行實現特定方法來支持EOS。算法
用戶能夠閱讀Java文檔來學習如何使用TwoPhaseCommitSinkFunction,或者參考Flink官網文檔來了解FlinkKafkaProducer011是如何支持EOS的,由於後者正是基於TwoPhaseCommitSinkFunction實現的。apache
本文將深刻討論一下Flink 1.4這個新特性以及其背後的設計思想。在本文中咱們將:
1. 描述Flink應用中的checkpoint如何幫助確保EOS
2. 展現Flink如何經過兩階段提交協議與source和sink交互以實現端到端的EOS交付保障
3. 給出一個使用TwoPhaseCommitSinkFunction實現EOS的文件Sink實例後端
1、Flink應用的EOS
當談及「EOS」時,咱們真正想表達的是每條輸入消息只會影響最終結果一次!【譯者:影響應用狀態一次,而非被處理一次】即便出現機器故障或軟件崩潰,Flink也要保證不會有數據被重複處理或壓根就沒有被處理從而影響狀態。長久以來Flink一直宣稱支持EOS是指在一個Flink應用內部。在過去的幾年間,Flink開發出了checkpointing機制,而它則是提供這種應用內EOS的基石。參考官網這篇文章來了解checkpointing機制。api
在繼續以前咱們簡要總結一下checkpointing算法,這對於咱們瞭解本文內容相當重要。簡單來講,一個Flink checkpoint是一個一致性快照,它包含:
1. 應用的當前狀態
2. 輸入流位置緩存
Flink會按期地產生checkpoint而且把這些checkpoint寫入到一個持久化存儲上,好比S3或HDFS。這個寫入過程是異步的,這就意味着Flink即便在checkpointing過程當中也是不斷處理輸入數據的。網絡
若是出現機器或軟件故障,Flink應用重啓後會從最新成功完成的checkpoint中恢復——重置應用狀態並回滾狀態到checkpoint中輸入流的正確位置,以後再開始執行數據處理,就好像該故障或崩潰從未發生過通常。併發
在Flink 1.4版本以前,EOS只限於Flink應用內。Flink處理完數據後須要將結果發送到外部系統,這個過程當中Flink並不保證EOS。可是Flink應用一般都須要接入不少下游子系統,而開發人員很但願能在多個系統上維持EOS,即維持端到端的EOS。框架
爲了提供端到端的EOS,EOS必須也要應用於Flink寫入數據的外部系統——故這些外部系統必須提供一種手段容許提交或回滾這些寫入操做,同時還要保證與Flink checkpoint可以協調使用。
在分佈式系統中協調提交和回滾的一個常見方法就是使用兩階段提交協議。下一章節中咱們將討論下Flink的TwoPhaseCommitSinkFunction是如何利用兩階段提交協議來實現EOS的。
2、Flink實現EOS應用
咱們將給出一個實例來幫助瞭解兩階段提交協議以及Flink如何使用它來實現EOS。該實例從Kafka中讀取數據,經處理以後再寫回到Kafka。Kafka是很是受歡迎的消息引擎,而Kafka 0.11.0.0版本正式發佈了對於事務的支持——這是與Kafka交互的Flink應用要實現端到端EOS的必要條件。
固然,Flink支持這種EOS並不僅是限於與Kafka的綁定,你可使用任何source/sink,只要它們提供了必要的協調機制。舉個例子,Pravega是Dell/EMC的一個開源流式存儲系統,Flink搭配它也能夠實現端到端的EOS。
本例中的Flink應用包含如下組件,如上圖所示:
1. 一個source,從Kafka中讀取數據(即KafkaConsumer)
2. 一個時間窗口化的聚會操做
3. 一個sink,將結果寫回到Kafka(即KafkaProducer)
若要sink支持EOS,它必須以事務的方式寫數據到Kafka,這樣當提交事務時兩次checkpoint間的全部寫入操做看成爲一個事務被提交。這確保了出現故障或崩潰時這些寫入操做可以被回滾。
固然了,在一個分佈式且含有多個併發執行sink的應用中,僅僅執行單次提交或回滾是不夠的,由於全部組件都必須對這些提交或回滾達成共識,這樣才能保證獲得一個一致性的結果。Flink使用兩階段提交協議以及預提交(pre-commit)階段來解決這個問題。
方案以下:Flink checkpointing開始時便進入到pre-commit階段。具體來講,一旦checkpoint開始,Flink的JobManager向輸入流中寫入一個checkpoint barrier將流中全部消息分割成屬於本次checkpoint的消息以及屬於下次checkpoint的。barrier也會在操做算子間流轉。對於每一個operator來講,該barrier會觸發operator狀態後端爲該operator狀態打快照。
衆所周知,flink kafka source保存Kafka消費位移,一旦完成位移保存,它會將checkpoint barrier傳給下一個operator。
這個方法對於opeartor只有內部狀態的場景是可行的。所謂的內部狀態就是徹底由Flink狀態保存並管理的——本例中的第二個opeartor:時間窗口上保存的求和數據就是這樣的例子。當只有內部狀態時,pre-commit階段無需執行額外的操做,僅僅是寫入一些已定義的狀態變量便可。當chckpoint成功時Flink負責提交這些寫入,不然就終止取消掉它們。
當時,一旦operator包含外部狀態,事情就不同了。咱們不能像處理內部狀態同樣處理這些外部狀態。由於外部狀態一般都涉及到與外部系統的交互。若是是這樣的話,外部系統必需要支持可與兩階段提交協議捆綁使用的事務才能確保實現總體的EOS。
顯然本例中的data sink是有外部狀態的,由於它須要寫入數據到Kafka。此時的pre-commit階段下data sink在保存狀態到狀態存儲的同時還必須預提交它的外部事務,以下圖所示:
當checkpoint barrier在全部operator都傳遞了一遍且對應的快照也都成功完成以後,這個pre-commit階段纔算完成。該過程當中全部建立的快照都被視爲是checkpoint的一部分。咱們說checkpoint就是整個應用的全局狀態,固然也包含pre-commit階段提交的外部狀態。當出現崩潰時,咱們能夠回滾狀態到最新已成功完成快照時的時間點。
下一步就是通知全部的operator,告訴它們checkpoint已成功完成。這即是兩階段提交協議的第二個階段:commit階段。該階段中JobManager會爲應用中每一個operator發起checkpoint已完成的回調邏輯。
本例中的data source和窗口操做無外部狀態【譯者:該文章中的實例主要基於Kafka producer對於事務和EOS的支持。事實上,Kafka consumer對於EOS的支持是有限的,故做者這裏並無拿consumer的EOS來舉例說明,故這裏的data source沒有外部狀態】,所以在該階段,這兩個opeartor無需執行任何邏輯,可是data sink是有外部狀態的,所以此時咱們必須提交外部事務,以下圖所示:
彙總以上全部信息,總結一下:
1. 一旦全部operator完成各自的pre-commit,它們會發起一個commit操做
2. 假若有一個pre-commit失敗,全部其餘的pre-commit必須被終止,而且Flink會回滾到最近成功完成decheckpoint
3. 一旦pre-commit完成,必需要確保commit也要成功——operator和外部系統都須要對此進行保證。假若commit失敗(好比網絡故障等),Flink應用就會崩潰,而後根據用戶重啓策略執行重啓邏輯,以後再次重試commit。這個過程相當重要,由於假若commit沒法順利執行,就可能出現數據丟失的狀況
所以,全部opeartor必須對checkpoint最終結果達成共識:即全部operator都必須認定數據提交要麼成功執行,要麼被終止而後回滾。
3、Flink中實現兩階段提交operator
這種operator的管理有些複雜,這也是爲何Flink提取了公共邏輯並封裝進TwoPhaseCommitSinkFunction抽象類的緣由。
下面討論一下如何擴展TwoPhaseCommitSinkFunction類來實現一個簡單的基於文件的sink。若要實現支持EOS的文件sink,咱們須要實現如下4個方法:
1. beginTransaction:開啓一個事務,在臨時目錄下建立一個臨時文件,以後,寫入數據到該文件中
2. preCommit:在pre-commit階段,flush緩存數據塊到磁盤,而後關閉該文件,確保再不寫入新數據到該文件。同時開啓一個新事務執行屬於下一個checkpoint的寫入操做
3. commit:在commit階段,咱們以原子性的方式將上一階段的文件寫入真正的文件目錄下。注意:這會增長輸出數據可見性的延時。通俗說就是用戶想要看到最終數據須要等會,不是實時的。
4. abort:一旦終止事務,咱們離本身刪除臨時文件
當出現崩潰時,Flink會恢復最新已完成快照中應用狀態。須要注意的是在某些極偶然的場景下,pre-commit階段已成功完成而commit還沒有開始(也就是operator還沒有來得及被告知要開啓commit),此時假若發生崩潰Flink會將opeartor狀態恢復到已完成pre-commit但還沒有commit的狀態。
在一個checkpoint狀態中,對於已完成pre-commit的事務狀態,咱們必須保存足夠多的信息,這樣才能確保在重啓後要麼從新發起commit亦或是終止掉事務。本例中這部分信息就是臨時文件所在的路徑以及目標目錄。
TwoPhaseCommitSinkFunction考慮了這種場景,所以當應用從checkpoint恢復以後TwoPhaseCommitSinkFunction老是會發起一個搶佔式的commit。這種commit必須是冪等性的,雖然大部分狀況下這都不是問題。本例中對應的這種場景就是:臨時文件不在臨時目錄下,而是已經被移動到目標目錄下。
4、總結
本文的一些關鍵要點:
1. Flinkcheckpointing機制是實現兩階段提交協議以及提供EOS的基石
2. 與其餘系統持久化傳輸中的數據不一樣,Flink不須要將計算的每一個階段寫入到磁盤中——而這是不少批處理應用的方式
3. Flink新的TwoPhaseCommitSinkFunction封裝兩階段提交協議的公共邏輯使之搭配支持事務的外部系統來共同構建EOS應用成爲可能
4. 自1.4版本起,Flink + Pravega和Kafka 0.11 producer開始支持EOS
5. Flink Kafka 0.11 producer基於TwoPhaseCommitSinkFunction實現,比起至少一次語義的producer而言開銷並未顯著增長
【譯者:我愈來愈以爲全部的流式處理框架在設計上甚至是編碼上都很類似了。。。Kafka對於事務的支持也是相似的方式,並且也有諸如beginTransaction, preCommit等字眼。另外不少Flink和Kafka源碼中的方法名稱都是相似的。。。。】