Kafka詳解與總結(二)

Kafka Stream

Kafka Streams是一個客戶端庫,用於構建任務關鍵型實時應用程序和微服務,其中輸入和輸出數據存儲在Kafka集羣中。Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性以及Kafka服務器端集羣技術的優點,使這些應用程序具備高度可擴展性,彈性,容錯性,分佈式等等。java

如下是WordCountDemo示例代碼的要點(爲了方便閱讀,使用的是java8 lambda表達式)。git

步驟:github

1.啓動zk和kafka算法

> bin/zookeeper-server-start.sh config/zookeeper.propertiesapache

> bin/kafka-server-start.sh config/server.propertiesbootstrap

 

  1. 準備輸入主題並啓動生產者

建立名爲streams-plaintext-input的輸入主題和名爲streams-wordcount-output的輸出主題:服務器

> bin/kafka-topics.sh --create \分佈式

    --zookeeper localhost:2181 \微服務

    --replication-factor 1 \工具

    --partitions 1 \

    --topic streams-plaintext-input

Created topic "streams-plaintext-input".

注意:咱們建立輸出主題並啓用壓縮,由於輸出流是更改日誌流

> bin/kafka-topics.sh --create \

    --zookeeper localhost:2181 \

    --replication-factor 1 \

    --partitions 1 \

    --topic streams-wordcount-output \

    --config cleanup.policy=compact

Created topic "streams-wordcount-output".

使用相同的kafka-topics工具描述建立的主題:

> bin/kafka-topics.sh --zookeeper localhost:2181 --describe

 

  1. 啓動Wordcount應用程序

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

演示應用程序將從輸入主題stream-plaintext-input讀取,對每一個讀取消息執行WordCount算法的計算,並將其當前結果連續寫入輸出主題streams-wordcount-output

 

  1. 處理數據

開啓一個生產者終端:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic

 streams-plaintext-input

all streams lead to kafka

開啓一個消費者終端:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \

    --topic streams-wordcount-output \

    --from-beginning \

    --formatter kafka.tools.DefaultMessageFormatter \

    --property print.key=true \

    --property print.value=true \

    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \

--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all     1

streams 1

lead    1

to      1

kafka   1

這裏,第一列是java.lang.String格式的Kafka消息鍵,表示正在計數的單詞,第二列是java.lang.Long格式的消息值,表示單詞的最新計數。

相關文章
相關標籤/搜索