Kafka Streams將來可期

核心知識預熱

TIPS
1.資料來源 說明書以及 內部構造
2.學習技術就是不斷解惑的過程,就kafka stream自問:是個什麼技術,能幹什麼,怎麼使用..
  1. Kafka Streams是一個數據輸入和數據輸出都保存在kafka集羣程序和微服務構建的客戶端類庫,那麼就不須要專門去搭建計算集羣,方便快捷;
  2. Kafka Streams提供兩種方法來定義流處理拓撲。Kafka Streams DSL提供了最通用的可直接使用的數據轉換操做(好比map);低階的處理器API則容許開發者定義和鏈接到自定義的處理器或者和state store進行交互。也就是說前者是高階API,封裝好了的,通用場景使用且能快速開發;後者是低階API,更接近底層,開發難度大可是能更好地適配程序和業務。
  3. Kafka Streams一樣支持狀態統計、窗口函數、eventTime和exactly-once語義等實時場景;

前置概念

concept desc
stream processing application 多個處理器造成的拓撲結構,包含有必定處理邏輯的應用程序
processor topology 流處理器拓撲,是processor+...+processor的形式,source和sink是特殊的processor
Source Processor 源頭處理器,即上游沒有其餘的流處理器,從kafka的topic中消費數據產生數據流輸送到下游
Sink Processor 結果處理器,即下游沒有其餘的流處理器,將上游的數據輸送到指定的kafka topic
Time 聯想flink的時間語義,例如某某time1手機端購買某商品,產生了日誌數據,而後time2這個日誌數據被實時採集到Kafka持久化到topic,而後進入流式處理框架,在time3正式被計算,那麼time123分別稱爲:event time,ingestion time,processing time
states 保存和查詢數據狀態的功能,能夠定義流處理應用外的程序進行只讀訪問
processing guarantees 消費是否丟失和是否重複的級別,好比exactly-once,at-least-once,at-most-once

拓撲

kafka stream的拓撲其實就是一個個processor鏈接起來的流程圖,其中source和sink是比較特殊的processor,分別沒有上游和下游處理器。拓撲建立方式是在建立下游processor的時候指定上游的processor名稱進行鏈接java

// DSL轉換算子生成新KStream是調用
void addGraphNode(final StreamsGraphNode parent,final StreamsGraphNode child) {}
// 直接經過builder添加processor
public synchronized Topology addProcessor(final String name,final ProcessorSupplier supplier,final String... parentNames) {}

topology.png

使用

使用上核心都是四個步驟:node

  1. 建立流處理應用配置參數;
  2. 構造流處理拓撲結構;
  3. 建立流處理客戶端實例;
  4. 開始執行流處理程序;

使用DSL編寫單詞統計

測試代碼

/* 1.props */
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");//可做爲consumer的group id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//kafka的地址,多個逗號分隔
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());// 序列化和反序列化,在讀取和寫出流的時候、在讀取和寫出state的時候都會用到
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    /* 2.topology */
    final StreamsBuilder builder = new StreamsBuilder();

    KStream<String, String> source = builder.stream("streams-plaintext-input");//source processor,傳入參數可定義key,value的序列化方式,以及時間提取器等

    source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))//KString<String,String>
            .groupBy((key, value) -> value)// KGroupedStream<String,String>
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))//KTable<String,String>
            .toStream()//KStream<String,Long>
            .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));//sink processor,指定輸出key,value的數據類型

    final Topology topology = builder.build();

    /* 3.KafkaStreams實例 */
    final KafkaStreams streams = new KafkaStreams(topology, props);
    // CountDownLatch用await()阻塞當前線程,countDown()記錄完成線程的數量
    // 當getCount()=0的時候繼續執行await後續的代碼
    final CountDownLatch latch = new CountDownLatch(1);

    System.out.println(topology.describe());// 打印流處理拓撲

    // 鉤子函數
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
            streams.close();
            latch.countDown();
        }
    });

    try {
        // 4.執行
        streams.start();
        latch.await();
    } catch (Throwable e) {
        System.exit(1);
    }
    System.exit(0);

測試數據

# 生產者打印生產數據
langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
>hello hello hello hello
>kafka kafka kafka kafka
# 消費者打印消費數據
langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

hello    4
kafka    4

打印拓撲

