Kafka Streams的WordCount收到消息後是怎麼處理的呢?

本文將嘗試從Kafka Streams的WordCount應用收到消息後如何處理來分析Kafka Streams的工做原理。java

1 準備工做

1.1 Kafka Streams是什麼

The easiest way to write mission-critical real-time applications and microservices.數據庫

Kafka Streams提供了一個最簡單的,開發實時流處理應用的方式。由於:apache

  • 它是一個jar包而非流處理框架。單獨構建Kafka Streams應用只須要一個jar包;與其餘項目集成也只需引用這個jar包。
  • Kafka Streams只依賴Kafka,輸入數據和輸出數據都存放在Kafka中。

1.2 Kafka 集羣

假設咱們已經有了Zookeeper和Kafka環境,如今須要建立兩個topic:安全

  • source topic,命名爲TextLinesTopic,分區數爲2,WordCount應用消費的topic。
  • target topic,命名爲WordsWithCountsTopic,分區數爲2,WordCount應用處理過的數據會發往這個topic。

建立topic的命令爲:app

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-name

# 建立成功後,命令行顯示Created topic "topic-name".

建立兩個topic後,查看topic列表框架

bin/kafka-topics.sh --zookeeper localhost:2181 --list

# 看到建立的source topic和target topic

2 建立Kafka Streams應用

2.1 官網的WordCount應用

使用Hello Kafka Streams官網提供的入門的WordCount應用,在頁面最下能夠看到官網提供的WordCountApplication,以下(有小改動):dom

// 略去import

public class WordCountApplication {

  private static final String BOOTSTRAP_SERVERS_CONFIG = "127.0.0.1:9092";

  public static void main(String[] args) {

    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    // stream & source topic
    KStream<String, String> textLines = builder.stream("TextLinesTopic");

    // table
    KTable<String, Long> wordCounts = textLines

        .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))

        .groupBy((key, word) -> word)

        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

    // target topic
    wordCounts

        .toStream()

        .to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

    Topology toplogy = builder.build();
    // 打印toplogy
    System.out.println(toplogy.describe());

    // 構建KafkaStreams
    KafkaStreams streams = new KafkaStreams(toplogy, props);

    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
      @Override
      public void run() {
        streams.close();
      }
    });

    streams.start();

  }

}

由上邊代碼可見,構建一個KafkaStreams至少須要TopologyProperties兩個參數(KafkaStreams總共提供了7個public的構造函數,3個已經棄用)。ide

2.1.1 Topology

WordCount應用經過StreamsBuilder#build建立了一個Topology。在控制檯把建立的拓撲打印出來,以下:函數

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [TextLinesTopic])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor:  KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: counts-store-repartition)
      <-- KSTREAM-FILTER-0000000005

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [counts-store-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store])
      --> KTABLE-TOSTREAM-0000000007
      <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008 (topic: WordsWithCountsTopic)
      <-- KTABLE-TOSTREAM-0000000007

由上可見,這個Topology又分爲兩個子拓撲,分別是子拓撲0和子拓撲1,對應Task0Task1,具體以下: WordCount的拓撲ui

2.1.2 Properties

WordCount應用使用了4個StreamsConfig的配置,分別是:

key value describe
StreamsConfig.APPLICATION_ID_CONFIG wordcount-application 應用名。3個做用:client-id的前綴;相同的應用名組成Kafka Streams集羣;內部topic的前綴
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG 127.0.0.1:9092 kafka地址,多個以","分隔
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG Serdes.String().getClass() key序列和反序列的類
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG Serdes.String().getClass() value序列和反序列的類

StreamsConfig全部的配置可查看Kafka Streams Configs

<!-- 若是來得及的話,加在概念及其餘中。並替換上邊的官網連接 -->

2.1.3 內部topic

WordCount應用啓動後,會建立2個內部topic,分區數

  • wordcount-application-counts-store-repartition,子拓撲0的SinkNode會把通過處理的數據發往這個名爲repartition的內部topic,子拓撲1的SourceNode會從這個名爲repartition的內部topic取到通過子拓撲0處理過的數據。

  • wordcount-application-counts-store-changelog,這個topic會記錄WordCount的聚合結果,changelog topic使用了kafka的Log compaction功能,能夠安全地清除舊數據,以防止topic無限增加。changelog changelog用於Kafka Streams容錯處理

2.2 啓動WordCount應用

啓動一個WordCount應用,查看啓動的日誌

