Getting Started with Kafka Streams Development (1)

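 Background

Confluent recently published a Kafka Streams tutorial series on its website. It has 10 lessons, each introducing one feature of Kafka Streams, and it is quite helpful for getting to know the library. Why learn Kafka Streams at all? I have long felt that developers in China are a little too infatuated with Flink. It makes sense for large companies to use Flink: their data volumes are huge and they need a full set of cluster management, scheduling, and monitoring features. But a typical small or mid-sized company has simple workloads and modest volumes, so why spend the time and effort to stand up an entire Flink cluster? Kafka Streams is more than enough for many simple stream-processing scenarios. Moreover, its design, especially its exactly-once processing semantics, is in no way inferior to Flink's; it is simply positioned differently. If you are interested in Kafka Streams, you may want to follow this series.

One annoyance is that the tutorial uses Confluent Kafka as its development environment and relies heavily on Avro for message serialization and deserialization. Confluent Kafka ships with Schema Registry plus the avro-console-producer and avro-console-consumer tools, which make it easy to convert messages to Avro and test them. If you use vanilla Kafka (that is, the community edition, Apache Kafka), you have to build these capabilities yourself, which is very inconvenient.

With that in mind, I plan to work through the examples in that tutorial series and reimplement them with Apache Kafka. It is somewhat introductory material, but I think it makes for a good learning exercise. Note that I use Google's Protocol Buffers (protobuf below) instead of Avro for the demos.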

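 Demo Overview

The feature demonstrated in this first post is simple: the map operation in stream processing. The map function (or operator) transforms each event in a message stream into another format or into a new event. Today's input messages represent movies and look like this: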

{"id": 294, "title": "Die Hard::1988", "genre": "action"}

We use Kafka Streams to split the title field of each message in real time and extract the release year from it, producing the following:

{"id":294,"title":"Die Hard","release_year":1988,"genre":"action"}
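
Before bringing Kafka into the picture, the transformation itself can be sketched in plain Java. The class and method names below (TitleSplitSketch, TitleAndYear, split, releaseYears) are made up for illustration and are not part of the tutorial code; the sketch also assumes that a title without exactly one "::" separator should be rejected, which the tutorial does not specify.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java sketch of the map step; no Kafka classes are involved.
final class TitleSplitSketch {

    // Hypothetical holder for the two values extracted from a raw title.
    static final class TitleAndYear {
        final String title;
        final int releaseYear;

        TitleAndYear(String title, int releaseYear) {
            this.title = title;
            this.releaseYear = releaseYear;
        }
    }

    // Splits a raw title such as "Die Hard::1988" into title and release year.
    static TitleAndYear split(String rawTitle) {
        String[] parts = rawTitle.split("::");
        if (parts.length != 2) {
            // Assumption: malformed titles are rejected rather than passed through.
            throw new IllegalArgumentException("Expected <title>::<year> but got: " + rawTitle);
        }
        return new TitleAndYear(parts[0], Integer.parseInt(parts[1].trim()));
    }

    // map applies the same function to every element: one output per input.
    static List<Integer> releaseYears(List<String> rawTitles) {
        return rawTitles.stream()
                .map(TitleSplitSketch::split)
                .map(parsed -> parsed.releaseYear)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        TitleAndYear parsed = split("Die Hard::1988");
        System.out.println(parsed.title + " / " + parsed.releaseYear);
        System.out.println(releaseYears(Arrays.asList("Die Hard::1988", "Tree of Life::2011")));
    }
}
```

The Kafka Streams map operator has the same one-in, one-out shape, except that it runs continuously over the records of a topic instead of over a finite list.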

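 Initializing the Project

The first step is to create the project folder. Before doing so, you need Java and Gradle installed and configured. Gradle is the build tool for the Java project and can be downloaded from https://gradle.org/. Then run the following commands to create the project: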

mkdir movie-streams/
cd movie-streams/

 Configuring the Project

In the movie-streams directory, create a build.gradle file. This is Gradle's project configuration file, similar to Maven's pom.xml. Its contents are as follows:

buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
    }
}

plugins {
    id 'java'
    id "com.google.protobuf" version "0.8.10"
}
apply plugin: 'com.github.johnrengelman.shadow'

repositories {
    mavenCentral()
    jcenter()

    maven {
        url 'http://packages.confluent.io/maven'
    }
}

group 'huxihx.kafkastreams'

sourceCompatibility = 1.8
targetCompatibility = '1.8'
version = '0.0.1'

dependencies {
    implementation 'org.slf4j:slf4j-simple:1.7.26'
    implementation 'org.apache.kafka:kafka-streams:2.3.0'
    implementation 'com.google.protobuf:protobuf-java:3.9.1'

    testCompile group: 'junit', name: 'junit', version: '4.12'
}

protobuf {
    generatedFilesBaseDir = "$projectDir/src/"
    protoc {
        artifact = 'com.google.protobuf:protoc:3.0.0'
    }
}

jar {
    manifest {
        attributes(
            'Class-Path': configurations.compile.collect { it.getName() }.join(' '),
            'Main-Class': 'huxihx.kafkastreams.MovieStreamApp'
        )
    }
}

shadowJar {
    archiveName = "kstreams-transform-standalone-${version}.${extension}"
}

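The key points are these: first, we pull in Gradle's shadow plugin to build a fat jar; second, we apply Gradle's protobuf plugin, which automatically compiles *.proto files into Java classes. Save the file, then run the following command to download the Gradle wrapper: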

gradle wrapper

Next, create a folder named configuration under movie-streams to hold our configuration file:

mkdir configuration

Then create the configuration file dev.properties in that folder with the following contents:

application.id=movie-transformer

bootstrap.servers=localhost:9092

input.topic.name=raw-movies
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=movies
output.topic.partitions=1
output.topic.replication.factor=1

This file specifies the Kafka cluster to connect to, along with the details of the input and output topics.
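
As a rough illustration of how these keys can be consumed, the sketch below reads one topic's settings from a java.util.Properties object and fails fast when a key is missing. TopicConfigSketch, TopicSpec, read, require and parse are hypothetical names introduced here; the application shown later in this post reads the same properties inline and does not perform this validation.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Sketch: turning the dev.properties keys into typed topic settings.
final class TopicConfigSketch {

    // Hypothetical value class for one topic's settings.
    static final class TopicSpec {
        final String name;
        final int partitions;
        final short replicationFactor;

        TopicSpec(String name, int partitions, short replicationFactor) {
            this.name = name;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
        }
    }

    // Reads <prefix>.topic.name, <prefix>.topic.partitions and
    // <prefix>.topic.replication.factor, where prefix is "input" or "output".
    static TopicSpec read(Properties props, String prefix) {
        String name = require(props, prefix + ".topic.name");
        int partitions = Integer.parseInt(require(props, prefix + ".topic.partitions"));
        short replication = Short.parseShort(require(props, prefix + ".topic.replication.factor"));
        return new TopicSpec(name, partitions, replication);
    }

    // Fails fast with a readable message instead of a later NullPointerException.
    static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required property: " + key);
        }
        return value.trim();
    }

    // Parses properties from text so the sketch runs without a file on disk.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("input.topic.name=raw-movies\n"
                + "input.topic.partitions=1\n"
                + "input.topic.replication.factor=1\n");
        TopicSpec input = read(props, "input");
        System.out.println(input.name + " " + input.partitions + " " + input.replicationFactor);
    }
}
```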

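 Creating the Message Schemas

The next step is to create the schemas for the input and output messages. Under movie-streams, run the following command to create the folder that holds the schemas: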

mkdir -p src/main/proto 

Then create raw-movie.proto and parsed-movie.proto with the following contents, respectively:

syntax = "proto3";

package huxihx.kafkastreams.proto;

message RawMovie {
    uint64 id = 1;
    string title = 2;
    string genre = 3;
}

syntax = "proto3";

package huxihx.kafkastreams.proto;

message Movie {
    uint64 id = 1;
    string title = 2;
    uint32 release_year = 3;
    string genre = 4;
}

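The files use standard protobuf syntax and define the id, title, release_year, and genre of a movie event. Save both files, then run gradlew under movie-streams to compile them into Java classes automatically: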

./gradlew build

At this point you should see the two generated Java classes, ParsedMovie and RawMovieOuterClass, under src/main/java/huxihx/kafkastreams/proto.
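
The numbers after each field in the schema (= 1, = 2, and so on) are what identify the field in the serialized bytes; field names never appear on the wire. The following is a hand-rolled sketch of protobuf's varint field encoding, meant only to show what the generated classes do for the integer fields. VarintSketch and its methods are illustrative names, not part of the protobuf library, and real code should always go through the generated classes.

```java
import java.io.ByteArrayOutputStream;

// Sketch of how protobuf encodes integer fields such as "uint64 id = 1".
final class VarintSketch {

    // Encodes an unsigned value as a base-128 varint, 7 bits per byte,
    // least-significant group first; the high bit marks "more bytes follow".
    static byte[] encodeVarint(long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
        return out.toByteArray();
    }

    // Encodes a varint-typed field: the tag is (fieldNumber << 3) | wireType,
    // with wire type 0 for varints, followed by the value itself.
    static byte[] encodeVarintField(int fieldNumber, long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] tag = encodeVarint((long) fieldNumber << 3);
        out.write(tag, 0, tag.length);
        byte[] body = encodeVarint(value);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }

    // Renders bytes as lowercase hex for inspection.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // id = 1 with value 294
        System.out.println(hex(encodeVarintField(1, 294)));  // prints 08a602
        // release_year = 3 with value 1988
        System.out.println(hex(encodeVarintField(3, 1988))); // prints 18c40f
    }
}
```

This is also why field numbers should not be changed once messages have been written to a topic: consumers would then interpret existing bytes as different fields.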

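 Creating the Serdes

In this step we create a Serdes for each of the RawMovie and Movie messages. Serdes is short for serializer and deserializer. When a Kafka Streams application reads messages, it uses the deserializer in a Serdes to turn the message bytes into the corresponding Java object; when it produces messages, it uses the serializer to turn the Java object into bytes. Because we use protobuf for serialization and deserialization, we need to create a Serdes that supports protobuf.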
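
The round trip a Serdes performs can be sketched without any Kafka dependency. The two interfaces below are hypothetical stand-ins that only mirror the shape of Kafka's Serializer and Deserializer; the example encodes a Long as 8 big-endian bytes and decodes it again, and it assumes the common convention that a null value maps to a null payload.

```java
import java.nio.ByteBuffer;

// Sketch: a serializer/deserializer pair and the round trip between them.
final class SerdeRoundTripSketch {

    // Hypothetical stand-ins mirroring the shape of Kafka's interfaces.
    interface SimpleSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    interface SimpleDeserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Object -> bytes: a Long becomes 8 big-endian bytes; null stays null.
    static final SimpleSerializer<Long> LONG_SERIALIZER = (topic, data) ->
            data == null ? null : ByteBuffer.allocate(Long.BYTES).putLong(data).array();

    // Bytes -> object: the inverse of the serializer above.
    static final SimpleDeserializer<Long> LONG_DESERIALIZER = (topic, data) -> {
        if (data == null) {
            return null;
        }
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("Expected 8 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    };

    // What a Serdes guarantees: deserialize(serialize(x)) gives back x.
    static Long roundTrip(Long value) {
        byte[] bytes = LONG_SERIALIZER.serialize("movies", value);
        return LONG_DESERIALIZER.deserialize("movies", bytes);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(294L));
    }
}
```

The protobuf Serdes built in this section follows the same pattern, with toByteArray() on the serializing side and a protobuf Parser on the deserializing side.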

Under movie-streams, run:

mkdir -p src/main/java/huxihx/kafkastreams/serdes

Then create ProtobufSerializer.java in the new serdes folder:

package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import org.apache.kafka.common.serialization.Serializer;

public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
    @Override
    public byte[] serialize(String topic, T data) {
        // Follow the convention of Kafka's built-in serializers: null in, null out.
        return data == null ? null : data.toByteArray();
    }
}

Then create ProtobufDeserializer.java:

package huxihx.kafkastreams.serdes;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {

    private Parser<T> parser;

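    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The parser is handed over through the config map under the "parser" key.
        parser = (Parser<T>) configs.get("parser");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        // A null payload (for example a tombstone record) has nothing to parse.
        if (data == null) {
            return null;
        }
        try {
            return parser.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
        }
    }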
}

Finally, create ProtobufSerdes.java:

package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.HashMap;
import java.util.Map;

public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {

    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;

    public ProtobufSerdes(Parser<T> parser) {
        serializer = new ProtobufSerializer<>();
        deserializer = new ProtobufDeserializer<>();
        Map<String, Parser<T>> config = new HashMap<>();
        config.put("parser", parser);
        deserializer.configure(config, false);
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}

 Developing the Main Flow

Create MovieStreamApp.java under src/main/java/huxihx/kafkastreams:

package huxihx.kafkastreams;

import huxihx.kafkastreams.proto.ParsedMovie;
import huxihx.kafkastreams.proto.RawMovieOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerdes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

public class MovieStreamApp {

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Config file path must be specified.");
        }

        MovieStreamApp app = new MovieStreamApp();
        Properties envProps = app.loadEnvProperties(args[0]);
        Properties streamProps = app.createStreamsProperties(envProps);
        Topology topology = app.buildTopology(envProps);

        app.preCreateTopics(envProps);

        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }


    /**
     * Builds the Streams topology.
     *
     * @param envProps
     * @return
     */
    private Topology buildTopology(Properties envProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = envProps.getProperty("input.topic.name");
        final String outputTopic = envProps.getProperty("output.topic.name");

        builder.stream(inputTopic, Consumed.with(Serdes.String(), rawMovieProtobufSerdes()))
                .map((key, rawMovie) -> new KeyValue<>(rawMovie.getId(), parseRawMovie(rawMovie)))
                .to(outputTopic, Produced.with(Serdes.Long(), movieProtobufSerdes()));

        return builder.build();
    }

    /**
     * Builds the Properties instance required by the Kafka Streams application.
     *
     * @param envProps
     * @return
     */
    private Properties createStreamsProperties(Properties envProps) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    /**
     * Pre-creates the input and output topics, skipping any topic that already exists.
     *
     * @param envProps
     * @throws Exception
     */
    private void preCreateTopics(Properties envProps) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        String inputTopic = envProps.getProperty("input.topic.name");
        String outputTopic = envProps.getProperty("output.topic.name");
        try (AdminClient client = AdminClient.create(config)) {
            Collection<TopicListing> existingTopics = client.listTopics().listings().get();

            List<NewTopic> topics = new ArrayList<>();
            List<String> topicNames = existingTopics.stream().map(TopicListing::name).collect(Collectors.toList());
            if (!topicNames.contains(inputTopic))
                topics.add(new NewTopic(
                        envProps.getProperty("input.topic.name"),
                        Integer.parseInt(envProps.getProperty("input.topic.partitions")),
                        Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));

            if (!topicNames.contains(outputTopic))
                topics.add(new NewTopic(
                        envProps.getProperty("output.topic.name"),
                        Integer.parseInt(envProps.getProperty("output.topic.partitions")),
                        Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));

            if (!topics.isEmpty())
                client.createTopics(topics).all().get();
        }
    }

    /**
     * Loads the configuration file stored under the configuration folder.
     *
     * @param fileName
     * @return
     * @throws IOException
     */
    private Properties loadEnvProperties(String fileName) throws IOException {
        Properties envProps = new Properties();
        try (FileInputStream input = new FileInputStream(fileName)) {
            envProps.load(input);
        }
        return envProps;
    }

    /**
     * Builds the Serdes required for the output topic.
     *
     * @return
     */
    private static ProtobufSerdes<ParsedMovie.Movie> movieProtobufSerdes() {
        return new ProtobufSerdes<>(ParsedMovie.Movie.parser());
    }

    /**
     * Builds the Serdes required for the input topic.
     *
     * @return
     */
    private static ProtobufSerdes<RawMovieOuterClass.RawMovie> rawMovieProtobufSerdes() {
        return new ProtobufSerdes<>(RawMovieOuterClass.RawMovie.parser());
    }

    /**
     * Runs the map logic that extracts the release_year field.
     *
     * @param rawMovie
     * @return
     */
    private static ParsedMovie.Movie parseRawMovie(RawMovieOuterClass.RawMovie rawMovie) {
        String[] titleParts = rawMovie.getTitle().split("::");
        String title = titleParts[0];
        int releaseYear = Integer.parseInt(titleParts[1]);
        return ParsedMovie.Movie.newBuilder()
                .setId(rawMovie.getId())
                .setTitle(title)
                .setReleaseYear(releaseYear)
                .setGenre(rawMovie.getGenre())
                .build();
    }
}

 Writing the Test Producer and Consumer

Create TestProducer.java and TestConsumer.java under src/main/java/huxihx/kafkastreams/tests/ with the following contents, respectively:

package huxihx.kafkastreams.tests;

import huxihx.kafkastreams.proto.RawMovieOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class TestProducer {

    // Test input events
    private static final List<RawMovieOuterClass.RawMovie> TEST_RAW_MOVIES = Arrays.asList(
            RawMovieOuterClass.RawMovie.newBuilder()
                    .setId(294).setTitle("Die Hard::1988").setGenre("action").build(),

            RawMovieOuterClass.RawMovie.newBuilder()
                    .setId(354).setTitle("Tree of Life::2011").setGenre("drama").build(),

            RawMovieOuterClass.RawMovie.newBuilder()
                    .setId(782).setTitle("A Walk in the Clouds::1995").setGenre("romance").build(),

            RawMovieOuterClass.RawMovie.newBuilder()
                    .setId(128).setTitle("The Big Lebowski::1998").setGenre("comedy").build());

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", ProtobufSerializer.class);

        try (final Producer<String, RawMovieOuterClass.RawMovie> producer = new KafkaProducer<>(props)) {
            TEST_RAW_MOVIES.stream()
                    .map(rawMovie -> new ProducerRecord<String, RawMovieOuterClass.RawMovie>("raw-movies", rawMovie))
                    .forEach(producer::send);
        }
    }
}

 

package huxihx.kafkastreams.tests;

import com.google.protobuf.Parser;
import huxihx.kafkastreams.proto.ParsedMovie;
import huxihx.kafkastreams.serdes.ProtobufDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TestConsumer {

    public static void main(String[] args) {
        // Build the protobuf deserializer for the output events
        Deserializer<ParsedMovie.Movie> deserializer = new ProtobufDeserializer<>();
        Map<String, Parser<ParsedMovie.Movie>> config = new HashMap<>();
        config.put("parser", ParsedMovie.Movie.parser());
        deserializer.configure(config, false);

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer<Long, ParsedMovie.Movie> consumer = new KafkaConsumer<>(props, new LongDeserializer(), deserializer);
        consumer.subscribe(Arrays.asList("movies"));
        while (true) {
            ConsumerRecords<Long, ParsedMovie.Movie> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<Long, ParsedMovie.Movie> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

 Testing

First, run the following command to build the project:

./gradlew shadowJar

Then start the Kafka cluster, and after that run the Kafka Streams application:

java -jar build/libs/kstreams-transform-standalone-0.0.1.jar configuration/dev.properties

Next, start TestProducer to send the test events:

java -cp build/libs/kstreams-transform-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer

Finally, start TestConsumer to verify that Kafka Streams extracted the release_year field from each input event:

java -cp build/libs/kstreams-transform-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestConsumer

.......

offset = 0, key = 294, value = id: 294
title: "Die Hard"
release_year: 1988
genre: "action"

offset = 1, key = 354, value = id: 354
title: "Tree of Life"
release_year: 2011
genre: "drama"

offset = 2, key = 782, value = id: 782
title: "A Walk in the Clouds"
release_year: 1995
genre: "romance"

offset = 3, key = 128, value = id: 128
title: "The Big Lebowski"
release_year: 1998
genre: "comedy"

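 Summary

Okay, that wraps up the first demo. Overall, the Kafka Streams map transformation operator is very handy: it runs the logic you specify on every incoming message in real time. In the next post I will demonstrate another classic transformation: filter.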
