Kafka核心API——Stream API

Kafka Stream概念及初識高層架構圖

Kafka Stream是Apache Kafka從0.10版本引入的一個新Feature,它提供了對存儲於Kafka內的數據進行流式處理和分析的功能。簡而言之,Kafka Stream就是一個用來作流計算的類庫,與Storm、Spark Streaming、Flink的做用相似,但要輕量得多。java

Kafka Stream的基本概念:數據庫

  • Kafka Stream是處理分析存儲在Kafka數據的客戶端程序庫(lib)
  • 因爲Kafka Streams是Kafka的一個lib,因此實現的程序不依賴單獨的環境
  • Kafka Stream經過state store能夠實現高效的狀態操做
  • 支持原語Processor和高層抽象DSL

Kafka Stream的高層架構圖:
Kafka核心API——Stream APIapache

  • Partition的數據會分發到不一樣的Task上,Task主要是用來作流式的並行處理
  • 每一個Task都會有本身的state store去記錄狀態
  • 每一個Thread裏會有多個Task

Kafka Stream 核心概念

Kafka Stream關鍵詞:bootstrap

  • 流和流處理器:流指的是數據流,流處理器指的是數據流到某個節點時對其進行處理的單元
  • 流處理拓撲:一個拓撲圖,該拓撲圖展現了數據流的走向,以及流處理器的節點位置
  • 源處理器及Sink處理器:源處理器指的是數據的源頭,即第一個處理器,Sink處理器則反之,是最終產出結果的一個處理器

以下圖所示:
Kafka核心API——Stream APIbash


Kafka Stream使用演示

下圖是Kafka Stream完整的高層架構圖:
Kafka核心API——Stream API服務器

從上圖中能夠看到,Consumer對一組Partition進行消費,這組Partition能夠在一個Topic中或多個Topic中。而後造成數據流,通過各個流處理器後最終經過Producer輸出到一組Partition中,一樣這組Partition也能夠在一個Topic中或多個Topic中。這個過程就是數據流的輸入和輸出。架構

所以,咱們在使用Stream API前須要先建立兩個Topic,一個做爲輸入,一個做爲輸出。到服務器上使用命令行建立兩個Topic:app

[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic input-topic
[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic output-topic

因爲以前依賴的kafka-clients包中沒有Stream API,因此須要另外引入Stream的依賴包。在項目中添加以下依賴:ide

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>2.5.0</version>
</dependency>

接下來以一個經典的詞頻統計爲例,演示一下Stream API的使用。代碼示例:測試

package com.zj.study.kafka.stream;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.List;
import java.util.Properties;

public class StreamSample {

    private static final String INPUT_TOPIC = "input-topic";
    private static final String OUTPUT_TOPIC = "output-topic";

    /**
     * 構建配置屬性
     */
    public static Properties getProperties() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "49.232.153.84:9092");
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        return properties;
    }

    public static KafkaStreams createKafkaStreams() {
        Properties properties = getProperties();

        // 構建流結構拓撲
        StreamsBuilder builder = new StreamsBuilder();
        // 構建wordCount這個Processor
        wordCountStream(builder);
        Topology topology = builder.build();

        // 構建KafkaStreams
        return new KafkaStreams(topology, properties);
    }

    /**
     * 定義流計算過程
     * 例子爲詞頻統計
     */
    public static void wordCountStream(StreamsBuilder builder) {
        // 不斷的從INPUT_TOPIC上獲取新的數據,並追加到流上的一個抽象對象
        KStream<String, String> source = builder.stream(INPUT_TOPIC);
        // KTable是數據集的抽象對象
        KTable<String, Long> count = source.flatMapValues(
                // 以空格爲分隔符將字符串進行拆分
                v -> List.of(v.toLowerCase().split(" "))
                // 按value進行分組統計
        ).groupBy((k, v) -> v).count();

        KStream<String, Long> sink = count.toStream();
        // 將統計結果輸出到OUTPUT_TOPIC
        sink.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }

    public static void main(String[] args) {
        KafkaStreams streams = createKafkaStreams();
        // 啓動該Stream
        streams.start();
    }
}

KTableKStream的關係與區別,以下圖:
Kafka核心API——Stream API

  • KTable相似於一個時間片斷,在一個時間片斷內輸入的數據就會update進去,以這樣的形式來維護這張表
  • KStream則沒有update這個概念,而是不斷的追加

運行以上代碼,而後到服務器中使用kafka-console-producer.sh腳本命令向input-topic生產一些數據,以下:

[root@txy-server2 ~]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic input-topic
>Hello World Java
>Hello World Kafka
>Hello Java Kafka
>Hello Java

而後再運行kafka-console-consumer.sh腳本命令從output-topic中消費數據,並進行打印。具體以下:

[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 172.21.0.10:9092 --topic output-topic --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning

控制檯輸出的結果:

world   2
hello   3
java    2
kafka   2
hello   4
java    3

從輸出結果中能夠看到,Kafka Stream首先是對前三行語句進行了一次詞頻統計,因此前半段是:

world   2
hello   3
java    2
kafka   2

當最後一行輸入以後,又再作了一次詞頻統計,並針對新的統計結果進行輸出,其餘沒有變化的則不做輸出,因此最後打印了:

hello   4
java    3

這也是KTableKStream的一個體現,從測試的結果能夠看出Kafka Stream是實時進行流計算的,而且每次只會針對有變化的內容進行輸出。


foreach方法

在以前的例子中,咱們是從某個Topic讀取數據進行流處理後再輸出到另外一個Topic裏。但在一些場景下,咱們可能不但願將結果數據輸出到Topic,而是寫入到一些存儲服務中,例如ElasticSearch、MongoDB、MySQL等。

在這種場景下,就能夠利用到foreach方法,該方法用於迭代流中的元素。咱們能夠在foreach中將數據存入例如Map、List等容器,而後再批量寫入到數據庫或其餘存儲中間件便可。

foreach方法使用示例:

public static void foreachStream(StreamsBuilder builder) {
    KStream<String, String> source = builder.stream(INPUT_TOPIC);
    source.flatMapValues(
            v -> List.of(v.toLowerCase().split(" "))
    ).foreach((k, v) -> System.out.println(k + " : " + v));
}
相關文章
相關標籤/搜索