Kafka 學習筆記(二) :初探 Kafka

本文由 GodPan 發表在 ScalaCool 團隊博客。java

看完上一篇,相信你們對消息系統以及Kafka的總體構成都有了初步瞭解,學習一個東西最好的辦法,就是去使用它,今天就讓咱們一塊兒窺探一下Kafka,並完成本身的處女做。git

消息在Kafka中的歷程

雖然咱們掌握東西要一步一步來,可是咱們在大體瞭解了一個東西后,會有利於咱們對它的理解和學習,因此咱們能夠先來看一下一條消息從發出到最後被消息者接收到底經歷了什麼?github

message-flow

上圖簡要的說明了消息在Kafka中的整個流轉過程(假設已經部署好了整個Kafka系統,並建立了相應的Topic,分區等細節後續再單獨講):算法

  • 1.消息生產者將消息發佈到具體的Topic,根據必定算法或者隨機被分發到具體的分區中;
  • 2.根據實際需求,是否須要實現處理消息邏輯;
  • 3.若須要,則實現具體邏輯後將結果發佈到輸出Topic;
  • 4.消費者根據需求訂閱相關Topic,並消費消息;

總的來講,怎麼流程仍是比較清晰和簡單的,下面就跟我一塊兒來練習Kafka的基本操做,最後實現一個單詞計數的小demo。shell

基礎操做

如下代碼及相應測試在如下環境測試經過:Mac OS + JDK1.8,Linux系統應該也能跑通,Windows有興趣的同窗能夠去官網下載相應版本進行相應的測試練習。apache

下載Kafka

Mac系統同窗可使用brew安裝:bootstrap

brew install kafka
複製代碼

Linux系統同窗能夠從官網下載源碼解壓,也能夠直接執行如下命令:bash

cd 
mkdir test-kafka && cd test-kafka
curl -o kafka_2.11-1.0.1.tgz http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
tar -xzf kafka_2.11-1.0.1.tgz
cd kafka_2.11-1.0.1

複製代碼

啓動

Kafka使用Zookeeper來維護集羣信息,因此這裏咱們先要啓動Zookeeper,Kafka與Zookeeper的相關聯繫跟結合後續再深刻了解,畢竟不能一口吃成一個胖子。curl

bin/zookeeper-server-start.sh config/zookeeper.properties
複製代碼

接着咱們啓動一個Kafka Server節點:函數

bin/kafka-server-start.sh config/server.properties
複製代碼

這時候Kafka系統已經算是啓動起來了。

建立Topic

在一切就緒以後,咱們要開始作極其重要的一步,那就是建立Topic,Topic是整個系統流轉的核心,另外Topic自己也包含着不少複雜的參數,好比複製因子個數,分區個數等,這裏爲了從簡,咱們將對應的參數都設爲1,方便你們測試:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kakfa-test
複製代碼

其中參數的具體含義:

屬性 功能
--create 表明建立Topic
--zookeeper zookeeper集羣信息
--replication-factor 複製因子
--partitions 分區信息
--topic Topic名稱

這時候咱們已經建立好了一個叫kakfa-test的Topic了。

向Topic發送消息

在有了Topic後咱們就能夠向其發送消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kakfa-test
複製代碼

而後咱們向控制檯輸入一些消息:

this is my first test kafka
so good
複製代碼

這時候消息已經被髮布在kakfa-test這個主題上了。

從Topic獲取消息

如今Topic上已經有消息了,如今能夠從中獲取消息被消費:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-test --from-beginning
複製代碼

這時候咱們能夠在控制檯看到:

this is my first test kafka
so good
複製代碼

至此咱們就測試了最簡單的Kafka Demo,但願你們能本身動手去試試,雖然很簡單,可是這能讓你對整個Kafka流程能更熟悉。

WordCount

下面咱們來利用上面的一些基本操做來實現一個簡單WordCount程序,它具有如下功能:

  • 1.支持詞組持續輸入,即生產者不斷生成消息;
  • 2.程序自動從輸入Topic中獲取原始數據,而後通過處理,將處理結果發佈在計數Topic中;
  • 3.消費者能夠從計數Topic獲取相應的WordCount的結果;

1.啓動kafka

與上文的啓動同樣,按照其操做便可。

2.建立輸入Topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic kafka-word-count-input --partitions 1 --replication-factor 1
複製代碼

3.向Topic輸入消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic kafka-word-count-input
複製代碼

4.流處理邏輯

這部份內容是整個例子的核心,這部分代碼有Java 8+和Scala版本,我的認爲流處理用函數式語法表達的更加簡潔清晰,推薦你們用函數式的思惟去嘗試寫如下,發現本身不再想寫Java匿名內部類這種語法了。

咱們先來看一個Java 8的版本:

public class WordCount {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.<String, String>stream("kafka-word-count-input");
        Pattern pattern = Pattern.compile("\\W+");
        source
           .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase(Locale.getDefault()))))
           .groupBy((key, value) -> value)
           .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")).mapValues(value->Long.toString(value))
           .toStream()
           .to("kafka-word-count-output");
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
複製代碼

是否是很驚訝,用java也能寫出如此簡潔的代碼,因此說若是有適用場景,推薦你們嘗試的用函數式的思惟去寫寫java代碼。

咱們再來看看Scala版本的:

object WordCount {
  def main(args: Array[String]) {
    val props: Properties = {
      val p = new Properties()
      p.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-word-count")
      p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
      p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass)
      p
    }

    val builder: StreamsBuilder = new StreamsBuilder()
    val source: KStream[String, String] = builder.stream("kafka-word-count-input")
    source
      .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava)
      .groupBy((_, word) => word)
      .count(Materialized.as[String, Long, KeyValueStore[Bytes, Array[Byte]]]("counts-store")).toStream.to("kafka-word-count-output")
    val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
    streams.start()
  }
}
複製代碼

能夠發現使用Java 8函數式風格編寫的代碼已經跟Scala很類似了。

5.啓動處理邏輯

不少同窗電腦上並無裝sbt,因此這裏演示的利用Maven構建的Java版本,具體執行步驟請參考戳這裏kafka-word-count上的說明。

6.啓動消費者進程

最後咱們啓動消費者進程,並在生產者中輸入一些單詞,好比:

kafka-word-count-input

最後咱們能夠在消費者進程中看到如下輸出:

bin/kafka-console-consumer.sh --topic kafka-word-count-output --from-beginning --bootstrap-server localhost:9092  --property print.key=true
複製代碼

kafka-word-count-output

總結

本篇文章主要是講解了Kafka的基本運行過程和一些基礎操做,但這是咱們學習一個東西必不可少的一步,只有把基礎紮實好,才能更深刻的去了解它,理解它爲何這麼設計,我在這個過程當中也遇到不少麻煩,因此仍是但願你們可以本身動手去實踐一下,最終能收穫更多。

相關文章
相關標籤/搜索