【譯】Apache Flink Kafka consumer

Flink提供了Kafka connector用於消費/生產Apache Kafka topic的數據。Flink的Kafka consumer集成了checkpoint機制以提供精確一次的處理語義。在具體的實現過程當中,Flink不依賴於Kafka內置的消費組位移管理,而是在內部自行記錄和維護consumer的位移。html

用戶在使用時須要根據Kafka版原本選擇相應的connector,以下表所示:java

Maven依賴 支持的最低Flink版本 Kafka客戶端類名 說明
flink-connector-kafka-0.8_2.10 1.0.0

FlinkKafkaConsumer08apache

FlinkKafkaProducer08bootstrap

使用的是Kafka老版本low-level consumer,即SimpleConsumer. Flink在內部會提交位移到Zookeeper
flink-connector-kafka-0.9_2.10 1.0.0

FlinkKafkaConsumer09api

FlinkKafkaProducer09數組

使用Kafka新版本consumer
flink-connector-kafka-0.10_2.10 1.2.0

FlinkKafkaConsumer010maven

FlinkKafkaProducer010函數

支持使用Kafka 0.10.0.0版本新引入的內置時間戳信息

而後,將上面對應的connector依賴加入到maven項目中,好比:性能

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.3.2</version>
</dependency>ui

Kafka Consumer

Flink kafka connector使用的consumer取決於用戶使用的是老版本consumer仍是新版本consumer,新舊兩個版本對應的connector類名是不一樣的,分別是:FlinkKafkaConsumer09(或FlinkKafkaConsumer010)以及FlinkKafkaConsumer08。它們都支持同時消費多個topic。

該Connector的構造函數包含如下幾個字段:

  1. 待消費的topic列表
  2. key/value解序列化器,用於將字節數組形式的Kafka消息解序列化回對象
  3. Kafka consumer的屬性對象,經常使用的consumer屬性包括:bootstrap.servers(新版本consumer專用)、zookeeper.connect(舊版本consumer專用)和group.id

下面給出一個實例:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

DeserializationSchema

Flink的Kafka consumer須要依靠用戶指定的解序列化器來將二進制的數據轉換成Java對象。DeserializationSchema接口就是作這件事情的,該接口中的deserialize方法做用於每條Kafka消息上,並把轉換的結果發往Flink的下游operator。

一般狀況下,用戶直接繼承AbstractDeserializationSchema來建立新的deserializer,也能夠實現DeserializationSchema接口,只不過要自行實現getProducedType方法。

若是要同時解序列化Kafka消息的key和value,則須要實現KeyedDeserializationSchema接口,由於該接口的deserialize方法同時包含了key和value的字節數組。

Flink默認提供了幾種deserializer:

  • TypeInformationSerializationSchema(以及TypeInformationKeyValueSerializationSchema):建立一個基於Flink TypeInformation的schema,適用於數據是由Flink讀寫之時。比起其餘序列化方法,這種schema性能更好
  • JsonDeserializationSchema(JSONKeyValueDeserializationSchema):將JSON轉換成ObjectNode對象,而後經過ObjectNode.get("fieldName").as(Int/String...)()訪問具體的字段。KeyValue

一旦在解序列化過程當中出現錯誤,Flink提供了兩個應對方法——1. 在deserialize方法中拋出異常,使得整個做業失敗並重啓;2. 返回null告訴Flink Kafka connector跳過這條異常消息。值得注意的是,因爲consumer是高度容錯的,若是採用第一種方式會讓consumer再次嘗試deserialize這條有問題的消息。所以假若deserializer再次失敗,程序可能陷入一個死循環並不斷進行錯誤重試。

Kafka consumer起始位移配置

Flink的Kafka consumer容許用戶配置Kafka consumer的起始讀取位移,以下列代碼所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour

DataStream<String> stream = env.addSource(myConsumer);
...

