Getting Started with Kafka Streams (3)

 Background

In the previous post we looked at Kafka Streams' message-filtering operation, filter. Today we demonstrate an operation that transforms the key of each message, again walking through a concrete example. Re-keying means transforming the key of every message in the stream, so that downstream groupByKey operations become straightforward.

 What We'll Demonstrate

This post demonstrates selectKey, which transforms each message's key according to key-selection logic you supply. The messages in today's input topic have the following format:

ID | First Name | Last Name | Phone Number

For example:

3 | San | Zhang | 13910010000

Our goal is to extract the phone number's prefix (for example, 1391) as each message's new key and write the results to a new Kafka topic.
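The re-keying logic itself is plain string handling. Here is a minimal sketch of the extraction step we will later plug into selectKey, assuming the value always follows the pipe-delimited layout above (the helper name extractPrefix is just for illustration):

// Hypothetical helper illustrating the key-extraction logic used later in selectKey.
// Assumes the phone number is always the last pipe-delimited field.
static String extractPrefix(String value) {
    String[] fields = value.split("\\|");            // split on the literal '|' character
    String phone = fields[fields.length - 1].trim(); // last field holds the phone number
    return phone.substring(0, 4);                    // leading four digits, e.g. "1391"
}

For the sample record above, extractPrefix("3 | San | Zhang | 13910010000") returns "1391".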

 Initializing the Project

 Create the project directory:

mkdir selectKey-streams
cd selectKey-streams/

 Configuring the Project

Create a build.gradle file under the selectKey-streams directory with the following contents:

buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
    }
}

plugins {
    id 'java'
}
apply plugin: 'com.github.johnrengelman.shadow'

repositories {
    mavenCentral()
    jcenter()

    maven {
        url 'http://packages.confluent.io/maven'
    }
}

group 'huxihx.kafkastreams'

sourceCompatibility = 1.8
targetCompatibility = '1.8'
version = '0.0.1'

dependencies {
    implementation 'org.slf4j:slf4j-simple:1.7.26'
    implementation 'org.apache.kafka:kafka-streams:2.3.0'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

jar {
    manifest {
        attributes(
                // use the runtime classpath; the legacy 'compile' configuration is
                // empty when dependencies are declared with 'implementation'
                'Class-Path': configurations.runtimeClasspath.collect { it.getName() }.join(' '),
                'Main-Class': 'huxihx.kafkastreams.SelectKeyStreamsApp'
        )
    }
}

shadowJar {
    archiveName = "kstreams-transform-standalone-${version}.${extension}"
}

 Then run the following command to download the Gradle wrapper:

gradle wrapper

Next, create a folder named configuration under the selectKey-streams directory to hold our configuration file:

mkdir configuration

In it, create a file named dev.properties:

application.id=selectKey-app
bootstrap.servers=localhost:9092

input.topic.name=nonkeyed-records
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=keyed-records
output.topic.partitions=1
output.topic.replication.factor=1

 Writing the Main Application

 Create the src/main/java/huxihx/kafkastreams directory, then create a SelectKeyStreamsApp.java file inside it:

 

package huxihx.kafkastreams;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

public class SelectKeyStreamsApp {

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Environment configuration file must be specified.");
        }

        SelectKeyStreamsApp app = new SelectKeyStreamsApp();
        Properties envProps = app.loadEnvProperties(args[0]);
        Properties streamProps = app.buildStreamsProperties(envProps);

        app.preCreateTopics(envProps);

        Topology topology = app.buildTopology(envProps);

        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // shut the Streams app down cleanly when the JVM exits (e.g., on Ctrl-C)
        Runtime.getRuntime().addShutdownHook(new Thread("streams-jvm-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }

    // build the processing topology: consume, re-key via selectKey, produce
    private Topology buildTopology(Properties envProps) {
        final StreamsBuilder builder = new StreamsBuilder();

        final String inputTopic = envProps.getProperty("input.topic.name");
        final String outputTopic = envProps.getProperty("output.topic.name");

        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                // extract the 4-digit phone-number prefix from the last field as the new key
                .selectKey((noKey, value) -> {
                    String[] fields = value.split("\\|");
                    return fields[fields.length - 1].trim().substring(0, 4);
                })
                .to(outputTopic);
        return builder.build();
    }

    private Properties buildStreamsProperties(Properties envProps) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // create the input/output topics ahead of time if they do not already exist
    private void preCreateTopics(Properties envProps) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
        try (AdminClient client = AdminClient.create(config)) {
            Set<String> existingTopics = client.listTopics().names().get();

            List<NewTopic> topics = new ArrayList<>();
            String inputTopic = envProps.getProperty("input.topic.name");
            if (!existingTopics.contains(inputTopic)) {
                topics.add(new NewTopic(inputTopic,
                        Integer.parseInt(envProps.getProperty("input.topic.partitions")),
                        Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));
            }

            String outputTopic = envProps.getProperty("output.topic.name");
            if (!existingTopics.contains(outputTopic)) {
                topics.add(new NewTopic(outputTopic,
                        Integer.parseInt(envProps.getProperty("output.topic.partitions")),
                        Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));
            }

            // block until the topics are actually created so the Streams app
            // does not start against missing topics
            client.createTopics(topics).all().get();
        }
    }

    private Properties loadEnvProperties(String filePath) throws IOException {
        Properties envProps = new Properties();
        try (FileInputStream input = new FileInputStream(filePath)) {
            envProps.load(input);
        }
        return envProps;
    }
}
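Before touching a live cluster, you can also unit-test the re-keying logic with Kafka Streams' TopologyTestDriver. The following is a minimal, self-contained sketch, assuming you add org.apache.kafka:kafka-streams-test-utils:2.3.0 as a test dependency in build.gradle; it rebuilds the same topology inline because buildTopology above is private:

package huxihx.kafkastreams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

import java.util.Properties;

public class SelectKeyTopologyTest {

    public static void main(String[] args) {
        // same topology as SelectKeyStreamsApp, rebuilt inline for the test
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("nonkeyed-records", Consumed.with(Serdes.String(), Serdes.String()))
                .selectKey((noKey, value) -> {
                    String[] fields = value.split("\\|");
                    return fields[fields.length - 1].trim().substring(0, 4);
                })
                .to("keyed-records");

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "selectKey-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted by the test driver
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            ConsumerRecordFactory<String, String> factory =
                    new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
            // pipe one record through the topology and read the re-keyed output
            driver.pipeInput(factory.create("nonkeyed-records", null, "3 | San | Zhang | 13910010000"));
            System.out.println(driver.readOutput("keyed-records",
                    new StringDeserializer(), new StringDeserializer()));
        }
    }
}

Running it should print a ProducerRecord whose key is 1391.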

 Testing

First, build the project with the following command:

./gradlew clean shadowJar

Then start your Kafka cluster, and run the Kafka Streams application:

java -jar build/libs/kstreams-transform-standalone-0.0.1.jar configuration/dev.properties
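At this point the application should also have pre-created the two topics. If you want to double-check before producing data, you can list the broker's topics with the standard Kafka CLI (run from the Kafka installation directory, like the other commands below):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list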

Next, start a console consumer to check whether the output topic's keys are really set to the phone-number prefixes:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keyed-records --property print.key=true

Finally, start a console producer and produce messages in the prescribed format:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic nonkeyed-records

>1 | Wang | Wu | 18601234567
>2 | Li | Si | 13901234567
>3 | Zhang | San | 13921234567
>4 | Alice | Joe | 13901234568

If everything works, you should see the following in the console consumer output:

1860 1 | Wang | Wu | 18601234567
1390 2 | Li | Si | 13901234567
1392 3 | Zhang | San | 13921234567
1390 4 | Alice | Joe | 13901234568

The leading four digits are the phone-number prefix we extracted. From here you can use this key for all kinds of groupBy operations, such as counting how many records fall under each prefix; a sketch of that follows below.
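Here is a minimal sketch, not part of this tutorial's code, of that follow-up step inside buildTopology: counting records per prefix. "prefix-counts" is a hypothetical topic name, and Produced (org.apache.kafka.streams.kstream.Produced) would need to be imported. Because selectKey changed the key, Kafka Streams automatically repartitions the data before the stateful count:

// Hypothetical continuation: count records per phone-number prefix.
builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
        .selectKey((noKey, value) -> {
            String[] fields = value.split("\\|");
            return fields[fields.length - 1].trim().substring(0, 4);
        })
        .groupByKey()   // the changed key triggers an automatic repartition
        .count()        // yields a KTable<String, Long>: prefix -> record count
        .toStream()
        .to("prefix-counts", Produced.with(Serdes.String(), Serdes.Long()));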

Summary

In many scenarios we need to change the key of the original messages to make subsequent aggregations easier. This example showed how to use the selectKey operator to transform message keys conveniently.
