【Kafka】- Kafka Streams WordCount Example

Kafka Streams


Introduction

Kafka Streams is the stream-processing library that ships with Kafka itself. It is lightweight and pure Java: a streams application is an ordinary JVM process, with no separate processing cluster to deploy.


Example

WordCount application

package com.zhiwei.kafka.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

/**
 * WordCount example: splits lines from the source topic into words
 * and maintains a running count per word.
 */
public class WordCountApplication {

    public static void main(final String[] args) throws Exception {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        /**
         * Stream source: the input topic.
         * key: String
         * value: String
         */
        KStream<String, String> source = builder.stream("kafkaStreamSourceTopic");

        // Tokenize each line into words, producing one key-value record per word
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value) // group by word, analogous to the shuffle before a Hadoop MR reduce
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) // count values per key; records with null keys are ignored
                .toStream() // turn the count KTable back into a KStream
                .to("kafkaStreamTargetTopic", Produced.with(Serdes.String(), Serdes.Long())); // sink topic, key: String, value: Long

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        // Close the streams instance cleanly when the JVM shuts down
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
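Both topics need to exist before the application starts, since broker-side auto-creation may be disabled. Below is a minimal sketch that creates them with Kafka's AdminClient; the TopicBootstrap class name and the single-partition, single-replica settings are assumptions suited to a local test broker:

package com.zhiwei.kafka.streams;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class TopicBootstrap {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1: enough for a single test broker
            admin.createTopics(Arrays.asList(
                    new NewTopic("kafkaStreamSourceTopic", 1, (short) 1),
                    new NewTopic("kafkaStreamTargetTopic", 1, (short) 1)
            )).all().get(); // block until both topics are created
        }
    }
}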

Consumer

package com.zhiwei.kafka.streams;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Subscribes to the WordCount output topic.
 *
 * Note: the consumer's key/value deserializers must match the serialization
 * format of the Kafka Streams output, otherwise the records will be garbled.
 */
@Slf4j
public class KafkaStreamMsgConsumer {

    public static void main(String[] args) {
        new KafkaStreamMsgConsumer().consume();
    }

    public void consume() {

        KafkaConsumer<String, Long> consumer = null;

        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "centos:9092");
            props.put("group.id", "myConsumerGroup");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");

            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("kafkaStreamTargetTopic"));

            while (true) {

                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(100)); // wait up to 100 ms per poll instead of busy-spinning
                for (ConsumerRecord<String, Long> record : records) {
                   log.info("key: {},value:{}", record.key(), record.value());
                }
            }
        }  finally {
            if (consumer != null) {
                consumer.close();
            }
        }
    }
}
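Note that the target topic carries the changelog of the count KTable: the consumer does not receive one final total per word, but a stream of updates in which the latest record for each key is the current count.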

Test
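To exercise the pipeline end to end, start WordCountApplication, publish a few lines to the source topic, and watch the counts arrive with KafkaStreamMsgConsumer. Below is a minimal sketch of a driver producer; the TestProducer class name and the sample lines are assumptions, not part of the original setup:

package com.zhiwei.kafka.streams;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "centos:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Hypothetical test data: three lines for the wordcount to split and count
        List<String> lines = Arrays.asList(
                "hello kafka streams",
                "hello world",
                "kafka streams wordcount");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (String line : lines) {
                producer.send(new ProducerRecord<>("kafkaStreamSourceTopic", line));
            }
        } // close() flushes any buffered records before returning
    }
}

With all three processes running, the consumer should log one updated count per word, for example key: hello, value: 2 once the second "hello" has been processed.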
