Kafka Streams是一個客戶端庫,用於構建任務關鍵型實時應用程序和微服務,其中輸入和輸出數據存儲在Kafka集羣中。Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性以及Kafka服務器端集羣技術的優點,使這些應用程序具備高度可擴展性,彈性,容錯性,分佈式等等。java
如下是WordCountDemo示例代碼的要點(爲了方便閱讀,使用的是java8 lambda表達式)。git
步驟:github
1.啓動zk和kafka算法
> bin/zookeeper-server-start.sh config/zookeeper.propertiesapache
> bin/kafka-server-start.sh config/server.propertiesbootstrap
建立名爲streams-plaintext-input的輸入主題和名爲streams-wordcount-output的輸出主題:服務器
> bin/kafka-topics.sh --create \分佈式
--zookeeper localhost:2181 \微服務
--replication-factor 1 \工具
--partitions 1 \
--topic streams-plaintext-input
Created topic "streams-plaintext-input".
注意:咱們建立輸出主題並啓用壓縮,由於輸出流是更改日誌流
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
Created topic "streams-wordcount-output".
使用相同的kafka-topics工具描述建立的主題:
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
演示應用程序將從輸入主題stream-plaintext-input讀取,對每一個讀取消息執行WordCount算法的計算,並將其當前結果連續寫入輸出主題streams-wordcount-output。
開啓一個生產者終端:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic
streams-plaintext-input
all streams lead to kafka
開啓一個消費者終端:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
這裏,第一列是java.lang.String格式的Kafka消息鍵,表示正在計數的單詞,第二列是java.lang.Long格式的消息值,表示單詞的最新計數。