Flink1.9整合Kafka

file

本文基於Flink1.9版本簡述如何鏈接Kafka。java

流式鏈接器

file

咱們知道能夠本身來開發Source 和 Sink ,可是一些比較基本的 Source 和 Sink 已經內置在 Flink 裏。git

預約義的source支持從文件、目錄、socket,以及 collections 和 iterators 中讀取數據。github

預約義的sink支持把數據寫入文件、標準輸出(stdout)、標準錯誤輸出(stderr)和 socket。數據庫

鏈接器能夠和多種多樣的第三方系統進行交互。目前支持如下系統:apache

  • Apache Kafka
  • Apache Cassandra(sink)
  • Amazon Kinesis Streams(source/sink)
  • Elasticsearch(sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ(source/sink)
  • Apache NiFi(source/sink)
  • Twitter Streaming API(source)

請記住,在使用一種鏈接器時,一般須要額外的第三方組件,好比:數據存儲服務器或者消息隊列。bootstrap

Apache Bahir 中定義了其餘一些鏈接器api

  • Apache ActiveMQ(source/sink)
  • Apache Flume(sink)
  • Redis(sink)
  • Akka (sink)
  • Netty (source)

使用connector並非惟一能夠使數據進入或者流出Flink的方式。一種常見的模式是從外部數據庫或者 Web 服務查詢數據獲得初始數據流,而後經過 Map 或者 FlatMap 對初始數據流進行豐富和加強,這裏要使用Flink的異步IO。服務器

而向外部存儲推送大量數據時會致使 I/O 瓶頸問題出現。在這種場景下,若是對數據的讀操做遠少於寫操做,可讓外部應用從 Flink 拉取所需的數據,須要用到Flink的可查詢狀態接口。異步

本文重點介紹Apache Kafka Connectorsocket

Kafka鏈接器

此鏈接器提供對Apache Kafka提供的事件流的訪問。

Flink提供特殊的Kafka鏈接器,用於從/向Kafka主題讀取和寫入數據。Flink Kafka Consumer集成了Flink的檢查點機制,可提供一次性處理語義。爲實現這一目標,Flink並不徹底依賴Kafka 的消費者組的偏移量,而是在內部跟蹤和檢查這些偏移。

下表爲不一樣版本的kafka與Flink Kafka Consumer的對應關係。

Maven Dependency
Supported since Consumer and Producer Class name
Kafka version
flink-connector-kafka-0.8_2.11
1.0.0
FlinkKafkaConsumer08 FlinkKafkaProducer08 0.8.x
flink-connector-kafka-0.9_2.11
1.0.0
FlinkKafkaConsumer09 FlinkKafkaProducer09 0.9.x
flink-connector-kafka-0.10_2.11 1.2.0
FlinkKafkaConsumer010 FlinkKafkaProducer010 0.10.x
flink-connector-kafka-0.11_2.11 1.4.0
FlinkKafkaConsumer011 FlinkKafkaProducer011 0.11.x
flink-connector-kafka_2.11
1.7.0
FlinkKafkaConsumer FlinkKafkaProducer >= 1.0.0

而從最新的Flink1.9.0版本開始,使用Kafka 2.2.0客戶端。

下面簡述使用步驟。

導入maven依賴:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.9.0</version>
</dependency>複製代碼

安裝Kafka:

能夠參照 Kafka入門寶典(詳細截圖版)

兼容性:

從Flink 1.7開始,它不跟蹤特定的Kafka主要版本。相反,它在Flink發佈時跟蹤最新版本的Kafka。若是您的Kafka代理版本是1.0.0或更高版本,則應使用此Kafka鏈接器。若是使用舊版本的Kafka(0.11,0.10,0.9或0.8),則應使用與代理版本對應的鏈接器。

升級Connect要注意Flink升級做業,同時

  • 在整個過程當中使用Flink 1.9或更新版本。
  • 不要同時升級Flink和運營商。
  • 確保您做業中使用的Kafka Consumer和/或Kafka Producer分配了惟一標識符(uid)。
  • 使用stop with savepoint功能獲取保存點(例如,使用stop --withSavepoint)。

用法:

引入依賴後,實例化新的source(FlinkKafkaConsumer)和sink(FlinkKafkaProducer)。

Kafka Consumer

先分步驟介紹構建過程,文末附Flink1.9鏈接Kafka完整代碼。

Kafka consumer 根據版本分別叫作FlinkKafkaConsumer08 FlinkKafkaConsumer09等等Kafka >= 1.0.0 的版本就叫FlinkKafkaConsumer。

構建FlinkKafkaConsumer

java示例代碼以下:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env
    .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));複製代碼

