本文由 GodPan 發表在 ScalaCool 團隊博客。java
看完上一篇,相信你們對消息系統以及Kafka的總體構成都有了初步瞭解,學習一個東西最好的辦法,就是去使用它,今天就讓咱們一塊兒窺探一下Kafka,並完成本身的處女做。git
雖然咱們掌握東西要一步一步來,可是咱們在大體瞭解了一個東西后,會有利於咱們對它的理解和學習,因此咱們能夠先來看一下一條消息從發出到最後被消息者接收到底經歷了什麼?github
上圖簡要的說明了消息在Kafka中的整個流轉過程(假設已經部署好了整個Kafka系統,並建立了相應的Topic,分區等細節後續再單獨講):算法
總的來講,怎麼流程仍是比較清晰和簡單的,下面就跟我一塊兒來練習Kafka的基本操做,最後實現一個單詞計數的小demo。shell
如下代碼及相應測試在如下環境測試經過:Mac OS + JDK1.8,Linux系統應該也能跑通,Windows有興趣的同窗能夠去官網下載相應版本進行相應的測試練習。apache
Mac系統同窗可使用brew安裝:bootstrap
brew install kafka
複製代碼
Linux系統同窗能夠從官網下載源碼解壓,也能夠直接執行如下命令:bash
cd
mkdir test-kafka && cd test-kafka
curl -o kafka_2.11-1.0.1.tgz http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
tar -xzf kafka_2.11-1.0.1.tgz
cd kafka_2.11-1.0.1
複製代碼
Kafka使用Zookeeper來維護集羣信息,因此這裏咱們先要啓動Zookeeper,Kafka與Zookeeper的相關聯繫跟結合後續再深刻了解,畢竟不能一口吃成一個胖子。curl
bin/zookeeper-server-start.sh config/zookeeper.properties
複製代碼
接着咱們啓動一個Kafka Server節點:函數
bin/kafka-server-start.sh config/server.properties
複製代碼
這時候Kafka系統已經算是啓動起來了。
在一切就緒以後,咱們要開始作極其重要的一步,那就是建立Topic,Topic是整個系統流轉的核心,另外Topic自己也包含着不少複雜的參數,好比複製因子個數,分區個數等,這裏爲了從簡,咱們將對應的參數都設爲1,方便你們測試:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kakfa-test
複製代碼
其中參數的具體含義:
屬性 | 功能 |
---|---|
--create | 表明建立Topic |
--zookeeper | zookeeper集羣信息 |
--replication-factor | 複製因子 |
--partitions | 分區信息 |
--topic | Topic名稱 |
這時候咱們已經建立好了一個叫kakfa-test的Topic了。
在有了Topic後咱們就能夠向其發送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kakfa-test
複製代碼
而後咱們向控制檯輸入一些消息:
this is my first test kafka
so good
複製代碼
這時候消息已經被髮布在kakfa-test這個主題上了。
如今Topic上已經有消息了,如今能夠從中獲取消息被消費:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-test --from-beginning
複製代碼
這時候咱們能夠在控制檯看到:
this is my first test kafka
so good
複製代碼
至此咱們就測試了最簡單的Kafka Demo,但願你們能本身動手去試試,雖然很簡單,可是這能讓你對整個Kafka流程能更熟悉。
下面咱們來利用上面的一些基本操做來實現一個簡單WordCount程序,它具有如下功能:
與上文的啓動同樣,按照其操做便可。
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-word-count-input --partitions 1 --replication-factor 1
複製代碼
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-word-count-input
複製代碼
這部份內容是整個例子的核心,這部分代碼有Java 8+和Scala版本,我的認爲流處理用函數式語法表達的更加簡潔清晰,推薦你們用函數式的思惟去嘗試寫如下,發現本身不再想寫Java匿名內部類這種語法了。
咱們先來看一個Java 8的版本:
public class WordCount {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.<String, String>stream("kafka-word-count-input");
Pattern pattern = Pattern.compile("\\W+");
source
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase(Locale.getDefault()))))
.groupBy((key, value) -> value)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")).mapValues(value->Long.toString(value))
.toStream()
.to("kafka-word-count-output");
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
複製代碼
是否是很驚訝,用java也能寫出如此簡潔的代碼,因此說若是有適用場景,推薦你們嘗試的用函數式的思惟去寫寫java代碼。
咱們再來看看Scala版本的:
object WordCount {
def main(args: Array[String]) {
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
p
}
val builder: StreamsBuilder = new StreamsBuilder()
val source: KStream[String, String] = builder.stream("kafka-word-count-input")
source
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
.groupBy((_, word) => word)
.count(Materialized.as[String, Long, KeyValueStore[Bytes, Array[Byte]]]("counts-store")).toStream.to("kafka-word-count-output")
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
}
}
複製代碼
能夠發現使用Java 8函數式風格編寫的代碼已經跟Scala很類似了。
不少同窗電腦上並無裝sbt,因此這裏演示的利用Maven構建的Java版本,具體執行步驟請參考戳這裏kafka-word-count上的說明。
最後咱們啓動消費者進程,並在生產者中輸入一些單詞,好比:
最後咱們能夠在消費者進程中看到如下輸出:
bin/kafka-console-consumer.sh --topic kafka-word-count-output --from-beginning --bootstrap-server localhost:9092 --property print.key=true
複製代碼
本篇文章主要是講解了Kafka的基本運行過程和一些基礎操做,但這是咱們學習一個東西必不可少的一步,只有把基礎紮實好,才能更深刻的去了解它,理解它爲何這麼設計,我在這個過程當中也遇到不少麻煩,因此仍是但願你們可以本身動手去實踐一下,最終能收穫更多。