Flink Kafa Connector是Flink內置的Kafka鏈接器,它包含了從Kafka Topic讀入數據的Flink Kafka Consumer
以及向Kafka Topic寫出數據的Flink Kafka Producer
,除此以外Flink Kafa Connector基於Flink Checkpoint機制提供了完善的容錯能力。本文從Flink Kafka Connector的基本使用到Kafka在Flink中端到端的容錯原理展開討論。html
在Flink中使用Kafka Connector時須要依賴Kafka的版本,Flink針對不一樣的Kafka版本提供了對應的Connector實現。java
既然Flink對不一樣版本的Kafka有不一樣實現,在使用時須要注意區分,根據使用環境引入正確的依賴關係。算法
1<dependency>
2 <groupId>org.apache.flink</groupId>
3 <artifactId>${flink_kafka_connector_version}</artifactId>
4 <version>${flink_version}</version>
5</dependency>複製代碼
在上面的依賴配置中${flink_version}指使用Flink的版本,${flink_connector_kafka_version}
指依賴的Kafka connector版本對應的artifactId。下表描述了截止目前爲止Kafka服務版本與Flink Connector之間的對應關係。
Flink官網內容Apache Kafka Connector(ci.apache.org/projects/fl…)中也有詳細的說明。apache
從Flink 1.7版本開始爲Kafka 1.0.0及以上版本提供了全新的的Kafka Connector支持,若是使用的Kafka版本在1.0.0及以上能夠忽略因Kafka版本差別帶來的依賴變化。bootstrap
明確了使用的Kafka版本後就能夠編寫一個基於Flink Kafka讀/寫的應用程序「本文討論內容所有基於Flink 1.7版本和Kafka 1.1.0版本」。根據上面描述的對應關係在工程中添加Kafka Connector依賴。
後端
1<dependency>
2 <groupId>org.apache.flink</groupId>
3 <artifactId>flink-connector-kafka_2.11</artifactId>
4 <version>1.7.0</version>
5</dependency>複製代碼
下面的代碼片斷是從Kafka Topic「flink_kafka_poc_input」中消費數據,再寫入Kafka Topic「flink_kafka_poc_output」的簡單示例。示例中除了讀/寫Kafka Topic外,沒有作其餘的邏輯處理。bash
1public static void main(String[] args) {
2 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3
4 /** 初始化Consumer配置 */
5 Properties consumerConfig = new Properties();
6 consumerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");
7 consumerConfig.setProperty("group.id", "flink_poc_k110_consumer");
8
9 /** 初始化Kafka Consumer */
10 FlinkKafkaConsumer<String> flinkKafkaConsumer =
11 new FlinkKafkaConsumer<String>(
12 "flink_kafka_poc_input",
13 new SimpleStringSchema(),
14 consumerConfig
15 );
16 /** 將Kafka Consumer加入到流處理 */
17 DataStream<String> stream = env.addSource(flinkKafkaConsumer);
18
19 /** 初始化Producer配置 */
20 Properties producerConfig = new Properties();
21 producerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");
22
23 /** 初始化Kafka Producer */
24 FlinkKafkaProducer<String> myProducer =
25 new FlinkKafkaProducer<String>(
26 "flink_kafka_poc_output",
27 new MapSerialization(),
28 producerConfig
29 );
30 /** 將Kafka Producer加入到流處理 */
31 stream.addSink(myProducer);
32
33 /** 執行 */
34 env.execute();
35}
36
37class MapSerialization implements SerializationSchema<String> {
38 public byte[] serialize(String element) {
39 return element.getBytes();
40 }
41}
複製代碼
Flink API使用起來確實很是簡單,調用addSource
方法和addSink
方法就能夠將初始化好的FlinkKafkaConsumer
和FlinkKafkaProducer
加入到流處理中。execute
執行後,KafkaConsumer和KafkaProducer就能夠開始正常工做了。網絡
衆所周知,Flink支持Exactly-once semantics。什麼意思呢?翻譯過來就是「剛好一次語義」。流處理系統中,數據源源不斷的流入到系統、被處理、最後輸出結果。咱們都不但願系統因人爲或外部因素產生任何意想不到的結果。對於Exactly-once語義達到的目的是指即便系統被人爲中止、因故障shutdown、無端關機等任何因素中止運行狀態時,對於系統中的每條數據不會被重複處理也不會少處理。
數據結構
Flink宣稱支持Exactly-once其針對的是Flink應用內部的數據流處理。但Flink應用內部要想處理數據首先要有數據流入到Flink應用,其次Flink應用對數據處理完畢後也理應對數據作後續的輸出。在Flink中數據的流入稱爲Source,數據的後續輸出稱爲Sink,對於Source和Sink徹底依靠外部系統支撐(好比Kafka)。分佈式
Flink自身是沒法保證外部系統的Exactly-once語義。但這樣一來其實並不能稱爲完整的Exactly-once,或者說Flink並不能保證端到端Exactly-once。而對於數據精準性要求極高的系統必需要保證端到端的Exactly-once,所謂端到端是指Flink應用從Source一端開始到Sink一端結束,數據必經的起始和結束兩個端點。
那麼如何實現端到端的Exactly-once呢?Flink應用所依賴的外部系統須要提供Exactly-once支撐,並結合Flink提供的Checkpoint機制和Two Phase Commit才能實現Flink端到端的Exactly-once。對於Source和Sink的容錯保障,Flink官方給出了具體說明:
Fault Tolerance Guarantees of Data Sources and Sinks(ci.apache.org/projects/fl…)
在討論基於Kafka端到端的Exactly-once以前先簡單瞭解一下Flink Checkpoint,詳細內容在《Flink Checkpoint原理》中有作討論。Flink Checkpoint是Flink用來實現應用一致性快照的核心機制,當Flink因故障或其餘緣由重啓後能夠經過最後一次成功的Checkpoint將應用恢復到當時的狀態。若是在應用中啓用了Checkpoint,會由JobManager按指定時間間隔觸發Checkpoint,Flink應用內全部帶狀態的Operator會處理每一輪Checkpoint生命週期內的幾個狀態。
CheckpointedFunction
接口定義。Task啓動時獲取應用中全部實現了CheckpointedFunction
的Operator,並觸發執行initializeState
方法。在方法的實現中通常都是從狀態後端將快照狀態恢復。CheckpointedFunction
接口定義。JobManager會按期發起Checkpoint,Task接收到Checkpoint後獲取應用中全部實現了CheckpointedFunction
的Operator並觸發執行對應的snapshotState
方法。1public interface CheckpointedFunction {
2 void snapshotState(FunctionSnapshotContext context) throws Exception;
3 void initializeState(FunctionInitializationContext context) throws Exception;
4}複製代碼
CheckpointListener
接口定義。當基於同一個輪次(checkpointId相同)的Checkpoint快照所有處理成功後獲取應用中全部實現了CheckpointListener
的Operator並觸發執行notifyCheckpointComplete
方法。觸發notifyCheckpointComplete
方法時攜帶的checkpointId參數用來告訴Operator哪一輪Checkpoint已經完成。1public interface CheckpointListener {
2 void notifyCheckpointComplete(long checkpointId) throws Exception;
3}複製代碼
Kafka是很是收歡迎的分佈式消息系統,在Flink中它能夠做爲Source,同時也能夠做爲Sink。Kafka 0.11.0及以上版本提供了對事務的支持,這讓Flink應用搭載Kafka實現端到端的exactly-once成爲了可能。下面咱們就來深刻了解提供了事務支持的Kafka是如何與Flink結合實現端到端exactly-once的。
本文忽略了Barrier機制,因此示例和圖中都以單線程爲例。Barrier在《Flink Checkpoint原理》有較多討論。
Flink Kafka Consumer
Kafka自身提供了可重複消費消息的能力,Flink結合Kafka的這個特性以及自身Checkpoint機制,得以實現Flink Kafka Consumer的容錯。
Flink Kafka Consumer是Flink應用從Kafka獲取數據流消息的一個實現。除了數據流獲取、數據發送下游算子這些基本功能外它還提供了完善的容錯機制。這些特性依賴了其內部的一些組件以及內置的數據結構協同處理完成。這裏,咱們先簡單瞭解這些組件和內置數據結構的職責,再結合Flink 運行時 和 故障恢復時 兩個不一樣的處理時機來看一看它們之間是如何協同工做的。
AbstractPartitionDiscoverer
負責得到指定topic的元數據信息,並將獲取到的topic元數據信息封裝成KafkaTopicPartition
集合。String topic
和int partition
兩個主要屬性。假設topic A有2個分區,經過組件AbstractPartitionDiscoverer
處理後將獲得由兩個KafkaTopicPartition
對象組成的集合:KafkaTopicPartition(topic:A, partition:0)
和KafkaTopicPartition(topic:A, partition:1)
AbstractFetcher
組件負責完成這部分功能。除此以外Fetcher還負責offset的提交、KafkaTopicPartitionState
結構的數據維護。KafkaTopicPartitionState
是一個很是核心的數據結構,基於內部的4個基本屬性,Flink Kafka Consumer維護了topic、partition、已消費offset、待提交offset的關聯關係。Flink Kafka Consumer的容錯機制依賴了這些數據。KafkaTopicPartitionState
還有兩個子類,一個是支持PunctuatedWatermark
的實現,另外一個是支持PeriodicWatermark
的實現,這兩個子類在原有基礎上擴展了對水印的支持,咱們這裏不作過多討論。ListState<Tuple2<KafkaTopicPartition, Long>>
定義了狀態存儲結構,在這裏Long表示的是offset類型,因此實際上就是使用KafkaTopicPartition
和offset組成了一個對兒,再添加到狀態後端集合。KafkaTopicPartition
,value是對應已消費的offset。恢復成功後,應用恢復到故障前Flink Kafka Consumer消費的offset,並繼續執行任務,就好像什麼都沒發生同樣。咱們假設Flink應用正常運行,Flink Kafka Consumer消費topic爲Topic-A
,Topic-A
只有一個partition。在運行期間,主要作了這麼幾件事
接下來咱們再結合消息真正開始處理後,KafkaTopicPartitionState結構中的數據變化。
能夠看到,隨着應用的運行,KafkaTopicPartitionState
中的offset屬性值發生了變化,它記錄了已經發送到下游算子消息在Kafka中的offset。在這裏因爲消息P0-C
已經發送到下游算子,因此KafkaTopicPartitionState.offset
變動爲2。
FlinkKafkaConsumer
實現了CheckpointedFunction
,因此它具有快照狀態(snapshotState)的能力。在實現中,snapshotState具體幹了這麼兩件事下圖描述當一輪Checkpoint開始時FlinkKafkaConsumer
的處理過程。在例子中,FlinkKafkaConsumer已經將offset=3的P0-D
消息發送到下游,當checkpoint觸發時將topic=Topic-A;partition=0;offset=3做爲最後的狀態持久化到外部存儲。
待提交offset
的Map集合,其中key是CheckpointId。FlinkKafkaConsumer
當前運行狀態持久化,即將topic、partition、offset持久化。一旦出現故障,就能夠根據最新持久化的快照進行恢復。下圖描述當一輪Checkpoint開始時FlinkKafkaConsumer
的處理過程。在例子中,FlinkKafkaConsumer已經將offset=3的P0-D
消息發送到下游,當checkpoint觸發時將topic=Topic-A;partition=0;offset=3做爲最後的狀態持久化到外部存儲。
CheckpointListener.notifyCheckpointComplete(checkpointId)
通知算子Checkpoint完成,參數checkpointId指明瞭本次通知是基於哪一輪Checkpoint。在FlinkKafkaConsumer
的實現中,接到Checkpoint完成通知後會變動KafkaTopicPartitionState.commitedOffset
屬性值。最後再將變動後的commitedOffset提交到Kafka brokers或Zookeeper。topic=Topic-A;partition=0;offset=3
的狀態作了快照,在真正提交offset時是將快照的offset + 1
做爲結果提交的。「源代碼KafkaFetcher.java 207行
doCommitInternalOffsetsToKafka方法」Flink應用崩潰後,開始進入恢復模式。假設Flink Kafka Consumer最後一次成功的快照狀態是topic=Topic-A;partition=0;offset=3
,在恢復期間按照下面的前後順序執行處理。
CheckpointedFunction.initializeState
接口定義。在FlinkKafkaConsumer
的實現中,從狀態後端得到快照並寫入到內部存儲結構TreeMap,其中key是由KafkaTopicPartition
表示的topic與partition,value爲offset。下圖描述的是故障恢復的第一個階段,從狀態後端得到快照,並恢復到內部存儲。待消費信息
。若是應用須要從快照恢復狀態,則從待恢復狀態
中初始化這個Map結構。下圖是該階段從快照恢復的處理過程。function初始化階段兼容了正常啓動和狀態恢復時offset的初始化。對於正常啓動過程,StartupMode
的設置決定待消費信息
中的結果。該模式共有5種,默認爲StartupMode.GROUP_OFFSETS
。
FlinkKafkaConsumer
運行起來,下圖描述了這個階段的處理流程這裏對圖中兩個步驟作個描述
topic=Topic-A;partition=0;offset=3
初始化Flink Kafka Consumer
內部維護的Kafka處理狀態。由於是恢復流程,因此這個內部維護的處理狀態也應該隨着快照恢復。狀態後端offset + 1
。在例子中,消費Kafka數據前將offset重置爲4,因此狀態恢復後KafkaConsumer是從offset=4位置開始消費。「源代碼KafkaConsumerThread.java 428行
」總結
上述的3個步驟是恢復期間主要的處理流程,一旦恢復邏輯執行成功,後續處理流程與正常運行期間一致。最後對FlinkKafkaConsumer用一句話作個總結。
「將offset提交權交給FlinkKafkaConsumer,其內部維護Kafka消費及提交的狀態。基於Kafka可重複消費能力並配合Checkpoint機制和狀態後端存儲能力,就能實現FlinkKafkaConsumer容錯性,即Source端的Exactly-once語義」。
Flink Kafka Producer
Flink Kafka Producer是Flink應用向Kafka寫出數據的一個實現。在Kafka 0.11.0及以上版本中提供了事務支持,這讓Flink搭載Kafka的事務特性能夠輕鬆實現Sink端的Exactly-once語義。關於Kafka事務特性在《Kafka冪等與事務》中作了詳細討論。
在Flink Kafka Producer中,有一個很是重要的組件FlinkKafkaInternalProducer
,這個組件代理了Kafka客戶端org.apache.kafka.clients.producer.KafkaProducer
,它爲Flink Kafka Producer操做Kafka提供了強有力的支撐。在這個組件內部,除了代理方法外,還提供了一些關鍵操做。我的認爲,Flink Kafka Sink可以實現Exactly-once語義除了須要Kafka支持事務特性外,同時也離不開FlinkKafkaInternalProducer
組件提供的支持,尤爲是下面這些關鍵操做:
FlinkKafkaInternalProducer
組件中最關鍵的處理當屬事務重置,事務重置由resumeTransaction方法實現「源代碼FlinkKafkaInternalProducer.java
144行」。因爲Kafka客戶端未暴露針對事務操做的API,因此在這個方法內部,大量的使用了反射。方法中使用反射得到KafkaProducer依賴的transactionManager對象,並將狀態後端快照的屬性值恢復到transactionManager對象中,這樣以達到讓Flink Kafka Producer應用恢復到重啓前的狀態。下面咱們結合Flink 運行時 和 故障恢復 兩個不一樣的處理時機來了解Flink Kafka Producer內部如何工做。
運行時
咱們假設Flink應用正常運行,Flink Kafka Producer正常接收上游數據並寫到Topic-B的Topic中,Topic-B只有一個partition。在運行期間,主要作如下幾件事:
FlinkKafkaProducer
,FlinkKafkaProducer
接到數據後封裝ProducerRecord
對象並調用Kafka客戶端KafkaProducer.send
方法將ProducerRecord
對象寫入緩衝「源代碼FlinkKafkaProducer.java 616行」。下圖是該階段的描述:FlinkKafkaProducer
做爲Kafka Sink,它繼承抽象類TwoPhaseCommitSinkFunction
,根據名字就能知道,這個抽象類主要實現兩階段提交
。爲了集成Flink Checkpoint機制,抽象類實現了CheckpointedFunction
和CheckpointListener
,所以它具有快照狀態(snapshotState)能力。狀態快照處理具體作了下面三件事:isolation.level=read_committed
,那麼此時這個消費端還沒法poll到flush的數據,由於這些數據還沒有commit。何時commit呢?在快照結束處理
階段進行commit,後面會提到。TransactionHolder
表示的事務對象。TransactionHolder
對象內部記錄了transactionalId、producerId、epoch以及Kafka客戶端kafkaProducer的引用。下圖是狀態快照處理階段處理過程
TwoPhaseCommitSinkFunction
實現了CheckpointListener
,應用中全部算子的快照處理成功後會收到基於某輪Checkpoint完成的通知。當FlinkKafkaProducer
收到通知後,主要任務就是提交上一階段產生的事務,而具體要提交哪些事務是從上一階段生成的待提交事務集合中獲取的。圖中第4步執行成功後,flush到Kafka的數據從UNCOMMITTED變動爲COMMITTED,這意味着此時消費端能夠poll到這批數據了。
2PC(兩階段提交)理論的兩個階段分別對應了FlinkKafkaProducer的狀態快照處理
階段和快照結束處理
階段,前者是經過Kafka的事務初始化、事務開啓、flush等操做預提交事務,後者是經過Kafka的commit操做真正執行事務提交。
Flink應用崩潰後,FlinkKafkaProducer
開始進入恢復模式。下圖爲應用崩潰前的狀態描述:
在恢復期間主要的處理在狀態初始化階段。當Flink任務重啓時會觸發狀態初始化,此時應用與Kafka已經斷開了鏈接。但在運行期間可能存在數據flush還沒有提交的狀況。
若是想從新提交這些數據須要從狀態後端恢復當時KafkaProducer持有的事務對象,具體一點就是恢復當時事務的transactionalId、producerId、epoch。這個時候就用到了FlinkKafkaInternalProducer
組件中的事務重置,在狀態初始化時從狀態後端得到這些事務信息,並重置到當前KafkaProducer中,再執行commit操做。這樣就能夠恢復任務重啓前的狀態,Topic-B的消費端依然能夠poll到應用恢復後提交的數據。
須要注意的是:若是這個重置並提交的動做失敗了,可能會形成數據丟失。
下圖描述的是狀態初始化階段的處理流程:
總結
FlinkKafkaProducer
故障恢復期間,狀態初始化是比較重要的處理階段。這個階段在Kafka事務特性的強有力支撐下,實現了事務狀態的恢復,而且使得狀態存儲佔用空間最小。依賴Flink提供的TwoPhaseCommitSinkFunction
實現類,咱們本身也能夠對Sink作更多的擴展。
本文做者:TalkingData 史天舒 封面來源於網絡,若有侵權請聯繫刪除