Kafka Streams:Kafka 自身提供的流式數據處理工具,輕量級 Java 庫
WordCount 示例(Apache Kafka Streams)
package com.zhiwei.kafka.streams; import com.zhiwei.util.PropertyUtils; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Locale; import java.util.Properties; /** * wordcount 案例 */ public class WordCountApplication { public static void main(final String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "centos:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); /** * 指定數據源:topic * key: String * value: String */ KStream<String, String> source = builder.stream("kafkaStreamSourceTopic"); //分詞,構建多個key-value記錄 source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) //kafkaStream key分組,相似Hadoop MR的Reduce過程 .count(Materialized.as("counts-store")) //計算group: key對應value的數量,null會被會忽略 .toStream() //KafkaStream新的流構建 .to("kafkaStreamTargetTopic", Produced.with(Serdes.String(), Serdes.Long())); //輸出目標數據源,key:String, value:Long final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); streams.start(); } }
消費者示例:通過 bootstrap.servers 連接 Kafka,訂閱 WordCount 的輸出主題
package com.zhiwei.kafka.streams; import com.zhiwei.util.PropertyUtils; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import java.io.IOException; import java.time.Duration; import java.util.Arrays; import java.util.Properties; /** * 訂閱wordcount輸出主題 * * 注意:Kafka消費者消費的key\value 編碼、解碼器須與KafkaStream輸出格式保持一致不然出現亂碼 */ @Slf4j public class KafkaStreamMsgConsumer { public static void main(String[] args) throws IOException { new KafkaStreamMsgConsumer().consume(); } public void consume() throws IOException { KafkaConsumer<String, Long> consumer = null; try { Properties props = new Properties(); props.put("bootstrap.servers", "centos:9092"); props.put("group.id", "myConsumerGroup"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.LongDeserializer"); consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("kafkaStreamTargetTopic")); while (true) { ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(1)); for (ConsumerRecord<String, Long> record : records) { log.info("key: {},value:{}", record.key(), record.value()); } } } finally { if (consumer != null) { consumer.close(); } } } }