...
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] Starting
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]Started Streams client
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from CREATED to RUNNING
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Discovered group coordinator K-PC:9092 (id: 2147483647 rack: null)
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Revoking previously assigned partitions []
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]State transition from RUNNING to REBALANCING
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] partition revocation took 1 ms.
  suspended active tasks: []
  suspended standby tasks: []
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] (Re-)joining group
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer] Assigned tasks to clients as {6f260d17-7347-4e80-94ad-786753f4d683=[activeTasks: ([0_0, 0_1, 1_0, 1_1]) standbyTasks: ([]) assignedTasks: ([0_0, 0_1, 1_0, 1_1]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Successfully joined group with generation 1
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Setting newly assigned partitions [wordcount-application-counts-store-repartition-0, TextLinesTopic-1, TextLinesTopic-0, wordcount-application-counts-store-repartition-1]
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] partition assignment took 31 ms.
  current active tasks: [0_0, 0_1, 1_0, 1_1]
  current standby tasks: []
  previous active tasks: []

stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]State transition from REBALANCING to RUNNING

能夠看到:

  • streams應用state的改變:CREATED -> RUNNING -> REBALANCING -> RUNNING
  • streams線程的狀態state的改變:CREATED -> RUNNING -> PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED -> RUNNING
  • 最終應用拿到的任務是:0_0, 0_1, 1_0, 1_1,下劃線前的數字是子拓撲,下劃線後的數字是source topic的分區。

再啓動一個WordCount應用(使用一樣的APPLICATION_ID_CONFIG,與第一個應用組成集羣),控制檯日誌分別輸出:

#第一個WordCount應用日誌
...
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Revoking previously assigned partitions [wordcount-application-counts-store-repartition-0, TextLinesTopic-1, TextLinesTopic-0, wordcount-application-counts-store-repartition-1]
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]State transition from RUNNING to REBALANCING
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] partition revocation took 277 ms.
  suspended active tasks: [0_0, 0_1, 1_0, 1_1]
  suspended standby tasks: []
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] (Re-)joining group
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer] Assigned tasks to clients as {b391295c-1bee-493e-9ebf-ac4ba0302190=[activeTasks: ([1_0, 1_1]) standbyTasks: ([]) assignedTasks: ([1_0, 1_1]) prevActiveTasks: ([]) prevAssignedTasks: ([0_0, 0_1, 1_0, 1_1]) capacity: 1], 6f260d17-7347-4e80-94ad-786753f4d683=[activeTasks: ([0_0, 0_1]) standbyTasks: ([]) assignedTasks: ([0_0, 0_1]) prevActiveTasks: ([0_0, 0_1, 1_0, 1_1]) prevAssignedTasks: ([0_0, 0_1, 1_0, 1_1]) capacity: 1]}.
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Successfully joined group with generation 7
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Setting newly assigned partitions [TextLinesTopic-1, TextLinesTopic-0]
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] partition assignment took 229 ms.
  current active tasks: [0_0, 0_1]
  current standby tasks: []
  previous active tasks: [0_0, 0_1, 1_0, 1_1]

stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]State transition from REBALANCING to RUNNING


# 第二個WordCount應用的日誌
...
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] Starting
stream-client [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190]Started Streams client
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] State transition from CREATED to RUNNING
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] Discovered group coordinator K-PC:9092 (id: 2147483647 rack: null)
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] Revoking previously assigned partitions []
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
stream-client [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190]State transition from RUNNING to REBALANCING
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] partition revocation took 1 ms.
  suspended active tasks: []
  suspended standby tasks: []
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] (Re-)joining group
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] Successfully joined group with generation 7
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] Setting newly assigned partitions [wordcount-application-counts-store-repartition-0, wordcount-application-counts-store-repartition-1]
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] partition assignment took 35 ms.
  current active tasks: [1_0, 1_1]
  current standby tasks: []
  previous active tasks: []

stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
stream-client [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190]State transition from REBALANCING to RUNNING

能夠看到,第二個應用啓動後組成了集羣,新的應用加入後,兩個應用的任務進行了REBALANCE,第一個應用拿到了0_0, 0_1,第二個應用拿到了1_0, 1_1。REBALANCE後第一個應用的子拓撲0執行topic分區0和分區1的任務,子拓撲1沒有拿到任務;第二個應用的子拓撲1執行topic分區0和分區1的任務,子拓撲0沒有拿到任務。以下圖: rebalance

