Kafka Streams開發入門(2)

 背景

上一篇咱們介紹了Kafka Streams中的消息轉換操做map,今天咱們給出另外一個經典的轉換操做filter的用法。依然是結合一個具體的實例展開介紹。html

 演示功能說明

本篇演示filter用法,即根據給定的過濾條件或邏輯實時對每條消息進行過濾處理。今天使用的輸入topic消息格式以下:java

{"name": "George R. R. Martin", "title": "A Song of Ice and Fire"}git

{"name": "C.S. Lewis", "title": "The Silver Chair"}github

咱們打算過濾出name是「George R. R. Martin」的全部消息併發送到輸出topic上。apache

 初始化項目

 建立項目目錄:bootstrap

mkdir filter-streams
cd filter-streams/併發

 配置項目

在filter-streams目錄下建立build.gradle文件,內容以下:app

buildscript {

    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
    }
}

plugins {
    id 'java'
    id "com.google.protob" version "0.8.10"
}
apply plugin: 'com.github.johnrengelman.shadow'


repositories {
    mavenCentral()
    jcenter()

    maven {
        url 'http://packages.confluent.io/maven'
    }
}

group 'huxihx.kafkastreams'

sourceCompatibility = 1.8
targetCompatibility = '1.8'
version = '0.0.1'

dependencies {
    implementation 'com.google.protobuf:protobuf-java:3.0.0'
    implementation 'org.slf4j:slf4j-simple:1.7.26'
    implementation 'org.apache.kafka:kafka-streams:2.3.0'
    implementation 'com.google.protobuf:protobuf-java:3.9.1'

    testCompile group: 'junit', name: 'junit', version: '4.12'
}

protobuf {
    generatedFilesBaseDir = "$projectDir/src/"
    protoc {
        artifact = 'com.google.protobuf:protoc:3.0.0'
    }
}

jar {
    manifest {
        attributes(
                'Class-Path': configurations.compile.collect { it.getName() }.join(' '),
                'Main-Class': 'huxihx.kafkastreams.FilteredStreamsApp'
        )
    }
}

shadowJar {
    archiveName = "kstreams-transform-standalone-${version}.${extension}"
}

而後執行下列命令下載Gradle的wrapper套件:jvm

gradle wrappermaven

以後在filter-streams目錄下建立一個名爲configuration的文件夾用於保存咱們的參數配置文件:

mkdir configuration

建立一個名爲dev.properties的文件:

application.id=filtering-app
bootstrap.servers=localhost:9092

input.topic.name=publications
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=filtered-publications
output.topic.partitions=1output.topic.replication.factor=1

 建立消息Schema

下一步是建立輸入消息和輸出消息的schema。因爲咱們今天只是作filter,因此輸入和輸出的格式同樣的,只須要建立一份schema便可。首先,在filter-streams下執行命令建立保存schema的文件夾:

mkdir -p src/main/proto

以後建立publication.proto文件,內容以下:

syntax = "proto3";

package huxihx.kafkastreams.proto;

message Publication {
string name = 1;
string title = 2;
}

保存文件以後運行下列命令去編譯對應的Java類:

./gradlew build

此時,你應該能夠在src/main/java/huxihx/kafkastreams/proto下看到生成的Java類:PublicationOuterClass。

 建立Serdes

這一步的Serdes和上一篇中的同樣,所以再也不贅述,直接上代碼:

mkdir -p src/main/java/huxihx/kafkastreams/serdes

在新建立的serdes文件夾下建立ProtobufSerializer.java: 

package huxihx.kafkastreams.serdes;
 
import com.google.protobuf.MessageLite;
import org.apache.kafka.common.serialization.Serializer;
 
public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
    @Override
    public byte[] serialize(String topic, T data) {
        return data == null ? new byte[0] : data.toByteArray();
    }
}

而後建立ProtobufDeserializer.java:

package huxihx.kafkastreams.serdes;
 
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
 
import java.util.Map;
 
public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {
 
    private Parser<T> parser;
 
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        parser = (Parser<T>) configs.get("parser");
    }
 
    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return parser.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
        }
    }
}

最後建立ProtobufSerdes.java:

package huxihx.kafkastreams.serdes;
 
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
 
import java.util.HashMap;
import java.util.Map;
 
public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {
 
    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;
 
    public ProtobufSerdes(Parser<T> parser) {
        serializer = new ProtobufSerializer<>();
        deserializer = new ProtobufDeserializer<>();
        Map<String, Parser<T>> config = new HashMap<>();
        config.put("parser", parser);
        deserializer.configure(config, false);
    }
 
    @Override
    public Serializer<T> serializer() {
        return serializer;
    }
 
    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}

開發主流程

 在src/main/java/huxihx/kafkastreams下建立FilteredStreamsApp.java文件:

package huxihx.kafkastreams;

import huxihx.kafkastreams.proto.PublicationOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerdes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

public class FilteredStreamsApp {

