本教程假定你第一次,且沒有搭建現有的Kafka或ZooKeeper。可是,若是你已經啓動了Kafka和ZooKeeper,請跳過前兩個步驟。java
Kafka Streams結合了在客戶端編寫和部署標準Java和Scala應用程序的簡單性以及Kafka服務器端集羣技術的優點,使這些應用程序具備高度可伸縮性,彈性,容錯性,分佈式等特性。算法
這個快速入門示例將演示如何運行一個流應用程序。一個WordCountDemo
的例子(爲了方便閱讀,使用的是java8 lambda表達式)apache
// Serializers/deserializers (serde) for String and Long types final Serde<String> stringSerde = Serdes.String(); final Serde<Long> longSerde = Serdes.Long(); // Construct a `KStream` from the input topic "streams-plaintext-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys). KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde); KTable<String, Long> wordCounts = textLines // Split each text line, by whitespace, into words. .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) // Group the text words as message keys .groupBy((key, value) -> value) // Count the occurrences of each word (message key). .count() // Store the running counts as a changelog stream to the output topic. wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
從輸入的文本計算出一個詞出現的次數。可是,不像其餘的WordCount
的例子,你可能會看到,在有限的數據基礎上,執行的演示應用程序的行爲略有不一樣,由於它應該是在一個無限數據的操做,數據流。相似的有界變量,它是一種動態算法,跟蹤和更新的單詞計數。然而,因爲它必須假設潛在的無界輸入數據,它會按期輸出其當前狀態和結果,同時繼續處理更多的數據,由於它不知道何時它處理過的「全部」的輸入數據。bootstrap
做爲第一步,咱們將啓動Kafka,而後咱們將輸入數據準備到Kafka主題,而後由Kafka Streams應用程序處理。服務器
下載1.1.0版本
並解壓它。注意,有多個可下載的Scala版本,咱們選擇在這裏使用推薦版本(2.11):微信
> tar -xzf kafka_2.11-1.1.0.tgz > cd kafka_2.11-1.1.0
Kafka使用Zookeeper,因此第一步啓動Zookeeper服務。運維
> bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
如今啓動 Kafka server:socket
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...
接下來,咱們建立一個輸入主題「streams-plaintext-input」,和一個輸出主題"streams-wordcount-output":分佈式
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic streams-plaintext-input Created topic "streams-plaintext-input".
注意:由於輸出主題是更新日誌流(參見下面的應用程序輸出的說明),因此咱們爲輸出主題啓用了壓縮
。工具
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic streams-wordcount-output \ --config cleanup.policy=compact Created topic "streams-wordcount-output".
也可使用kafka topic工具查看主題描述:
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs: Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs: Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
如下命令啓動WordCount演示程序:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
演示程序將從輸入主題streams-plaintext-input
中讀取,對每一個讀取消息執行WordCount算法
計算,並將其當前結果連續寫入輸出主題streams-wordcount-output
。 所以,除了日誌條目外,不會有任何STDOUT輸出,由於結果會寫回到Kafka中。
如今咱們另外開一個終端,來啓動生產者來爲該主題寫入一些輸入數據:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
在開一個終端,讀取輸出主題的數據。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
如今,咱們經過輸入一行文本而後按,生產一些新的消息到輸入主題streams-plaintext-input
。其中消息key爲空,消息value爲剛剛輸入的字符串編碼文本行(實際上,應用程序的輸入數據一般會連續流入Kafka,而不是 像咱們在這個快速入門中那樣手動輸入):
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka
這些消息將被Wordcount程序處理,而後輸出數據到streams-wordcount-output
主題中,咱們新打開一個命令窗口,輸出消費者:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1 streams 1 lead 1 to 1 kafka 1
這裏,第一列是java.lang.String格式的Kafka消息key,表示正在計數的單詞,第二列是java.lang.Longformat中的消息value,表示該單詞的最新計數。
如今,用生產者繼續往streams-plaintext-input主題中發消息,輸入"hello kafka streams",而後:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka hello kafka streams
在消費者命令窗口,你能夠觀察WordCount程序寫入到輸出主題的數據:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1 streams 1 lead 1 to 1 kafka 1 hello 1 kafka 2 streams 2
在這裏,最後一行打印行kafka 2
和streams 2
表示計數已經從1遞增到2。每當你向輸入主題寫入更多的輸入消息時,你將觀察到新的消息被添加到streams-wordcount-output
主題,表示由WordCount應用程序計算出的最新字數。讓咱們輸入一個最終的輸入文本行「join kafka summit」,而後在控制檯生產者中輸入主題streams-wordcount-input以前的:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input all streams lead to kafka hello kafka streams join kafka summit
streams-wordcount-output主題隨後將顯示相應的更新變化(請參見最後三行):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1 streams 1 lead 1 to 1 kafka 1 hello 1 kafka 2 streams 2 join 1 kafka 3 summit 1
能夠看到,Wordcount應用程序的輸出其實是一個連續的更新流,其中每一個輸出記錄(即上面原始輸出中的每一行)是單個單詞的更新計數,也就是諸如「kafka」的記錄關鍵字。 對於具備相同密鑰的多個記錄,每一個後面的記錄都是前一個記錄的更新。
下面的兩張圖說明了幕後發生的事情。第一列顯示KTable <string,long>當前狀態的演變,它計數count的單詞出現次數。 第二列顯示從KTable的狀態更新以及發送到輸出主題streams-wordcount-output的更改記錄。
首先正在處理文本行「all streams lead to kafka」。KTable正在創建,由於每一個新單詞都會生成一個新表格(用綠色背景突出顯示),並將相應的更改記錄發送到下游KStream。
當處理第二行文本「hello kafka streams」時,咱們首次觀察到KTable中現有的條目正在被更新(這裏是:「kafka」和「streams」)。 再次,更改記錄發送到輸出主題。
(咱們跳過了第三行如何處理的說明)。這解釋了爲何輸出主題具備咱們上面顯示的內容,由於它包含完整的變動記錄。
在這個例子的範圍以外,Kafka Streams在這裏作的是利用表和變動日誌流之間的對偶性(這裏:table = KTable,changelog stream =下游KStream):你能夠發佈table轉換爲流,而且若是你從頭至尾使用整個變動日誌流,則能夠從新構建表的內容。
最後,經過Ctrl-C
中止控制檯消費者,生產者,Wordcount程序,Kafka Broker和Zokeeper服務。
本文轉發自 http://orchome.com/936
關於Kafka深刻學習視頻, 如Kafka領導選舉, offset管理, Streams接口, 高性能之道, 監控運維, 性能測試等,
請關注我的微信公衆號: 求學之旅, 發送Kafka, 便可收穫Kafka學習視頻大禮包一枚。