scala:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
stream = env
    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))
    .print()複製代碼

必須有的:

1.topic名稱

2.用於反序列化Kafka數據的DeserializationSchema / KafkaDeserializationSchema

3.配置參數:「bootstrap.servers」 「group.id」 (kafka0.8還須要 「zookeeper.connect」)

配置消費起始位置

java:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

//指定位置
//Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

DataStream<String> stream = env.addSource(myConsumer);複製代碼

scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

//指定位置
//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

val stream = env.addSource(myConsumer)複製代碼

檢查點

啓用Flink的檢查點後,Flink Kafka Consumer將使用主題中的記錄,並以一致的方式按期檢查其全部Kafka偏移以及其餘操做的狀態。若是做業失敗,Flink會將流式程序恢復到最新檢查點的狀態,並從存儲在檢查點中的偏移量開始從新使用Kafka的記錄。

若是禁用了檢查點,則Flink Kafka Consumer依賴於內部使用的Kafka客戶端的自動按期偏移提交功能。

若是啓用了檢查點,則Flink Kafka Consumer將在檢查點完成時提交存儲在檢查點狀態中的偏移量。

java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs複製代碼

scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs複製代碼

分區發現

Flink Kafka Consumer支持發現動態建立的Kafka分區,並使用一次性保證消費它們。

還能夠使用正則:

java

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream<String> stream = env.addSource(myConsumer);
...複製代碼

scala

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String](
  java.util.regex.Pattern.compile("test-topic-[0-9]"),
  new SimpleStringSchema,
  properties)

val stream = env.addSource(myConsumer)
...複製代碼

時間戳和水印

在許多狀況下,記錄的時間戳(顯式或隱式)嵌入記錄自己。另外,用戶可能想要週期性地或以不規則的方式發出水印。

咱們能夠定義好Timestamp Extractors / Watermark Emitters,經過如下方式將其傳遞給您的消費者:

java

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer08<String> myConsumer =
    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

DataStream<String> stream = env
    .addSource(myConsumer)
    .print();複製代碼

scala

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
stream = env
    .addSource(myConsumer)
    .print()複製代碼

Kafka Producer

Kafka Producer 根據版本分別叫作FlinkProducer011 FlinkKafkaProducer010等等Kafka >= 1.0.0 的版本就叫FlinkKafkaProducer 。

構建FlinkKafkaConsumer

java

DataStream<String> stream = ...;

FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<String>(
        "localhost:9092",            // broker list
        "my-topic",                  // target topic
        new SimpleStringSchema());   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true);

stream.addSink(myProducer);複製代碼

scala

val stream: DataStream[String] = ...

val myProducer = new FlinkKafkaProducer011[String](
        "localhost:9092",         // broker list
        "my-topic",               // target topic
        new SimpleStringSchema)   // serialization schema

// versions 0.10+ allow attaching the records' event timestamp when writing them to Kafka;
// this method is not available for earlier Kafka versions
myProducer.setWriteTimestampToKafka(true)

stream.addSink(myProducer)複製代碼

須要指定broker list , topic,序列化類。

自定義分區:默認狀況下,將使用FlinkFixedPartitioner將每一個Flink Kafka Producer並行子任務映射到單個Kafka分區。

能夠實現FlinkKafkaPartitioner類自定義分區。

Flink1.9消費Kafka完整代碼:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;

public class KafkaConsumer {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        //構建FlinkKafkaConsumer
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
        //指定偏移量
        myConsumer.setStartFromEarliest();


        DataStream<String> stream = env
                .addSource(myConsumer);

        env.enableCheckpointing(5000);
        stream.print();

        env.execute("Flink Streaming Java API Skeleton");
    }複製代碼

項目地址:github.com/tree1123/fl…

更多Flink知識,歡迎關注實時流式計算

file

相關文章
相關標籤/搜索