This article analyzes how Kafka Streams works by following what a WordCount application does with a message after receiving it.
1 Preparation
1.1 What is Kafka Streams
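"The easiest way to write mission-critical real-time applications and microservices."
Kafka Streams offers one of the simplest ways to develop real-time stream processing applications, because:
- It is a library shipped as a jar, not a stream processing framework. A standalone Kafka Streams application only needs this jar, and integrating it into another project only requires adding the jar as a dependency.
- Kafka Streams depends only on Kafka: both its input data and its output data live in Kafka.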
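1.2 Kafka cluster
Assuming ZooKeeper and Kafka are already running, we need to create two topics:
- source topic, named TextLinesTopic, with 2 partitions: the topic the WordCount application consumes.
- target topic, named WordsWithCountsTopic, with 2 partitions: the WordCount application sends its processed data to this topic.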
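The command to create a topic is:
```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic topic-name
# On success, the console prints: Created topic "topic-name".
```
After creating both topics, list the topics:
```shell
bin/kafka-topics.sh --zookeeper localhost:2181 --list
# The output should include the source topic and the target topic
```
(In Kafka 3.0 and later, kafka-topics.sh no longer accepts `--zookeeper`; pass `--bootstrap-server localhost:9092` instead.)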
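2 Creating the Kafka Streams application
2.1 The official WordCount application
We use the introductory WordCount application from the official "Hello Kafka Streams" page; the `WordCountApplication` is at the bottom of that page. Here it is, with minor changes: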
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class WordCountApplication {

    private static final String BOOTSTRAP_SERVERS_CONFIG = "127.0.0.1:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // stream & source topic
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        // table: split each line into words, group by word, count each word
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        // target topic
        wordCounts
                .toStream()
                .to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        Topology topology = builder.build();
        // print the topology
        System.out.println(topology.describe());

        // build KafkaStreams
        KafkaStreams streams = new KafkaStreams(topology, props);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
            }
        });
        streams.start();
    }
}
```
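As the code shows, constructing a `KafkaStreams` instance requires at least two arguments, a `Topology` and a `Properties` (in the version used here, `KafkaStreams` has 7 public constructors in total, 3 of which are deprecated).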
2.1.1 Topology
The WordCount application creates a `Topology` via `StreamsBuilder#build`. Printing this topology to the console gives:
```
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [TextLinesTopic])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: KSTREAM-SINK-0000000004 (topic: counts-store-repartition)
      <-- KSTREAM-FILTER-0000000005

   Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [counts-store-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store])
      --> KTABLE-TOSTREAM-0000000007
      <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008 (topic: WordsWithCountsTopic)
      <-- KTABLE-TOSTREAM-0000000007
```
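So this `Topology` is split into two sub-topologies, sub-topology 0 and sub-topology 1, which correspond to the task groups 0 and 1 (Kafka Streams runs one task per sub-topology and input partition, so sub-topology 0 yields the tasks 0_0 and 0_1), as illustrated in the figure below.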
2.1.2 Properties
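The WordCount application sets four `StreamsConfig` properties:
Key | Value | Description |
---|---|---|
StreamsConfig.APPLICATION_ID_CONFIG | wordcount-application | Application ID, with three roles: prefix of the client IDs; instances with the same application ID form one Kafka Streams cluster (it is also the consumer group ID, see `groupId=wordcount-application` in the logs below); prefix of the internal topic names |
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG | 127.0.0.1:9092 | Kafka broker address; separate multiple addresses with "," |
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG | Serdes.String().getClass() | Default Serde (serializer/deserializer) class for record keys |
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG | Serdes.String().getClass() | Default Serde (serializer/deserializer) class for record values |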
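All `StreamsConfig` options are documented in Kafka Streams Configs.
<!-- If time permits, add this to section 4 (Concepts and other notes) and replace the official link above. -->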
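2.1.3 Internal topics
After the WordCount application starts, it creates 2 internal topics, each with the same number of partitions as the source topic (2):
- `wordcount-application-counts-store-repartition`: the `SinkNode` of sub-topology 0 writes the records it has processed to this repartition topic, and the `SourceNode` of sub-topology 1 reads the records processed by sub-topology 0 from it.
- `wordcount-application-counts-store-changelog`: this topic records the WordCount aggregation results. It uses Kafka's log compaction, which safely discards older values of each key so that the topic does not grow without bound. Kafka Streams uses the changelog for fault tolerance, for example to restore the local state store after a failure.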
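The naming scheme of both internal topics, as it appears in this example (the application ID, then the name `counts-store` passed to `Materialized.as`, then a `-repartition` or `-changelog` suffix), can be captured in a tiny helper. This only reproduces the names observed here: `InternalTopicNames` is a hypothetical class, and Kafka Streams derives these names internally.

```java
import java.util.List;

/** Hypothetical helper reproducing the internal topic names observed in this example. */
final class InternalTopicNames {
    private InternalTopicNames() {}

    /** Repartition topic: "<application.id>-<name>-repartition". */
    static String repartition(String applicationId, String name) {
        return applicationId + "-" + name + "-repartition";
    }

    /** Changelog topic backing a state store: "<application.id>-<storeName>-changelog". */
    static String changelog(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "wordcount-application"; // APPLICATION_ID_CONFIG
        String store = "counts-store";          // Materialized.as("counts-store")
        for (String topic : List.of(repartition(appId, store), changelog(appId, store))) {
            System.out.println(topic);
        }
    }
}
```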
2.2 Starting the WordCount application
Start one WordCount instance and look at its startup log:
```
...
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] Starting
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]Started Streams client
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from CREATED to RUNNING
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Discovered group coordinator K-PC:9092 (id: 2147483647 rack: null)
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Revoking previously assigned partitions []
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]State transition from RUNNING to REBALANCING
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] partition revocation took 1 ms.
    suspended active tasks: []
    suspended standby tasks: []
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] (Re-)joining group
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer] Assigned tasks to clients as {6f260d17-7347-4e80-94ad-786753f4d683=[activeTasks: ([0_0, 0_1, 1_0, 1_1]) standbyTasks: ([]) assignedTasks: ([0_0, 0_1, 1_0, 1_1]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Successfully joined group with generation 1
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Setting newly assigned partitions [wordcount-application-counts-store-repartition-0, TextLinesTopic-1, TextLinesTopic-0, wordcount-application-counts-store-repartition-1]
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] partition assignment took 31 ms.
    current active tasks: [0_0, 0_1, 1_0, 1_1]
    current standby tasks: []
    previous active tasks: []
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]State transition from REBALANCING to RUNNING
```
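We can see that:
- the state of the Streams application changes CREATED -> RUNNING -> REBALANCING -> RUNNING;
- the state of the stream thread changes CREATED -> RUNNING -> PARTITIONS_REVOKED -> PARTITIONS_ASSIGNED -> RUNNING;
- the application ends up with the tasks 0_0, 0_1, 1_0 and 1_1. The number before the underscore is the sub-topology, and the number after it is the partition of that sub-topology's input topic (TextLinesTopic for sub-topology 0, the repartition topic for sub-topology 1).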
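The task-ID convention can be sketched in a few lines: one task per (sub-topology, input partition) pair, written as `<sub-topology>_<partition>`. `TaskRef` is a hypothetical class for illustration (Kafka Streams has its own `TaskId` class for this), and the enumeration assumes that every sub-topology reads a topic with the same partition count, which holds here (2).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a Kafka Streams task ID such as "1_0". */
final class TaskRef {
    final int subtopology; // number before the underscore
    final int partition;   // partition of the sub-topology's input topic

    TaskRef(int subtopology, int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    /** Parses "<subtopology>_<partition>", e.g. "0_1". */
    static TaskRef parse(String id) {
        String[] parts = id.split("_");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Unexpected task id: " + id);
        }
        return new TaskRef(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
    }

    /** One task per sub-topology per input partition (assumes equal partition counts). */
    static List<String> enumerate(int subtopologies, int partitions) {
        List<String> ids = new ArrayList<>();
        for (int s = 0; s < subtopologies; s++) {
            for (int p = 0; p < partitions; p++) {
                ids.add(s + "_" + p);
            }
        }
        return ids;
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition;
    }

    public static void main(String[] args) {
        System.out.println(enumerate(2, 2)); // [0_0, 0_1, 1_0, 1_1]
        TaskRef t = parse("1_0");
        System.out.println("sub-topology " + t.subtopology + ", partition " + t.partition);
    }
}
```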
Next, start a second WordCount instance with the same `APPLICATION_ID_CONFIG`, so that it forms a cluster with the first one. The two consoles print:
```
# Log of the first WordCount instance
...
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Revoking previously assigned partitions [wordcount-application-counts-store-repartition-0, TextLinesTopic-1, TextLinesTopic-0, wordcount-application-counts-store-repartition-1]
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]State transition from RUNNING to REBALANCING
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] partition revocation took 277 ms.
    suspended active tasks: [0_0, 0_1, 1_0, 1_1]
    suspended standby tasks: []
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] (Re-)joining group
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer] Assigned tasks to clients as {b391295c-1bee-493e-9ebf-ac4ba0302190=[activeTasks: ([1_0, 1_1]) standbyTasks: ([]) assignedTasks: ([1_0, 1_1]) prevActiveTasks: ([]) prevAssignedTasks: ([0_0, 0_1, 1_0, 1_1]) capacity: 1], 6f260d17-7347-4e80-94ad-786753f4d683=[activeTasks: ([0_0, 0_1]) standbyTasks: ([]) assignedTasks: ([0_0, 0_1]) prevActiveTasks: ([0_0, 0_1, 1_0, 1_1]) prevAssignedTasks: ([0_0, 0_1, 1_0, 1_1]) capacity: 1]}.
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Successfully joined group with generation 7
[Consumer clientId=wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1-consumer, groupId=wordcount-application] Setting newly assigned partitions [TextLinesTopic-1, TextLinesTopic-0]
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] partition assignment took 229 ms.
    current active tasks: [0_0, 0_1]
    current standby tasks: []
    previous active tasks: [0_0, 0_1, 1_0, 1_1]
stream-thread [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
stream-client [wordcount-application-6f260d17-7347-4e80-94ad-786753f4d683]State transition from REBALANCING to RUNNING

# Log of the second WordCount instance
...
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] Starting
stream-client [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190]Started Streams client
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] State transition from CREATED to RUNNING
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] Discovered group coordinator K-PC:9092 (id: 2147483647 rack: null)
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] Revoking previously assigned partitions []
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED
stream-client [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190]State transition from RUNNING to REBALANCING
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] partition revocation took 1 ms.
    suspended active tasks: []
    suspended standby tasks: []
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] (Re-)joining group
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] Successfully joined group with generation 7
[Consumer clientId=wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1-consumer, groupId=wordcount-application] Setting newly assigned partitions [wordcount-application-counts-store-repartition-0, wordcount-application-counts-store-repartition-1]
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] partition assignment took 35 ms.
    current active tasks: [1_0, 1_1]
    current standby tasks: []
    previous active tasks: []
stream-thread [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
stream-client [wordcount-application-b391295c-1bee-493e-9ebf-ac4ba0302190]State transition from REBALANCING to RUNNING
```
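As the logs show, once the second instance starts, the two instances form a cluster. When the new instance joins, the tasks of both instances are rebalanced: the first instance gets 0_0 and 0_1, and the second gets 1_0 and 1_1. After the rebalance, the first instance runs sub-topology 0 for partitions 0 and 1 of TextLinesTopic and has no sub-topology 1 tasks; the second instance runs sub-topology 1 for partitions 0 and 1 of the repartition topic and has no sub-topology 0 tasks, as illustrated in the figure below: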
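Now suppose a message is sent to partition 1 of the source topic:
- The message is picked up by the `SourceNode` (KSTREAM-SOURCE-0000000000) of sub-topology 0 on the first WordCount instance.
- After sub-topology 0 has processed it, the `SinkNode` (KSTREAM-SINK-0000000004) of sub-topology 0 sends it back to Kafka, into a partition of the repartition topic.
- It is then picked up by the `SourceNode` (KSTREAM-SOURCE-0000000006) of sub-topology 1 on the second WordCount instance, which owns the sub-topology 1 tasks after the rebalance.
- After sub-topology 1 has processed it, the `SinkNode` (KSTREAM-SINK-0000000008) sends the final result to the target topic.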
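The path of that message can be traced with a simple lookup over the assignment printed in the log above (`6f260d17...` owns 0_0 and 0_1, `b391295c...` owns 1_0 and 1_1). This is a hypothetical sketch, not Kafka Streams code: the labels `instance-1`/`instance-2` and the `ownerOf` helper are invented for illustration, and the repartition partition is an assumed input because in reality it depends on the hash of the new key.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: which instance processes a record at each stage. */
final class TaskRouting {
    // Assignment after the rebalance, as printed in the log above.
    static final Map<String, List<String>> ASSIGNMENT = Map.of(
            "instance-1 (6f260d17...)", List.of("0_0", "0_1"),
            "instance-2 (b391295c...)", List.of("1_0", "1_1"));

    /** Returns the instance owning task "<subtopology>_<partition>". */
    static String ownerOf(int subtopology, int partition) {
        String taskId = subtopology + "_" + partition;
        for (Map.Entry<String, List<String>> e : ASSIGNMENT.entrySet()) {
            if (e.getValue().contains(taskId)) {
                return e.getKey();
            }
        }
        throw new IllegalStateException("Unassigned task " + taskId);
    }

    public static void main(String[] args) {
        int sourcePartition = 1;      // record written to TextLinesTopic-1
        int repartitionPartition = 0; // assumption: really depends on the hash of the new key
        System.out.println("sub-topology 0 runs on " + ownerOf(0, sourcePartition));
        System.out.println("sub-topology 1 runs on " + ownerOf(1, repartitionPartition));
    }
}
```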
3 Sending a message to the source topic
Send one message to the source topic (TextLinesTopic) whose key and value are both "hello world".
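- 1 The message is picked up by the `SourceNode` (KSTREAM-SOURCE-0000000000) of sub-topology 0.
- 2 The original message is processed by `KStream#flatMapValues`, which is the `ProcessorNode` KSTREAM-FLATMAPVALUES-0000000001 of sub-topology 0. This operator (processor) lowercases the message value and then splits it on runs of characters that are not letters, digits or underscores (the regular expression `\W+`). Neither step changes the key. So the message whose key and value are both "hello world" becomes two messages that both have the key "hello world", with the values "hello" and "world" respectively.
- 3 Next, the messages are processed by `KStream#groupBy`. This operator is equivalent to calling `KStream#selectKey` followed by `KStream#groupByKey`; in the topology it shows up as the `ProcessorNode`s KSTREAM-KEY-SELECT-0000000002 and KSTREAM-FILTER-0000000005 of sub-topology 0. Because the key changes, the data must be repartitioned, and the filter node, which Kafka Streams adds in front of the repartition topic, drops records with a null key.
  - 3.1 `KStream#selectKey` changes the message key. The two messages become ("hello", "hello") and ("world", "world"): each word is now both the key and the value.
  - 3.2 `KStream#groupByKey` groups the messages by key, so messages with the same key end up in the same group (Kafka Streams requires grouping before an aggregation).
- 4 The `SinkNode` (KSTREAM-SINK-0000000004) sends the messages processed by sub-topology 0 to the repartition topic.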
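Steps 2 to 4 can be mimicked in plain Java to see exactly which records leave sub-topology 0. This is a model, not Kafka Streams code: `SubTopology0` and its nested `KV` pair are hypothetical names, and the null-key check stands in for the KSTREAM-FILTER-0000000005 node that Kafka Streams places in front of the repartition topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical plain-Java model of sub-topology 0 (no Kafka dependency). */
final class SubTopology0 {
    /** Minimal key/value pair. */
    static final class KV {
        final String key;
        final String value;

        KV(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + key + ", " + value + ")";
        }
    }

    /** flatMapValues: lowercase, split on \W+, keep the original key. */
    static List<KV> flatMapValues(KV in) {
        List<KV> out = new ArrayList<>();
        for (String word : Arrays.asList(in.value.toLowerCase().split("\\W+"))) {
            out.add(new KV(in.key, word));
        }
        return out;
    }

    /** selectKey (the first half of groupBy): the word becomes the new key. */
    static KV selectKey(KV in) {
        return new KV(in.value, in.value);
    }

    /** Records that would be written to the repartition topic. */
    static List<KV> process(KV in) {
        List<KV> toRepartition = new ArrayList<>();
        for (KV kv : flatMapValues(in)) {
            KV rekeyed = selectKey(kv);
            if (rekeyed.key != null) { // models the filter node: drop records without a key
                toRepartition.add(rekeyed);
            }
        }
        return toRepartition;
    }

    public static void main(String[] args) {
        System.out.println(process(new KV("hello world", "hello world")));
        // prints [(hello, hello), (world, world)]
    }
}
```

The printed list holds the two re-keyed records that would be sent to the repartition topic.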
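The remaining steps are performed by sub-topology 1.
- 1 The `SourceNode` (KSTREAM-SOURCE-0000000006) of sub-topology 1 reads the grouped messages from the repartition topic.
- 2 They are processed by `KGroupedStream#count`, which is the `ProcessorNode` KSTREAM-AGGREGATE-0000000003 of sub-topology 1. After `count`, the result is a `KTable`.
  - 2.1 Local state store: the result is stored in RocksDB on the local machine.
  - 2.2 The result is also sent to the changelog topic for fault tolerance.
- 3 The `ProcessorNode` KTABLE-TOSTREAM-0000000007 converts the `KTable` into a `KStream`.
- 4 The `SinkNode` (KSTREAM-SINK-0000000008) sends the counts produced by sub-topology 1 to the target topic.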
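The count step can be modeled the same way: each incoming word increments its count in a local store, the new value is appended to a changelog, and the updated (word, count) pair is forwarded as the `toStream` output. This is a hypothetical in-memory sketch (`SubTopology1` is an invented name): a `HashMap` stands in for the RocksDB store and `List`s stand in for the changelog and target topics. It also ignores record caching, which in real Kafka Streams can collapse several updates of the same key into one.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of sub-topology 1: count, store + changelog, toStream. */
final class SubTopology1 {
    final Map<String, Long> store = new HashMap<>();                   // stands in for RocksDB "counts-store"
    final List<Map.Entry<String, Long>> changelog = new ArrayList<>(); // stands in for the changelog topic
    final List<Map.Entry<String, Long>> output = new ArrayList<>();    // stands in for WordsWithCountsTopic

    /** Processes one record read from the repartition topic (key = word). */
    void process(String word) {
        Long count = store.getOrDefault(word, 0L) + 1;
        store.put(word, count);                        // 2.1 update the local state store
        changelog.add(new SimpleEntry<>(word, count)); // 2.2 log the update for fault tolerance
        output.add(new SimpleEntry<>(word, count));    // 3-4 toStream() and send to the target topic
    }

    /** Rebuilds a store from the changelog, as a restarted instance would. */
    static Map<String, Long> restore(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> e : changelog) {
            restored.put(e.getKey(), e.getValue()); // last value per key wins, as with compaction
        }
        return restored;
    }

    public static void main(String[] args) {
        SubTopology1 t = new SubTopology1();
        for (String w : new String[] {"hello", "world", "hello"}) {
            t.process(w);
        }
        System.out.println(t.output);                             // [hello=1, world=1, hello=2]
        System.out.println(restore(t.changelog).equals(t.store)); // true
    }
}
```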
Due to limited time, this article does not cover many Kafka Streams concepts, such as windowing.
4 Concepts and other notes
<span id ="jump-to-topology"></span>
4.1 topology
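Kafka Streams defines its processing logic as a topology, a graph made of nodes and edges.
- Nodes come in three kinds: `SourceNode`, `SinkNode` and `ProcessorNode`. `SourceNode` and `SinkNode` are where the topology starts and ends: a `SourceNode` reads messages from a Kafka source topic and hands them to `ProcessorNode`s, and a `SinkNode` writes what the `ProcessorNode`s produced to a Kafka target topic. A `Processor` does the actual stream processing. Processing logic can be written with either the Kafka Streams DSL or the Processor API: the former provides common data operations such as map, filter, join and aggregations, while the latter lets developers write their own custom processors.
- Edges are the direction in which data flows between nodes.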
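To make the node/edge vocabulary concrete, here is a toy graph in plain Java: a source node forwards each record along its edges to a processor node, and a node without outgoing edges acts as a sink and collects the result. All names here (`ToyTopology`, `Node`, `to`) are hypothetical; this is not the Processor API, whose real entry points are `Topology#addSource`, `Topology#addProcessor` and `Topology#addSink`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical toy topology: nodes connected by edges (parent -> children), no Kafka involved. */
final class ToyTopology {
    static class Node {
        final String name;
        final Function<String, String> fn;               // transformation applied at this node
        final List<Node> children = new ArrayList<>();   // outgoing edges
        final List<String> received = new ArrayList<>(); // results collected if this node is a sink

        Node(String name, Function<String, String> fn) {
            this.name = name;
            this.fn = fn;
        }

        /** Adds an edge to child and returns the child, so calls can be chained. */
        Node to(Node child) {
            children.add(child);
            return child;
        }

        void process(String record) {
            String result = fn.apply(record);
            if (children.isEmpty()) {
                received.add(result);   // no outgoing edge: this node is a sink
            }
            for (Node child : children) {
                child.process(result);  // forward the record along each edge
            }
        }
    }

    public static void main(String[] args) {
        Node source = new Node("SOURCE", Function.identity());
        Node upper = new Node("PROCESSOR-UPPER", String::toUpperCase);
        Node sink = new Node("SINK", Function.identity());
        source.to(upper).to(sink);
        source.process("hello world");
        System.out.println(sink.received); // [HELLO WORLD]
    }
}
```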
<span id ="jump-to-state"></span>
4.2 state
States of a stream thread (`StreamThread`):
```
                +-------------+
          +<--- | Created (0) |
          |     +-----+-------+
          |           |
          |           v
          |     +-----+-------+
          +<--- | Running (1) | <----+
          |     +-----+-------+      |
          |           |              |
          |           v              |
          |     +-----+-------+      |
          +<--- | Partitions  |      |
          |     | Revoked (2) | <----+
          |     +-----+-------+      |
          |           |              |
          |           v              |
          |     +-----+-------+      |
          |     | Partitions  |      |
          |     | Assigned (3)| ---->+
          |     +-----+-------+
          |           |
          |           v
          |     +-----+-------+
          +---> | Pending     |
                | Shutdown (4)|
                +-----+-------+
                      |
                      v
                +-----+-------+
                | Dead (5)    |
                +-------------+
```
States of a Kafka Streams client (`KafkaStreams`):
```
                +--------------+
        +<----- | Created (0)  |
        |       +-----+--------+
        |             |
        |             v
        |       +--------------+
        +<----- | Running (2)  | -------->+
        |       +----+--+------+          |
        |            |  ^                 |
        |            v  |                 |
        |       +----+--+------+          |
        |       | Re-          |          v
        |       | Balancing (1)| -------->+
        |       +-----+--------+          |
        |             |                   |
        |             v                   v
        |       +-----+--------+     +----+-------+
        +-----> | Pending      |<--- | Error (5)  |
                | Shutdown (3) |     +------------+
                +-----+--------+
                      |
                      v
                +-----+--------+
                | Not          |
                | Running (4)  |
                +--------------+
```
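The two diagrams can be encoded as transition tables and checked against the transitions printed in the logs of section 2.2. The enums below are a hypothetical transcription of the diagrams, not Kafka's own `StreamThread.State` and `KafkaStreams.State` enums, which define their valid transitions internally and may differ between versions.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical transcription of the two state diagrams in section 4.2. */
final class StreamsStates {
    enum ThreadState {
        CREATED, RUNNING, PARTITIONS_REVOKED, PARTITIONS_ASSIGNED, PENDING_SHUTDOWN, DEAD;

        Set<ThreadState> next() {
            switch (this) {
                case CREATED:             return EnumSet.of(RUNNING, PENDING_SHUTDOWN);
                case RUNNING:             return EnumSet.of(PARTITIONS_REVOKED, PENDING_SHUTDOWN);
                case PARTITIONS_REVOKED:  return EnumSet.of(PARTITIONS_ASSIGNED, PENDING_SHUTDOWN);
                case PARTITIONS_ASSIGNED: return EnumSet.of(RUNNING, PARTITIONS_REVOKED, PENDING_SHUTDOWN);
                case PENDING_SHUTDOWN:    return EnumSet.of(DEAD);
                default:                  return EnumSet.noneOf(ThreadState.class); // DEAD is final
            }
        }
    }

    enum ClientState {
        CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR;

        Set<ClientState> next() {
            switch (this) {
                case CREATED:          return EnumSet.of(RUNNING, PENDING_SHUTDOWN);
                case REBALANCING:      return EnumSet.of(RUNNING, PENDING_SHUTDOWN, ERROR);
                case RUNNING:          return EnumSet.of(REBALANCING, PENDING_SHUTDOWN, ERROR);
                case ERROR:            return EnumSet.of(PENDING_SHUTDOWN);
                case PENDING_SHUTDOWN: return EnumSet.of(NOT_RUNNING);
                default:               return EnumSet.noneOf(ClientState.class); // NOT_RUNNING is final
            }
        }
    }

    /** True if every consecutive pair in the path is an allowed transition. */
    static <S> boolean isValidPath(List<S> path, Function<S, Set<S>> next) {
        for (int i = 1; i < path.size(); i++) {
            if (!next.apply(path.get(i - 1)).contains(path.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Transitions printed by the first instance at startup (section 2.2).
        List<ThreadState> thread = List.of(ThreadState.CREATED, ThreadState.RUNNING,
                ThreadState.PARTITIONS_REVOKED, ThreadState.PARTITIONS_ASSIGNED, ThreadState.RUNNING);
        List<ClientState> client = List.of(ClientState.CREATED, ClientState.RUNNING,
                ClientState.REBALANCING, ClientState.RUNNING);
        System.out.println(isValidPath(thread, ThreadState::next)); // true
        System.out.println(isValidPath(client, ClientState::next)); // true
    }
}
```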
<span id = "jump-to-ktable"></span>
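4.3 KTable and KStream
KStream: a record stream. Every record is appended to the stream (insert-only semantics), so two records with the same key are both kept.
KTable: a table, much like a table in a database. A record whose key already exists replaces the previous value for that key (update semantics, more precisely upsert).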
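The difference can be shown by reading the same three records both ways: as a stream every record is kept, as a table a later record for a key overwrites the earlier value. This plain-Java sketch (hypothetical class `StreamVsTable`) models only the semantics; a real `KTable` is backed by a state store and a changelog topic.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of stream (insert) vs table (upsert) semantics. */
final class StreamVsTable {
    /** Stream view: every record is appended. */
    static List<Map.Entry<String, Long>> asStream(List<Map.Entry<String, Long>> records) {
        return new ArrayList<>(records);
    }

    /** Table view: a record with an existing key overwrites the old value. */
    static Map<String, Long> asTable(List<Map.Entry<String, Long>> records) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> r : records) {
            table.put(r.getKey(), r.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> records = new ArrayList<>();
        records.add(new SimpleEntry<>("hello", 1L));
        records.add(new SimpleEntry<>("world", 1L));
        records.add(new SimpleEntry<>("hello", 2L));
        System.out.println(asStream(records)); // [hello=1, world=1, hello=2]
        System.out.println(asTable(records));  // {hello=2, world=1}
    }
}
```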
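4.4 How Kafka chooses the partition when sending a message
In the source code, `KafkaProducer#doSend` determines the partition of a message by calling the `partition` method: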
```java
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    ...
    int partition = partition(record, serializedKey, serializedValue, cluster);
    ...
}
```
Kafka provides a default implementation of `partition`, `DefaultPartitioner#partition`:
```java
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}
```
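From the source code above:
- If the message key is null and the topic has at least one available partition, the partition is the per-topic counter value `nextValue` (randomly seeded, then incremented on every call) modulo the number of available partitions, so null-key messages are spread round-robin over the available partitions.
- If the message key is null and no partition is available, `nextValue` is taken modulo the total number of partitions; as the comment "give a non-available partition" says, the producer then deliberately returns a partition that is currently unavailable.
- If the message key is not null, the serialized key is hashed with murmur2 and taken modulo the topic's partition count. As long as the number of partitions does not change, the same key always goes to the same partition.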
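The keyed branch can be reproduced without a broker by porting the hash step. The sketch below re-implements murmur2 and `toPositive` following the Kafka client source, plus a round-robin counter for null keys; `PartitionSketch` is a hypothetical class, so verify it against `org.apache.kafka.common.utils.Utils#murmur2` before relying on it. Two simplifications are assumed: the counter starts at 0 instead of a random value, and every partition is treated as available. Note also that Kafka 2.4 and later changed the default null-key behavior to a "sticky" partitioner, so the round-robin branch reflects the older code shown above.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical re-implementation of the DefaultPartitioner logic shown above. */
final class PartitionSketch {
    private final AtomicInteger counter = new AtomicInteger(0); // Kafka seeds this randomly

    /** murmur2 hash, as used by the producer for keyed records. */
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) { // remaining 1-3 bytes; fall-through is intentional
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Clears the sign bit (Math.abs would stay negative for Integer.MIN_VALUE). */
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    /** Keyed record: hash(key) mod partitions; null key: round-robin over all partitions. */
    int partition(String key, int numPartitions) {
        if (key == null) {
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // StringSerializer defaults to UTF-8
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionSketch p = new PartitionSketch();
        for (String key : new String[] {"hello", "world", "hello"}) {
            System.out.println(key + " -> partition " + p.partition(key, 2));
        }
        System.out.println("null -> " + p.partition(null, 2) + ", then " + p.partition(null, 2));
    }
}
```

Running `main` shows that "hello" lands on the same partition both times, while consecutive null-key records alternate between partitions.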