Apache Flink 自2017年12月發佈的1.4.0版本開始,爲流計算引入了一個重要的里程碑特性:TwoPhaseCommitSinkFunction(相關的Jira)。它提取了兩階段提交協議的通用邏輯,使得經過Flink來構建端到端的Exactly-Once程序成爲可能。同時支持一些數據源(source)和輸出端(sink),包括Apache Kafka 0.11及更高版本。它提供了一個抽象層,用戶只須要實現少數方法就能實現端到端的Exactly-Once語義。apache
有關TwoPhaseCommitSinkFunction的使用詳見文檔: TwoPhaseCommitSinkFunction。或者能夠直接閱讀Kafka 0.11 sink的文檔: kafka。網絡
接下來會詳細分析這個新功能以及Flink的實現邏輯,分爲以下幾點。架構
當咱們說『Exactly-Once』時,指的是每一個輸入的事件隻影響最終結果一次。即便機器或軟件出現故障,既沒有重複數據,也不會丟數據。併發
Flink好久以前就提供了Exactly-Once語義。在過去幾年中,咱們對Flink的checkpoint機制有過深刻的描述,這是Flink有能力提供Exactly-Once語義的核心。Flink文檔還提供了該功能的全面概述。異步
在繼續以前,先看下對checkpoint機制的簡要介紹,這對理解後面的主題相當重要。分佈式
Flink能夠配置一個固定的時間點,按期產生checkpoint,將checkpoint的數據寫入持久存儲系統,例如S3或HDFS。將checkpoint數據寫入持久存儲是異步發生的,這意味着Flink應用程序在checkpoint過程當中能夠繼續處理數據。函數
若是發生機器或軟件故障,從新啓動後,Flink應用程序將從最新的checkpoint點恢復處理; Flink會恢復應用程序狀態,將輸入流回滾到上次checkpoint保存的位置,而後從新開始運行。這意味着Flink能夠像從未發生過故障同樣計算結果。url
在Flink 1.4.0以前,Exactly-Once語義僅限於Flink應用程序內部,並無擴展到Flink數據處理完後發送的大多數外部系統。Flink應用程序與各類數據輸出端進行交互,開發人員須要有能力本身維護組件的上下文來保證Exactly-Once語義。spa
爲了提供端到端的Exactly-Once語義 – 也就是說,除了Flink應用程序內部,Flink寫入的外部系統也須要能知足Exactly-Once語義 – 這些外部系統必須提供提交或回滾的方法,而後經過Flink的checkpoint機制來協調。翻譯
分佈式系統中,協調提交和回滾的經常使用方法是兩階段提交協議。在下一節中,咱們將討論Flink的TwoPhaseCommitSinkFunction是如何利用兩階段提交協議來提供端到端的Exactly-Once語義。
咱們將介紹兩階段提交協議,以及它如何在一個讀寫Kafka的Flink程序中實現端到端的Exactly-Once語義。Kafka是一個流行的消息中間件,常常與Flink一塊兒使用。Kafka在最近的0.11版本中添加了對事務的支持。這意味着如今經過Flink讀寫Kafaka,並提供端到端的Exactly-Once語義有了必要的支持。
Flink對端到端的Exactly-Once語義的支持不只侷限於Kafka,您能夠將它與任何一個提供了必要的協調機制的源/輸出端一塊兒使用。例如Pravega,來自DELL/EMC的開源流媒體存儲系統,經過Flink的TwoPhaseCommitSinkFunction也能支持端到端的Exactly-Once語義。
在今天討論的這個示例程序中,咱們有:
要使數據輸出端提供Exactly-Once保證,它必須將全部數據經過一個事務提交給Kafka。提交捆綁了兩個checkpoint之間的全部要寫入的數據。這可確保在發生故障時能回滾寫入的數據。可是在分佈式系統中,一般會有多個併發運行的寫入任務的,簡單的提交或回滾是不夠的,由於全部組件必須在提交或回滾時「一致」才能確保一致的結果。Flink使用兩階段提交協議及預提交階段來解決這個問題。
在checkpoint開始的時候,即兩階段提交協議的「預提交」階段。當checkpoint開始時,Flink的JobManager會將checkpoint barrier(將數據流中的記錄分爲進入當前checkpoint與進入下一個checkpoint)注入數據流。
brarrier在operator之間傳遞。對於每個operator,它觸發operator的狀態快照寫入到state backend。
數據源保存了消費Kafka的偏移量(offset),以後將checkpoint barrier傳遞給下一個operator。
這種方式僅適用於operator具備『內部』狀態。所謂內部狀態,是指Flink state backend保存和管理的 -例如,第二個operator中window聚合算出來的sum值。當一個進程有它的內部狀態的時候,除了在checkpoint以前須要將數據變動寫入到state backend,不須要在預提交階段執行任何其餘操做。Flink負責在checkpoint成功的狀況下正確提交這些寫入,或者在出現故障時停止這些寫入。
可是,當進程具備『外部』狀態時,須要做些額外的處理。外部狀態一般以寫入外部系統(如Kafka)的形式出現。在這種狀況下,爲了提供Exactly-Once保證,外部系統必須支持事務,這樣才能和兩階段提交協議集成。
在本文示例中的數據須要寫入Kafka,所以數據輸出端(Data Sink)有外部狀態。在這種狀況下,在預提交階段,除了將其狀態寫入state backend以外,數據輸出端還必須預先提交其外部事務。
當checkpoint barrier在全部operator都傳遞了一遍,而且觸發的checkpoint回調成功完成時,預提交階段就結束了。全部觸發的狀態快照都被視爲該checkpoint的一部分。checkpoint是整個應用程序狀態的快照,包括預先提交的外部狀態。若是發生故障,咱們能夠回滾到上次成功完成快照的時間點。
下一步是通知全部operator,checkpoint已經成功了。這是兩階段提交協議的提交階段,JobManager爲應用程序中的每一個operator發出checkpoint已完成的回調。
數據源和 widnow operator沒有外部狀態,所以在提交階段,這些operator沒必要執行任何操做。可是,數據輸出端(Data Sink)擁有外部狀態,此時應該提交外部事務。
咱們對上述知識點總結下:
所以,咱們能夠肯定全部operator都贊成checkpoint的最終結果:全部operator都贊成數據已提交,或提交被停止並回滾。
完整的實現兩階段提交協議可能有點複雜,這就是爲何Flink將它的通用邏輯提取到抽象類TwoPhaseCommitSinkFunction中的緣由。
接下來基於輸出到文件的簡單示例,說明如何使用TwoPhaseCommitSinkFunction。用戶只須要實現四個函數,就能爲數據輸出端實現Exactly-Once語義:
咱們知道,若是發生任何故障,Flink會將應用程序的狀態恢復到最新的一次checkpoint點。一種極端的狀況是,預提交成功了,但在此次commit的通知到達operator以前發生了故障。在這種狀況下,Flink會將operator的狀態恢復到已經預提交,但還沒有真正提交的狀態。
咱們須要在預提交階段保存足夠多的信息到checkpoint狀態中,以便在重啓後能正確的停止或提交事務。在這個例子中,這些信息是臨時文件和目標目錄的路徑。
TwoPhaseCommitSinkFunction已經把這種狀況考慮在內了,而且在從checkpoint點恢復狀態時,會優先發出一個commit。咱們須要以冪等方式實現提交,通常來講,這並不難。在這個示例中,咱們能夠識別出這樣的狀況:臨時文件不在臨時目錄中,但已經移動到目標目錄了。
在TwoPhaseCommitSinkFunction中,還有一些其餘邊界狀況也會考慮在內,請參考Flink文檔瞭解更多信息。
總結下本文涉及的一些要點:
這是個使人興奮的功能,期待Flink TwoPhaseCommitSinkFunction在將來支持更多的數據接收端。
via: https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka做者:Piotr Nowojski
翻譯| 周凱波
周凱波,阿里巴巴技術專家,四川大學碩士,2010年畢業後加入阿里搜索事業部,從事搜索離線平臺的研發工做,參與將搜索後臺數據處理架構從MapReduce到Flink的重構。目前在阿里計算平臺事業部,專一於基於Flink的一站式計算平臺的建設。
本文爲雲棲社區原創內容,未經容許不得轉載。