Flink提供了Kafka connector用於消費/生產Apache Kafka topic的數據。Flink的Kafka consumer集成了checkpoint機制以提供精確一次的處理語義。在具體的實現過程當中,Flink不依賴於Kafka內置的消費組位移管理,而是在內部自行記錄和維護consumer的位移。html
用戶在使用時須要根據Kafka版原本選擇相應的connector,以下表所示:java
Maven依賴 | 支持的最低Flink版本 | Kafka客戶端類名 | 說明 |
flink-connector-kafka-0.8_2.10 | 1.0.0 | FlinkKafkaConsumer08apache FlinkKafkaProducer08bootstrap |
使用的是Kafka老版本low-level consumer,即SimpleConsumer. Flink在內部會提交位移到Zookeeper |
flink-connector-kafka-0.9_2.10 | 1.0.0 | FlinkKafkaConsumer09api FlinkKafkaProducer09數組 |
使用Kafka新版本consumer |
flink-connector-kafka-0.10_2.10 | 1.2.0 | FlinkKafkaConsumer010maven FlinkKafkaProducer010函數 |
支持使用Kafka 0.10.0.0版本新引入的內置時間戳信息 |
而後,將上面對應的connector依賴加入到maven項目中,好比:性能
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.3.2</version>
</dependency>ui
Kafka Consumer
Flink kafka connector使用的consumer取決於用戶使用的是老版本consumer仍是新版本consumer,新舊兩個版本對應的connector類名是不一樣的,分別是:FlinkKafkaConsumer09(或FlinkKafkaConsumer010)以及FlinkKafkaConsumer08。它們都支持同時消費多個topic。
該Connector的構造函數包含如下幾個字段:
下面給出一個實例:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));
DeserializationSchema
Flink的Kafka consumer須要依靠用戶指定的解序列化器來將二進制的數據轉換成Java對象。DeserializationSchema接口就是作這件事情的,該接口中的deserialize方法做用於每條Kafka消息上,並把轉換的結果發往Flink的下游operator。
一般狀況下,用戶直接繼承AbstractDeserializationSchema來建立新的deserializer,也能夠實現DeserializationSchema接口,只不過要自行實現getProducedType方法。
若是要同時解序列化Kafka消息的key和value,則須要實現KeyedDeserializationSchema接口,由於該接口的deserialize方法同時包含了key和value的字節數組。
Flink默認提供了幾種deserializer:
一旦在解序列化過程當中出現錯誤,Flink提供了兩個應對方法——1. 在deserialize方法中拋出異常,使得整個做業失敗並重啓;2. 返回null告訴Flink Kafka connector跳過這條異常消息。值得注意的是,因爲consumer是高度容錯的,若是採用第一種方式會讓consumer再次嘗試deserialize這條有問題的消息。所以假若deserializer再次失敗,程序可能陷入一個死循環並不斷進行錯誤重試。
Kafka consumer起始位移配置
Flink的Kafka consumer容許用戶配置Kafka consumer的起始讀取位移,以下列代碼所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...); myConsumer.setStartFromEarliest(); // start from the earliest record possible myConsumer.setStartFromLatest(); // start from the latest record myConsumer.setStartFromGroupOffsets(); // the default behaviour DataStream<String> stream = env.addSource(myConsumer); ...
全部版本的Flink Kafka consumer均可以使用上面的方法來設定起始位移。
Flink也支持用戶自行指定位移,方法以下:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>(); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L); specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L); myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
上面的例子中,consumer將從用戶指定的位移處開始讀取消息。這裏的位移記錄的是下一條待消費消息的位移,而不是最新的已消費消息的位移。值得注意的是,若是待消費分區的位移不在保存的位移映射中,Flink Kafka connector會使用默認的組位移策略(即setStartFromGroupOffsets())。
另外,當任務自動地從失敗中恢復或手動地從savepoint中恢復時,上述這些設置位移的方法是不生效的。在恢復時,每一個Kafka分區的起始位移都是由保存在savepoint或checkpoint中的位移來決定的。
Kafka consumer容錯性
一旦啓用了Flink的檢查點機制(checkpointing),Flink Kafka消費者會按期地對其消費的topic作checkpoint以保存它消費的位移以及其餘操做的狀態。一旦出現失敗,Flink將會恢復streaming程序到最新的checkpoint狀態,而後從新從Kafka消費數據,從新讀取的位置就是保存在checkpoint中的位移。
checkpoint的間隔決定了程序容錯性的程度,它直接肯定了在程序崩潰時,程序回溯到的最久狀態。
若是要使用啓動容錯性的Kafka消費者,按期對拓撲進行checkpoint就是很是必要的,實現方法以下面代碼所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); // 每5秒作一次checkpoint
須要注意的是,只有槽位(slot)充足Flink纔會重啓拓撲,所以一旦拓撲因沒法鏈接TaskManager而崩潰,仍然須要有足夠的slot才能重啓拓撲。若是使用YARN的話,Flink可以自動地重啓丟失的YARN容器。
若是沒有啓用checkpoint,那麼Kafka consumer會按期地向Zookeeper提交位移。
Kafka consumer位移提交
Flink Kafka consumer能夠自行設置位移提交的行爲。固然,它不依賴於這些已提交的位移來實現容錯性。這些提交位移只是供監控使用。
配置位移提交的方法各異,主要依賴因而否啓用了checkpointing機制:
Kafka consumer時間戳提取/水位生成
一般,事件或記錄的時間戳信息是封裝在消息體中。至於水位,用戶能夠選擇按期地發生水位,也能夠基於某些特定的Kafka消息來生成水位——這分別就是AssignerWithPeriodicWatermaks以及AssignerWithPunctuatedWatermarks接口的使用場景。
用戶也可以自定義時間戳提取器/水位生成器,具體方法參見這裏,而後按照下面的方式傳遞給consumer:
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties); myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); DataStream<String> stream = env .addSource(myConsumer) .print();
在內部,Flink會爲每一個Kafka分區都執行一個對應的assigner實例。一旦指定了這樣的assigner,對於每條Kafka中的消息,extractTimestamp(T element, long previousElementTimestamp)方法會被調用來給消息分配時間戳,而getCurrentWatermark()方法(定時生成水位)或checkAndGetNextWatermark(T lastElement, long extractedTimestamp)方法(基於特定條件)會被調用以肯定是否發送新的水位值。