全部版本的Flink Kafka consumer均可以使用上面的方法來設定起始位移。

  • setStartFromGroupOffsets:這是默認狀況,即從消費者組提交到Kafka broker上的位移開始讀取分區數據(對於老版本而言,位移是提交到Zookeeper上)。若是未找到位移,使用auto.offset.reset屬性值來決定位移。該屬性默認是LATEST,即從最新的消息位移處開始消費
  • setStartFromEarliest() / setStartFromLatest():設置從最先/最新位移處開始消費。使用這兩個方法的話,Kafka中提交的位移就將會被忽略而不會被用做起始位移

Flink也支持用戶自行指定位移,方法以下:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

上面的例子中,consumer將從用戶指定的位移處開始讀取消息。這裏的位移記錄的是下一條待消費消息的位移,而不是最新的已消費消息的位移。值得注意的是,若是待消費分區的位移不在保存的位移映射中,Flink Kafka connector會使用默認的組位移策略(即setStartFromGroupOffsets())。

另外,當任務自動地從失敗中恢復或手動地從savepoint中恢復時,上述這些設置位移的方法是不生效的。在恢復時,每一個Kafka分區的起始位移都是由保存在savepoint或checkpoint中的位移來決定的。

Kafka consumer容錯性

一旦啓用了Flink的檢查點機制(checkpointing),Flink Kafka消費者會按期地對其消費的topic作checkpoint以保存它消費的位移以及其餘操做的狀態。一旦出現失敗,Flink將會恢復streaming程序到最新的checkpoint狀態,而後從新從Kafka消費數據,從新讀取的位置就是保存在checkpoint中的位移。

checkpoint的間隔決定了程序容錯性的程度,它直接肯定了在程序崩潰時,程序回溯到的最久狀態。

若是要使用啓動容錯性的Kafka消費者,按期對拓撲進行checkpoint就是很是必要的,實現方法以下面代碼所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // 每5秒作一次checkpoint  

須要注意的是,只有槽位(slot)充足Flink纔會重啓拓撲,所以一旦拓撲因沒法鏈接TaskManager而崩潰,仍然須要有足夠的slot才能重啓拓撲。若是使用YARN的話,Flink可以自動地重啓丟失的YARN容器。

若是沒有啓用checkpoint,那麼Kafka consumer會按期地向Zookeeper提交位移。

Kafka consumer位移提交

Flink Kafka consumer能夠自行設置位移提交的行爲。固然,它不依賴於這些已提交的位移來實現容錯性。這些提交位移只是供監控使用。

配置位移提交的方法各異,主要依賴因而否啓用了checkpointing機制:

  • 未啓用checkpointing:Flink Kafka consumer依賴於Kafka提供的自動提交位移功能。設置方法是在Properties對象中配置Kafka參數enable.auto.commit(新版本Kafka consumer)或auto.commit.enable(老版本Kafka consumer)
  • 啓用checkpointing:Flink Kafka consumer會提交位移到checkpoint狀態中。這就保證了Kafka中提交的位移與checkpoint狀態中的位移是一致的。用戶能夠調用setCommitOffsetsCheckpoints(boolean)方法來禁用/開啓位移提交——默認是true,即開啓了位移提交。注意,這種狀況下,Flink會忽略上一種狀況中說起的Kafka參數

Kafka consumer時間戳提取/水位生成

一般,事件或記錄的時間戳信息是封裝在消息體中。至於水位,用戶能夠選擇按期地發生水位,也能夠基於某些特定的Kafka消息來生成水位——這分別就是AssignerWithPeriodicWatermaks以及AssignerWithPunctuatedWatermarks接口的使用場景。

用戶也可以自定義時間戳提取器/水位生成器,具體方法參見這裏,而後按照下面的方式傳遞給consumer:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

DataStream<String> stream = env
	.addSource(myConsumer)
	.print();

在內部,Flink會爲每一個Kafka分區都執行一個對應的assigner實例。一旦指定了這樣的assigner,對於每條Kafka中的消息,extractTimestamp(T element, long previousElementTimestamp)方法會被調用來給消息分配時間戳,而getCurrentWatermark()方法(定時生成水位)或checkAndGetNextWatermark(T lastElement, long extractedTimestamp)方法(基於特定條件)會被調用以肯定是否發送新的水位值。

相關文章
相關標籤/搜索