Flink Kafka Connector 是 Flink 內置的 Kafka 鏈接器,它包含了從 Kafka Topic 讀入數據的 Flink Kafka Consumer 以及向 Kafka Topic 寫出數據的 Flink Kafka Producer,除此以外 Flink Kafa Connector 基於 Flink Checkpoint 機制提供了完善的容錯能力。本文從 Flink Kafka Connector 的基本使用到 Kafka 在 Flink 中端到端的容錯原理展開討論。
1.Flink Kafka 的使用
在 Flink 中使用 Kafka Connector 時須要依賴 Kafka 的版本,Flink 針對不一樣的 Kafka 版本提供了對應的 Connector 實現。
1.1 版本依賴
既然 Flink 對不一樣版本的 Kafka 有不一樣實現,在使用時須要注意區分,根據使用環境引入正確的依賴關係。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${flink_kafka_connector_version}</artifactId>
<version>${flink_version}</version>
</dependency>複製代碼
在上面的依賴配置中 ${flink_version} 指使用 Flink 的版本,${flink_connector_kafka_version} 指依賴的 Kafka connector 版本對應的 artifactId。下表描述了截止目前爲止 Kafka 服務版本與 Flink Connector 之間的對應關係。
Flink 官網內容 Apache Kafka Connector 中也有詳細的說明。
從 Flink 1.7 版本開始爲 Kafka 1.0.0 及以上版本提供了全新的 Kafka Connector 支持,若是使用的 Kafka 版本在 1.0.0 及以上能夠忽略因 Kafka 版本差別帶來的依賴變化。
1.2 基本使用
明確了使用的 Kafka 版本後就能夠編寫一個基於 Flink Kafka 讀/寫的應用程序「本文討論內容所有基於 Flink 1.7 版本和 Kafka 1.1.0 版本」。根據上面描述的對應關係在工程中添加 Kafka Connector 依賴。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.7.0</version>
</dependency>複製代碼
下面的代碼片斷是從 Kafka Topic「flink_kafka_poc_input」中消費數據,再寫入 Kafka Topic「flink_kafka_poc_output」的簡單示例。示例中除了讀/寫 Kafka Topic 外,沒有作其餘的邏輯處理。
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/** 初始化 Consumer 配置 */
Properties consumerConfig = new Properties();
consumerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");
consumerConfig.setProperty("group.id", "flink_poc_k110_consumer");
/** 初始化 Kafka Consumer */
FlinkKafkaConsumer<String> flinkKafkaConsumer =
new FlinkKafkaConsumer<String>(
"flink_kafka_poc_input",
new SimpleStringSchema(),
consumerConfig
);
/** 將 Kafka Consumer 加入到流處理 */
DataStream<String> stream = env.addSource(flinkKafkaConsumer);
/** 初始化 Producer 配置 */
Properties producerConfig = new Properties();
producerConfig.setProperty("bootstrap.servers", "127.0.0.1:9091");
/** 初始化 Kafka Producer */
FlinkKafkaProducer<String> myProducer =
new FlinkKafkaProducer<String>(
"flink_kafka_poc_output",
new MapSerialization(),
producerConfig
);
/** 將 Kafka Producer 加入到流處理 */
stream.addSink(myProducer);
/** 執行 */
env.execute();
}
class MapSerialization implements SerializationSchema<String> {
public byte[] serialize(String element) {
return element.getBytes();
}
}複製代碼
Flink API 使用起來確實很是簡單,調用 addSource 方法和 addSink 方法就能夠將初始化好的 FlinkKafkaConsumer 和 FlinkKafkaProducer 加入到流處理中。execute 執行後,KafkaConsumer 和 KafkaProducer 就能夠開始正常工做了。
2.Flink Kafka 的容錯
衆所周知,Flink 支持 Exactly-once semantics。什麼意思呢?翻譯過來就是「剛好一次語義」。流處理系統中,數據源源不斷的流入到系統、被處理、最後輸出結果。咱們都不但願系統因人爲或外部因素產生任何意想不到的結果。對於 Exactly-once 語義達到的目的是指即便系統被人爲中止、因故障 shutdown、無端關機等任何因素中止運行狀態時,對於系統中的每條數據不會被重複處理也不會少處理。
2.1 Flink Exactly-once
Flink 宣稱支持 Exactly-once 其針對的是 Flink 應用內部的數據流處理。但 Flink 應用內部要想處理數據首先要有數據流入到 Flink 應用,其次 Flink 應用對數據處理完畢後也理應對數據作後續的輸出。在 Flink 中數據的流入稱爲 Source,數據的後續輸出稱爲 Sink,對於 Source 和 Sink 徹底依靠外部系統支撐(好比 Kafka)。
Flink 自身是沒法保證外部系統的 Exactly-once 語義。但這樣一來其實並不能稱爲完整的 Exactly-once,或者說 Flink 並不能保證端到端 Exactly-once。而對於數據精準性要求極高的系統必需要保證端到端的 Exactly-once,所謂端到端是指 Flink 應用從 Source 一端開始到 Sink 一端結束,數據必經的起始和結束兩個端點。
那麼如何實現端到端的 Exactly-once 呢?Flink 應用所依賴的外部系統須要提供 Exactly-once 支撐,並結合 Flink 提供的 Checkpoint 機制和 Two Phase Commit 才能實現 Flink 端到端的 Exactly-once。對於 Source 和 Sink 的容錯保障,Flink 官方給出了具體說明:
2.2 Flink Checkpoint
在討論基於 Kafka 端到端的 Exactly-once 以前先簡單瞭解一下 Flink Checkpoint。Flink Checkpoint 是 Flink 用來實現應用一致性快照的核心機制,當 Flink 因故障或其餘緣由重啓後能夠經過最後一次成功的 Checkpoint 將應用恢復到當時的狀態。若是在應用中啓用了 Checkpoint,會由 JobManager 按指定時間間隔觸發 Checkpoint,Flink 應用內全部帶狀態的 Operator 會處理每一輪 Checkpoint 生命週期內的幾個狀態。
由 CheckpointedFunction 接口定義。Task 啓動時獲取應用中全部實現了CheckpointedFunction 的 Operator,並觸發執行 initializeState 方法。在方法的實現中通常都是從狀態後端將快照狀態恢復。
由 CheckpointedFunction 接口定義。JobManager 會按期發起 Checkpoint,Task 接收到 Checkpoint 後獲取應用中全部實現了 CheckpointedFunction 的 Operator 並觸發執行對應的 snapshotState 方法。
JobManager 每發起一輪 Checkpoint 都會攜帶一個自增的 checkpointId,這個 checkpointId 表明了快照的輪次。
public interface CheckpointedFunction {
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
}複製代碼
由 CheckpointListener 接口定義。當基於同一個輪次(checkpointId 相同)的Checkpoint 快照所有處理成功後獲取應用中全部實現了 CheckpointListener 的 Operator 並觸發執行 notifyCheckpointComplete 方法。觸發 notifyCheckpointComplete 方法時攜帶的 checkpointId 參數用來告訴 Operator 哪一輪 Checkpoint 已經完成。
public interface CheckpointListener {
void notifyCheckpointComplete(long checkpointId) throws Exception;
}複製代碼
3. Flink Kafka 端到端 Exactly-once
Kafka 是很是受歡迎的分佈式消息系統,在 Flink 中它能夠做爲 Source,同時也能夠做爲 Sink。Kafka 0.11.0 及以上版本提供了對事務的支持,這讓 Flink 應用搭載 Kafka 實現端到端的 exactly-once 成爲了可能。下面咱們就來深刻了解提供了事務支持的 Kafka 是如何與 Flink 結合實現端到端 exactly-once 的。
本文忽略了 Barrier 機制,因此示例和圖中都以單線程爲例。Barrier 在《Flink Checkpoint 原理》有較多討論。
3.1 Flink Kafka Consumer
Kafka 自身提供了可重複消費消息的能力,Flink 結合 Kafka 的這個特性以及自身 Checkpoint 機制,得以實現 Flink Kafka Consumer 的容錯。
Flink Kafka Consumer 是 Flink 應用從 Kafka 獲取數據流消息的一個實現。除了數據流獲取、數據發送下游算子這些基本功能外它還提供了完善的容錯機制。這些特性依賴了其內部的一些組件以及內置的數據結構協同處理完成。這裏,咱們先簡單瞭解這些組件和內置數據結構的職責,再結合 Flink 運行時 和 故障恢復時 兩個不一樣的處理時機來看一看它們之間是如何協同工做的。
Kafka Topic 元數據
從 Kafka 消費數據的前提是須要知道消費哪一個 topic,這個 topic 有多少個 partition。組件 AbstractPartitionDiscoverer 負責得到指定 topic 的元數據信息,並將獲取到的 topic 元數據信息封裝成 KafkaTopicPartition 集合。
KafkaTopicPartition 結構用於記錄 topic 與 partition 的對應關係,內部定義了 String topic 和 int partition 兩個主要屬性。假設 topic A 有 2 個分區,經過組件 AbstractPartitionDiscoverer 處理後將獲得由兩個 KafkaTopicPartition 對象組成的集合:KafkaTopicPartition(topic:A, partition:0) 和 KafkaTopicPartition(topic:A, partition:1)
做爲 Flink Source,Flink Kafka Consumer 最主要的職責就是能從 Kafka 中獲取數據,交給下游處理。在 Kafka Consumer 中 AbstractFetcher 組件負責完成這部分功能。除此以外 Fetcher 還負責 offset 的提交、KafkaTopicPartitionState 結構的數據維護。
KafkaTopicPartitionState 是一個很是核心的數據結構,基於內部的 4 個基本屬性,Flink Kafka Consumer 維護了 topic、partition、已消費 offset、待提交 offset 的關聯關係。Flink Kafka Consumer 的容錯機制依賴了這些數據。
除了這 4 個基本屬性外 KafkaTopicPartitionState 還有兩個子類,一個是支持 PunctuatedWatermark 的實現,另外一個是支持 PeriodicWatermark 的實現,這兩個子類在原有基礎上擴展了對水印的支持,咱們這裏不作過多討論。
Flink Kafka Consumer 的容錯性依靠的是狀態持久化,也能夠稱爲狀態快照。對於Flink Kafka Consumer 來講,這個狀態持久化具體是對 topic、partition、已消費 offset 的對應關係作持久化。
在實現中,使用 ListState> 定義了狀態存儲結構,在這裏 Long 表示的是 offset 類型,因此實際上就是使用 KafkaTopicPartition 和 offset 組成了一個對兒,再添加到狀態後端集合。
當狀態成功持久化後,一旦應用出現故障,就能夠用最近持久化成功的快照恢復應用狀態。在實現中,狀態恢復時會將快照恢復到一個 TreeMap 結構中,其中 key 是 KafkaTopicPartition,value 是對應已消費的 offset。恢復成功後,應用恢復到故障前 Flink Kafka Consumer 消費的 offset,並繼續執行任務,就好像什麼都沒發生同樣。
3.1.1 運行時
咱們假設 Flink 應用正常運行,Flink Kafka Consumer 消費 topic 爲 Topic-A,Topic-A 只有一個 partition。在運行期間,主要作了這麼幾件事:
KafkaFetcher 不斷的從 Kafka 消費數據,消費的數據會發送到下游算子並在內部記錄已消費過的 offset。下圖描述的是 Flink Kafka Consumer 從消費 Kafka 消息到將消息發送到下游算子的一個處理過程。
接下來咱們再結合消息真正開始處理後,KafkaTopicPartitionState 結構中的數據變化。
能夠看到,隨着應用的運行,KafkaTopicPartitionState 中的 offset 屬性值發生了變化,它記錄了已經發送到下游算子消息在 Kafka 中的 offset。在這裏因爲消息 P0-C 已經發送到下游算子,因此 KafkaTopicPartitionState.offset 變動爲 2。
若是 Flink 應用開啓了 Checkpoint,JobManager 會按期觸發 Checkpoint。FlinkKafkaConsumer 實現了 CheckpointedFunction,因此它具有快照狀態(snapshotState)的能力。在實現中,snapshotState 具體幹了這麼兩件事。
下圖描述當一輪 Checkpoint 開始時 FlinkKafkaConsumer 的處理過程。在例子中,FlinkKafkaConsumer 已經將 offset=3 的 P0-D 消息發送到下游,當checkpoint 觸發時將 topic=Topic-A;partition=0;offset=3 做爲最後的狀態持久化到外部存儲。
下圖描述當一輪 Checkpoint 開始時 FlinkKafkaConsumer 的處理過程。在例子中,FlinkKafkaConsumer 已經將 offset=3 的 P0-D 消息發送到下游,當 checkpoint 觸發時將 topic=Topic-A;partition=0;offset=3 做爲最後的狀態持久化到外部存儲。
當全部算子基於同一輪次快照處理結束後,會調用 CheckpointListener.notifyCheckpointComplete(checkpointId) 通知算子 Checkpoint 完成,參數 checkpointId 指明瞭本次通知是基於哪一輪 Checkpoint。在 FlinkKafkaConsumer 的實現中,接到 Checkpoint 完成通知後會變動 KafkaTopicPartitionState.commitedOffset 屬性值。最後再將變動後的 commitedOffset 提交到 Kafka brokers 或 Zookeeper。
在這個例子中,commitedOffset 變動爲 4,由於在快照階段,將 topic=Topic-A;partition=0;offset=3 的狀態作了快照,在真正提交 offset 時是將快照的 offset + 1 做爲結果提交的。「源代碼 KafkaFetcher.java 207 行 doCommitInternalOffsetsToKafka 方法」。
3.1.2 故障恢復
Flink 應用崩潰後,開始進入恢復模式。假設 Flink Kafka Consumer 最後一次成功的快照狀態是 topic=Topic-A;partition=0;offset=3,在恢復期間按照下面的前後順序執行處理。
狀態初始化階段嘗試從狀態後端加載出能夠用來恢復的狀態。它由 CheckpointedFunction.initializeState 接口定義。在 FlinkKafkaConsumer 的實現中,從狀態後端得到快照並寫入到內部存儲結構 TreeMap,其中 key 是由 KafkaTopicPartition 表示的 topic 與 partition,value 爲 offset。下圖描述的是故障恢復的第一個階段,從狀態後端得到快照,並恢復到內部存儲。
function 初始化階段除了初始化 OffsetCommitMode 和 partitionDiscoverer 外,還會初始化一個 Map 結構,該結構用來存儲應用待消費信息。若是應用須要從快照恢復狀態,則從待恢復狀態中初始化這個 Map 結構。下圖是該階段從快照恢復的處理過程。
function 初始化階段兼容了正常啓動和狀態恢復時 offset 的初始化。對於正常啓動過程,StartupMode 的設置決定待消費信息中的結果。該模式共有 5 種,默認爲 StartupMode.GROUP_OFFSETS。
在該階段中,將 KafkaFetcher 初始化、初始化內部消費狀態、啓動消費線程等等,其目的是爲了將 FlinkKafkaConsumer 運行起來,下圖描述了這個階段的處理流程。
-
步驟 3,使用狀態後端的快照結果 topic=Topic-A;partition=0;offset=3 初始化 Flink Kafka Consumer 內部維護的 Kafka 處理狀態。由於是恢復流程,因此這個內部維護的處理狀態也應該隨着快照恢復。
-
步驟 4,在真正消費 Kafka 數據前(指調用 KafkaConsumer.poll 方法),使用Kafka 提供的 seek 方法將 offset 重置到指定位置,而這個 offset 具體算法就是狀態後端 offset + 1。在例子中,消費 Kafka 數據前將 offset 重置爲 4,因此狀態恢復後 KafkaConsumer 是從 offset=4 位置開始消費。「源代碼 KafkaConsumerThread.java 428 行」
3.1.3 總結
上述的 3 個步驟是恢復期間主要的處理流程,一旦恢復邏輯執行成功,後續處理流程與正常運行期間一致。最後對 FlinkKafkaConsumer 用一句話作個總結。
「將 offset 提交權交給 FlinkKafkaConsumer,其內部維護 Kafka 消費及提交的狀態。基於 Kafka 可重複消費能力並配合 Checkpoint 機制和狀態後端存儲能力,就能實現 FlinkKafkaConsumer 容錯性,即 Source 端的 Exactly-once 語義」。
3.2 Flink Kafka Producer
Flink Kafka Producer 是 Flink 應用向 Kafka 寫出數據的一個實現。在 Kafka 0.11.0 及以上版本中提供了事務支持,這讓 Flink 搭載 Kafka 的事務特性能夠輕鬆實現 Sink 端的 Exactly-once 語義。關於 Kafka 事務特性在《Kafka 冪等與事務》中作了詳細討論。
在 Flink Kafka Producer 中,有一個很是重要的組件 FlinkKafkaInternalProducer,這個組件代理了 Kafka 客戶端 org.apache.kafka.clients.producer.KafkaProducer,它爲 Flink Kafka Producer 操做 Kafka 提供了強有力的支撐。在這個組件內部,除了代理方法外,還提供了一些關鍵操做。我的認爲,Flink Kafka Sink 可以實現 Exactly-once 語義除了須要 Kafka 支持事務特性外,同時也離不開
FlinkKafkaInternalProducer 組件提供的支持,尤爲是下面這些關鍵操做:
下面咱們結合 Flink 運行時 和 故障恢復 兩個不一樣的處理時機來了解 Flink Kafka Producer 內部如何工做。
3.2.1 運行時
咱們假設 Flink 應用正常運行,Flink Kafka Producer 正常接收上游數據並寫到 Topic-B 的 Topic 中,Topic-B 只有一個 partition。在運行期間,主要作如下幾件事:
上游算子不斷的將數據 Sink 到 FlinkKafkaProducer,FlinkKafkaProducer 接到數據後封裝 ProducerRecord 對象並調用 Kafka 客戶端 KafkaProducer.send 方法將 ProducerRecord 對象寫入緩衝「源代碼 FlinkKafkaProducer.java 616 行」。下圖是該階段的描述:
Flink 1.7 及以上版本使用 FlinkKafkaProducer 做爲 Kafka Sink,它繼承抽象類 TwoPhaseCommitSinkFunction,根據名字就能知道,這個抽象類主要實現兩階段提交。爲了集成 Flink Checkpoint 機制,抽象類實現了 CheckpointedFunction 和 CheckpointListener,所以它具有快照狀態(snapshotState)能力。狀態快照處理具體作了下面三件事:
調用 KafkaProducer 客戶端 flush 方法,將緩衝區內所有記錄發送到 Kafka,但不提交。這些記錄寫入到 Topic-B,此時這些數據的事務隔離級別爲 UNCOMMITTED,也就是說若是有個服務消費 Topic-B,而且設置的 isolation.level=read_committed,那麼此時這個消費端還沒法 poll 到 flush 的數據,由於這些數據還沒有 commit。何時 commit 呢?在快照結束處理階段進行 commit,後面會提到。 將快照輪次與當前事務記錄到一個 Map 表示的待提交事務集合中,key 是當前快照輪次的 CheckpointId,value 是由 TransactionHolder 表示的事務對象。TransactionHolder 對象內部記錄了 transactionalId、producerId、epoch 以及 Kafka 客戶端 kafkaProducer 的引用。 持久化當前事務處理狀態,也就是將當前處理的事務詳情存入狀態後端,供應用恢復時使用。
TwoPhaseCommitSinkFunction 實現了 CheckpointListener,應用中全部算子的快照處理成功後會收到基於某輪 Checkpoint 完成的通知。當 FlinkKafkaProducer 收到通知後,主要任務就是提交上一階段產生的事務,而具體要提交哪些事務是從上一階段生成的待提交事務集合中獲取的。
圖中第 4 步執行成功後,flush 到 Kafka 的數據從 UNCOMMITTED 變動爲 COMMITTED,這意味着此時消費端能夠 poll 到這批數據了。
2PC(兩階段提交)理論的兩個階段分別對應了 FlinkKafkaProducer 的狀態快照處理階段和快照結束處理階段,前者是經過 Kafka 的事務初始化、事務開啓、flush 等操做預提交事務,後者是經過 Kafka 的 commit 操做真正執行事務提交。
3.2.2 故障恢復
Flink 應用崩潰後,FlinkKafkaProducer 開始進入恢復模式。下圖爲應用崩潰前的狀態描述:
在恢復期間主要的處理在狀態初始化階段。當 Flink 任務重啓時會觸發狀態初始化,此時應用與 Kafka 已經斷開了鏈接。但在運行期間可能存在數據 flush 還沒有提交的狀況。
若是想從新提交這些數據須要從狀態後端恢復當時 KafkaProducer 持有的事務對象,具體一點就是恢復當時事務的 transactionalId、producerId、epoch。這個時候就用到了 FlinkKafkaInternalProducer 組件中的事務重置,在狀態初始化時從狀態後端得到這些事務信息,並重置到當前 KafkaProducer 中,再執行 commit 操做。這樣就能夠恢復任務重啓前的狀態,Topic-B 的消費端依然能夠 poll 到應用恢復後提交的數據。
須要注意的是:若是這個重置並提交的動做失敗了,可能會形成數據丟失。下圖描述的是狀態初始化階段的處理流程:
3.2.3 總結
FlinkKafkaProducer 故障恢復期間,狀態初始化是比較重要的處理階段。這個階段在 Kafka 事務特性的強有力支撐下,實現了事務狀態的恢復,而且使得狀態存儲佔用空間最小。依賴 Flink 提供的 TwoPhaseCommitSinkFunction 實現類,咱們本身也能夠對 Sink 作更多的擴展。