Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature,它提供了對存儲於Kafka內的數據進行流式處理和分析的功能。簡而言之,Kafka Stream就是一個用來作流計算的類庫,與Storm、Spark Streaming、Flink的做用相似,但要輕量得多。java
Kafka Stream的基本概念:數據庫
Kafka Stream的高層架構圖:
apache
Kafka Stream關鍵詞:bootstrap
以下圖所示:
bash
下圖是Kafka Stream完整的高層架構圖:
服務器
從上圖中能夠看到,Consumer
對一組Partition
進行消費,這組Partition
能夠在一個Topic中或多個Topic中。而後造成數據流,通過各個流處理器後最終經過Producer
輸出到一組Partition
中,一樣這組Partition
也能夠在一個Topic中或多個Topic中。這個過程就是數據流的輸入和輸出。架構
所以,咱們在使用Stream API前須要先建立兩個Topic,一個做爲輸入,一個做爲輸出。到服務器上使用命令行建立兩個Topic:app
[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic input-topic [root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic output-topic
因爲以前依賴的kafka-clients
包中沒有Stream API,因此須要另外引入Stream的依賴包。在項目中添加以下依賴:ide
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.5.0</version> </dependency>
接下來以一個經典的詞頻統計爲例,演示一下Stream API的使用。代碼示例:測試
package com.zj.study.kafka.stream; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import java.util.List; import java.util.Properties; public class StreamSample { private static final String INPUT_TOPIC = "input-topic"; private static final String OUTPUT_TOPIC = "output-topic"; /** * 構建配置屬性 */ public static Properties getProperties() { Properties properties = new Properties(); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "49.232.153.84:9092"); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app"); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return properties; } public static KafkaStreams createKafkaStreams() { Properties properties = getProperties(); // 構建流結構拓撲 StreamsBuilder builder = new StreamsBuilder(); // 構建wordCount這個Processor wordCountStream(builder); Topology topology = builder.build(); // 構建KafkaStreams return new KafkaStreams(topology, properties); } /** * 定義流計算過程 * 例子爲詞頻統計 */ public static void wordCountStream(StreamsBuilder builder) { // 不斷的從INPUT_TOPIC上獲取新的數據,並追加到流上的一個抽象對象 KStream<String, String> source = builder.stream(INPUT_TOPIC); // KTable是數據集的抽象對象 KTable<String, Long> count = source.flatMapValues( // 以空格爲分隔符將字符串進行拆分 v -> List.of(v.toLowerCase().split(" ")) // 按value進行分組統計 ).groupBy((k, v) -> v).count(); KStream<String, Long> sink = count.toStream(); // 將統計結果輸出到OUTPUT_TOPIC sink.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())); } public static void main(String[] args) { KafkaStreams streams = createKafkaStreams(); // 啓動該Stream streams.start(); } }
KTable
與KStream
的關係與區別,以下圖:
KTable
相似於一個時間片斷,在一個時間片斷內輸入的數據就會update進去,以這樣的形式來維護這張表KStream
則沒有update這個概念,而是不斷的追加運行以上代碼,而後到服務器中使用kafka-console-producer.sh
腳本命令向input-topic
生產一些數據,以下:
[root@txy-server2 ~]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic input-topic >Hello World Java >Hello World Kafka >Hello Java Kafka >Hello Java
而後再運行kafka-console-consumer.sh
腳本命令從output-topic
中消費數據,並進行打印。具體以下:
[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 172.21.0.10:9092 --topic output-topic --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
控制檯輸出的結果:
world 2 hello 3 java 2 kafka 2 hello 4 java 3
從輸出結果中能夠看到,Kafka Stream首先是對前三行語句進行了一次詞頻統計,因此前半段是:
world 2 hello 3 java 2 kafka 2
當最後一行輸入以後,又再作了一次詞頻統計,並針對新的統計結果進行輸出,其餘沒有變化的則不做輸出,因此最後打印了:
hello 4 java 3
這也是KTable
和KStream
的一個體現,從測試的結果能夠看出Kafka Stream是實時進行流計算的,而且每次只會針對有變化的內容進行輸出。
在以前的例子中,咱們是從某個Topic讀取數據進行流處理後再輸出到另外一個Topic裏。但在一些場景下,咱們可能不但願將結果數據輸出到Topic,而是寫入到一些存儲服務中,例如ElasticSearch、MongoDB、MySQL等。
在這種場景下,就能夠利用到foreach
方法,該方法用於迭代流中的元素。咱們能夠在foreach
中將數據存入例如Map、List等容器,而後再批量寫入到數據庫或其餘存儲中間件便可。
foreach
方法使用示例:
public static void foreachStream(StreamsBuilder builder) { KStream<String, String> source = builder.stream(INPUT_TOPIC); source.flatMapValues( v -> List.of(v.toLowerCase().split(" ")) ).foreach((k, v) -> System.out.println(k + " : " + v)); }