這時,若是一條消息發往source topic的1分區:

  1. 消息會被第一個WordCount應用子拓撲0的SourceNode(KSTREAM-SOURCE-0000000000)拿到
  2. 子拓撲0處理後由子拓撲0的SinkNode(KSTREAM-SINK-0000000004)發回到kafka的reparation分區
  3. 而後被第二個WordCount應用的子拓撲1(2個WordCount應用經過任務REBALANCE後,的SourceNode(KSTREAM-SOURCE-0000000006)拿到
  4. 通過子拓撲1處理由SinkNode(KSTREAM-SINK-0000000008)把最終結果發至target topic。

3 往source topic 發消息

往source topic(TextLinesTopic)發送一條消息,消息的key和value都是"hello world"。

  • 1 消息被子拓撲0的SourceNode(KSTREAM-SOURCE-0000000000)拿到。
  • 2 原始消息被Kstream#flatMapValues處理,對應子拓撲0的ProcessorNode(KSTREAM-FLATMAPVALUES-0000000001)。這個算子(Processor)會把消息的value轉爲小寫格式在進行切分操做(以非字母數字下劃線切分),這些操做並不改變原消息的key。所以,key和value都是"hello world"的消息通過Kstream#flatMapValues處理後,變成key是"hello world",value分別是"hello"和"world"的兩條消息。
  • 3 而後由Kstream#groupBy處理。這個算子等價於前後調用Kstream#selectKeyKstream#groupByKey,分別對應子拓撲0的ProcessorNode(KSTREAM-KEY-SELECT-0000000002)和ProcessorNodeKSTREAM-FILTER-0000000005)。
    • 3.1 Kstream#selectKey,改變消息的key。消息變爲key和value分別是「hello」和「world」的兩條消息。
    • 3.2 Kstream#groupByKey,按key進行分組,相同的key會被分到一組(Kafka Streams進行聚合操做前必須進行分組操做)。
  • 4 消息經過SinkNode(KSTREAM-SINK-0000000004)將子拓撲0處理完的消息發往reparation topic。

接下來的操做由子拓撲1完成。

  • 1 子拓撲1的SourceNode(KSTREAM-SOURCE-0000000006)從reparation topic拿到分組後的消息。
  • 2 由Kstream#count處理,對應子拓撲1的ProcessorNode(KSTREAM-AGGREGATE-0000000003),count操做後消息類型變成KTable
    • 2.1 local state store,將結果存進rocksdb,存儲在本地。
    • 2.2 將結果發往changelog topic,用於容錯處理。
  • 3 ProcessorNode(KTABLE-TOSTREAM-0000000007)把消息類型從KTable轉化爲KStream
  • 4 經過SinkNode(KSTREAM-SINK-0000000008)將子拓撲1 count的消息發往target topic。

因爲時間有限,不少Kafka Streams的概念,好比窗口等本文沒有涉及。

4 概念及其餘

<span id ="jump-to-topology"></span>

4.1 topology

Kafka Streams 經過拓撲定義處理邏輯,拓撲由點和邊組成。

  • 點,分爲3類:SourceNodeSinkNodeProcessorNode
    • SourceNodeSinkNode分別是拓撲的起止。SourceNode從Kafka的source topic中取消息給ProcessorNode處理,SinkNodeProcessorNode處理完的結果發往Kafka的target topic;
    • Processor負責處理流數據。分爲Kafka Streams DSLProcessor API兩類,前者提供經常使用的數據操做,如map、filter、join、aggregations;後者能夠由開發者本身定製開發。
  • 邊,數據流向。

<span id ="jump-to-state"></span>

4.2 state

Stream thread的states

+-------------+
        +<--- | Created (0) |
        |     +-----+-------+
        |           |
        |           v
        |     +-----+-------+
        +<--- | Running (1) | <----+
        |     +-----+-------+      |
        |           |              |
        |           v              |
        |     +-----+-------+      |
        +<--- | Partitions  |      |
        |     | Revoked (2) | <----+
        |     +-----+-------+      |
        |           |              |
        |           v              |
        |     +-----+-------+      |
        |     | Partitions  |      |
        |     | Assigned (3)| ---->+
        |     +-----+-------+
        |           |
        |           v
        |     +-----+-------+
        +---> | Pending     |
              | Shutdown (4)|
              +-----+-------+
                    |
                    v
              +-----+-------+
              | Dead (5)    |
              +-------------+

Kafka Streams的states

+--------------+
        +<----- | Created (0)  |
        |       +-----+--------+
        |             |
        |             v
        |       +--------------+
        +<----- | Running (2)  | -------->+
        |       +----+--+------+          |
        |            |  ^                 |
        |            v  |                 |
        |       +----+--+------+          |
        |       | Re-          |          v
        |       | Balancing (1)| -------->+
        |       +-----+--------+          |
        |             |                   |
        |             v                   v
        |       +-----+--------+     +----+-------+
        +-----> | Pending      |<--- | Error (5)  |
                | Shutdown (3) |     +------------+
                +-----+--------+
                     |
                      v
                +-----+--------+
                | Not          |
                | Running (4)  |
                +--------------+

<span id = "jump-to-ktable"></span>

4.3 KTable與KStream

KStream,數據流,全部數據經過insert only的方式加入到這個數據流中。

Ktable,數據集,像是數據庫中的表,數據以update only的方式加入。

4.4 Kafka發送消息時,如何肯定分區

源碼KafkaProducer#doSend在發送消息時,經過partition方法得到分區。

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
      ...
            int partition = partition(record, serializedKey, serializedValue, cluster);
      ...
    }

kafka提供了一個默認的partition實現DefaultPartitioner#partition

/**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

上述源碼能夠看到:

  • 消息key是null,topic可用分區大於0時,使用根據topic獲取的nextValue的值和可用分區數進行取模操做。
  • 消息key是null,topic可用分區小於等於0時,獲取根據topic獲取的nextValue的值和總分區數進行取模操做(give a non-available partition,給了個不可用的分區)。
  • 消息key不是null時,對key進行hash後和topic的分區數取模。能夠保證topic的分區數不變的狀況下,相同的key每次都發往同一個分區。
相關文章
相關標籤/搜索