    private Properties buildStreamsProperties(Properties envProps) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    private void preCreateTopics(Properties envProps) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
        try (AdminClient client = AdminClient.create(config)) {
            Set<String> existingTopics = client.listTopics().names().get();

            List<NewTopic> topics = new ArrayList<>();
            String inputTopic = envProps.getProperty("input.topic.name");
            if (!existingTopics.contains(inputTopic)) {
                topics.add(new NewTopic(inputTopic,
                        Integer.parseInt(envProps.getProperty("input.topic.partitions")),
                        Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));

            }

            String outputTopic = envProps.getProperty("output.topic.name");
            if (!existingTopics.contains(outputTopic)) {
                topics.add(new NewTopic(outputTopic,
                        Integer.parseInt(envProps.getProperty("output.topic.partitions")),
                        Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));
            }

            client.createTopics(topics);
        }
    }

    private Properties loadEnvProperties(String filePath) throws IOException {
        Properties envProps = new Properties();
        try (FileInputStream input = new FileInputStream(filePath)) {
            envProps.load(input);
        }
        return envProps;
    }

    private Topology buildTopology(Properties envProps, final Serde<PublicationOuterClass.Publication> publicationSerde) {
        final StreamsBuilder builder = new StreamsBuilder();

        final String inputTopic = envProps.getProperty("input.topic.name");
        final String outputTopic = envProps.getProperty("output.topic.name");

        builder.stream(inputTopic, Consumed.with(Serdes.String(), publicationSerde))
                .filter((key, publication) -> "George R. R. Martin".equals(publication.getName()))
                .to(outputTopic, Produced.with(Serdes.String(), publicationSerde));
        return builder.build();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Environment configuration file must be specified.");
        }

        FilteredStreamsApp app = new FilteredStreamsApp();
        Properties envProps = app.loadEnvProperties(args[0]);
        Properties streamProps = app.buildStreamsProperties(envProps);

        app.preCreateTopics(envProps);

        Topology topology = app.buildTopology(envProps, new ProtobufSerdes<>(PublicationOuterClass.Publication.parser()));

        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);

        Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

 編寫測試Producer和Consumer

在src/main/java/huxihx/kafkastreams/tests/TestProducer.java和TestConsumer.java,內容分別以下:

package huxihx.kafkastreams.tests;

import huxihx.kafkastreams.proto.PublicationOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class TestProducer {

    // 測試輸入事件
    private static final List<PublicationOuterClass.Publication> TEST_PUBLICATIONS = Arrays.asList(
            PublicationOuterClass.Publication.newBuilder()
                    .setName("George R. R. Martin").setTitle("A Song of Ice and Fire").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("C.S. Lewis").setTitle("The Silver Chair").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("C.S. Lewis").setTitle("Perelandra").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("George R. R. Martin").setTitle("Fire & Blood").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("J. R. R. Tolkien").setTitle("The Hobbit").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("J. R. R. Tolkien").setTitle("The Lord of the Rings").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("George R. R. Martin").setTitle("A Dream of Spring").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("J. R. R. Tolkien").setTitle("The Fellowship of the Ring").build(),
            PublicationOuterClass.Publication.newBuilder()
                    .setName("George R. R. Martin").setTitle("The Ice Dragon").build());

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", new ProtobufSerializer<PublicationOuterClass.Publication>().getClass());

        try (final Producer<String, PublicationOuterClass.Publication> producer = new KafkaProducer<>(props)) {
            TEST_PUBLICATIONS.stream()
                    .map(publication -> new ProducerRecord<String, PublicationOuterClass.Publication>("publications", publication))
                    .forEach(producer::send);
        }
    }
}

 

package huxihx.kafkastreams.tests;

import com.google.protobuf.Parser;
import huxihx.kafkastreams.proto.PublicationOuterClass;
import huxihx.kafkastreams.serdes.ProtobufDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TestConsumer {

    public static void main(String[] args) {
        // 爲輸出事件構造protobuf deserializer
        Deserializer<PublicationOuterClass.Publication> deserializer = new ProtobufDeserializer<>();
        Map<String, Parser<PublicationOuterClass.Publication>> config = new HashMap<>();
        config.put("parser", PublicationOuterClass.Publication.parser());
        deserializer.configure(config, false);

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "earliest");
        KafkaConsumer<String, PublicationOuterClass.Publication> consumer = new KafkaConsumer<>(props, new StringDeserializer(), deserializer);
        consumer.subscribe(Arrays.asList("filtered-publications"));
        while (true) {
            ConsumerRecords<String, PublicationOuterClass.Publication> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, PublicationOuterClass.Publication> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}

 測試

 首先咱們運行下列命令構建項目:

./gradlew shadowJar

而後啓動Kafka集羣,以後運行Kafka Streams應用:

java -jar build/libs/kstreams-transform-standalone-0.0.1.jar configuration/dev.properties

而後啓動TestProducer發送測試事件:

java -cp build/libs/kstreams-transform-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer

最後啓動TestConsumer驗證Kafka Streams過濾出了指定的Publication消息:

java -cp build/libs/kstreams-transform-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestConsumer

.......

offset = 0, key = null, value = name: "George R. R. Martin"
title: "A Song of Ice and Fire"

offset = 1, key = null, value = name: "George R. R. Martin"
title: "Fire & Blood"

offset = 2, key = null, value = name: "George R. R. Martin"
title: "A Dream of Spring"

offset = 3, key = null, value = name: "George R. R. Martin"
title: "The Ice Dragon"

 總結

下一篇介紹rekey的用法,即實時修改消息的Key值~~

相關文章
相關標籤/搜索