這裏能夠看到有點相似於寬依賴的時候,拓撲會劃分,中間會生成streams-wordcount-counts-store-repartition主題保存中間結果。git

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input])
      --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: [])
      --> KSTREAM-KEY-SELECT-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002 (stores: [])
      --> counts-store-repartition-filter
      <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: counts-store-repartition-filter (stores: [])
      --> counts-store-repartition-sink
      <-- KSTREAM-KEY-SELECT-0000000002
    Sink: counts-store-repartition-sink (topic: counts-store-repartition)
      <-- counts-store-repartition-filter

  Sub-topology: 1
    Source: counts-store-repartition-source (topics: [counts-store-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store])
      --> KTABLE-TOSTREAM-0000000007
      <-- counts-store-repartition-source
    Processor: KTABLE-TOSTREAM-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008 (topic: streams-wordcount-output)
      <-- KTABLE-TOSTREAM-0000000007

主題建立

DSL程序工做後會生成streams-wordcount-counts-store-changelog的主題,名稱規則是:application_id+store_name+changelog,是由於每次更新KTable,都會發送最新的鍵值記錄到流處理內部的變動日誌主題github

langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-topics.sh --zookeeper localhost:2181 --list
streams-plaintext-input
streams-wordcount-counts-store-changelog
streams-wordcount-counts-store-repartition
streams-wordcount-output

使用Processor編寫單詞統計

測試代碼

/* 1.應用配置參數 */
    Properties props = new Properties();
    // 可做爲consumer的group id
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount1");
    // kafka的地址,多個逗號分隔,目前只支持單集羣
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // 序列化和反序列化,在讀取和寫出流的時候、在讀取和寫出state的時候都會用到
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    /* 2.拓撲 */
    StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("Counts"),
            Serdes.String(),
            Serdes.Long())
            .withLoggingDisabled(); // disable backing up the store to a changelog topic

    Topology builder = new Topology();

    // add the source processor node that takes Kafka topic "source-topic" as input
    builder.addSource("Source", "source-topic")
            // add the WordCountProcessor node which takes the source processor as its upstream processor
            .addProcessor("Process", WordCountProcessor::new, "Source")
            // add the count store associated with the WordCountProcessor processor
            .addStateStore(countStoreSupplier, "Process")
            // add the sink processor node that takes Kafka topic "sink-topic" as output
            // and the WordCountProcessor node as its upstream processor
            .addSink("Sink", "sink-topic", "Process");

    /* 3.流處理客戶端實例 */
    KafkaStreams streams = new KafkaStreams(builder, props);

    /* 4.啓動 */
    streams.start();

這裏自定義了單詞統計的Processorshell

  1. void init():初始化Processor
  2. void process():處理key-value的消息
  3. void close():清理資源
/**
 * @Description 經過實現Processor重寫process方法自定義Processor
 * 自定義的時候,先看上層接口,而後找一個內置的實現類參考好比KStreamAggregateProcessor
 */
public static class WordCountProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;
        // 獲取名爲Counts的狀態
        kvStore = (KeyValueStore) context.getStateStore("Counts");
    }

    // 每接收到一條消息就會執行一次process,這裏是將結果放回緩存中
    public void process(String key, String value) {
        String[] words = value.toLowerCase().split(" ");
        for (String word : words) {
            // 獲取以前這個單詞統計的數量,以前沒有統計就設置爲1
            Long preCount = this.kvStore.get(word);
            Long result = preCount == null ? 1 : preCount + 1;
            // 將結果寫到緩存中
            this.kvStore.put(word, result);
            // 將結果寫到下游topic
            this.context.forward(word,result.toString());
            System.out.println("process , key = " + word + ",and value = " + result);
        }
    }

    public void close() {
        // close any resources managed by this processor
        // Note: Do not close any StateStores as these are managed by the library
    }
}

測試數據

# 生產者生產數據
langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source-topic
>test test test
# 消費者消費數據
langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic sink-topic
1
2
3
# 注意到forward寫出的key,value,實際僅打印保存了統計數據,即value

打印拓撲

Topologies:
   Sub-topology: 0
    Source: Source (topics: [source-topic])
      --> Process
    Processor: Process (stores: [Counts])
      --> Sink
      <-- Source
    Sink: Sink (topic: sink-topic)
      <-- Process
相關文章
相關標籤/搜索