學習kafka教程(三)

歡迎關注公衆號:n平方
若有問題或建議,請後臺留言,我會盡力解決你的問題。

本文主要介紹【Kafka Streams的架構和使用java

目標

  • 瞭解kafka streams的架構。
  • 掌握kafka streams編程。

架構分析

整體

Kafka流經過構建Kafka生產者和消費者庫,並利用Kafka的本地功能來提供數據並行性、分佈式協調、容錯和操做簡單性,從而簡化了應用程序開發。
下圖展現了一個使用Kafka Streams庫的應用程序的結構。apache

架構圖

流分區和任務

Kafka的消息傳遞層對數據進行分區,以存儲和傳輸數據。Kafka流劃分數據進行處理。在這兩種狀況下,這種分區都支持數據局部性、靈活性、可伸縮性、高性能和容錯性。Kafka流使用分區和任務的概念做爲基於Kafka主題分區的並行模型的邏輯單元。Kafka流與Kafka在並行性上下文中有着緊密的聯繫:編程

  • 每一個流分區都是一個徹底有序的數據記錄序列,並映射到Kafka主題分區。
  • 流中的數據記錄映射到來自該主題的Kafka消息。
  • 數據記錄的鍵值決定了Kafka流和Kafka流中數據的分區,即,如何將數據路由到主題中的特定分區。

應用程序的處理器拓撲經過將其分解爲多個任務進行擴展。
更具體地說,Kafka流基於應用程序的輸入流分區建立固定數量的任務,每一個任務分配一個來自輸入流的分區列表(例如,kafka的topic)。分配給任務的分區永遠不會改變,所以每一個任務都是應用程序並行性的固定單元。
而後,任務能夠基於分配的分區實例化本身的處理器拓撲;它們還爲每一個分配的分區維護一個緩衝區,並從這些記錄緩衝區一次處理一條消息。
所以,流任務能夠獨立並行地處理,而無需人工干預。api

理解Kafka流不是一個資源管理器,而是一個「運行」其流處理應用程序運行的任何地方的庫。應用程序的多個實例要麼在同一臺機器上執行,要麼分佈在多臺機器上,庫能夠自動將任務分配給運行應用程序實例的那些實例。分配給任務的分區從未改變;若是應用程序實例失敗,它分配的全部任務將在其餘實例上自動從新啓動,並繼續從相同的流分區使用。安全

下圖顯示了兩個任務,每一個任務分配一個輸入流分區。
架構

線程模型

Kafka流容許用戶配置庫用於在應用程序實例中並行處理的線程數。每一個線程能夠獨立地使用其處理器拓撲執行一個或多個任務。
例如,下圖顯示了一個流線程運行兩個流任務。分佈式

啓動更多的流線程或應用程序實例僅僅至關於複製拓撲並讓它處理Kafka分區的不一樣子集,從而有效地並行處理。值得注意的是,線程之間不存在共享狀態,所以不須要線程間的協調。這使得跨應用程序實例和線程並行運行拓撲變得很是簡單。Kafka主題分區在各類流線程之間的分配是由Kafka流利用Kafka的協調功能透明地處理的。ide

如上所述,使用Kafka流擴展您的流處理應用程序很容易:您只須要啓動應用程序的其餘實例,Kafka流負責在應用程序實例中運行的任務之間分配分區。您能夠啓動與輸入Kafka主題分區同樣多的應用程序線程,以便在應用程序的全部運行實例中,每一個線程(或者更確切地說,它運行的任務)至少有一個輸入分區要處理。性能

本地狀態存儲

Kafka流提供了所謂的狀態存儲,流處理應用程序可使用它來存儲和查詢數據,這是實現有狀態操做時的一項重要功能。例如,Kafka Streams DSL在調用有狀態操做符(如join()或aggregate())或打開流窗口時自動建立和管理這樣的狀態存儲。ui

Kafka Streams應用程序中的每一個流任務均可以嵌入一個或多個本地狀態存儲,這些存儲能夠經過api訪問,以存儲和查詢處理所需的數據。Kafka流爲這種本地狀態存儲提供容錯和自動恢復功能。

下圖顯示了兩個流任務及其專用的本地狀態存儲。

容錯

Kafka流構建於Kafka中本地集成的容錯功能之上。Kafka分區是高度可用和複製的;所以,當流數據持久化到Kafka時,即便應用程序失敗並須要從新處理它,流數據也是可用的。Kafka流中的任務利用Kafka消費者客戶端提供的容錯功能來處理失敗。若是任務在失敗的機器上運行,Kafka流將自動在應用程序的一個剩餘運行實例中從新啓動該任務。

此外,Kafka流還確保本地狀態存儲對於故障也是健壯的。對於每一個狀態存儲,它維護一個複製的changelog Kafka主題,其中跟蹤任何狀態更新。這些變動日誌主題也被分區,這樣每一個本地狀態存儲實例,以及訪問該存儲的任務,都有本身專用的變動日誌主題分區。在changelog主題上啓用了日誌壓縮,這樣能夠安全地清除舊數據,防止主題無限增加。若是任務在一臺失敗的機器上運行,並在另外一臺機器上從新啓動,Kafka流經過在恢復對新啓動的任務的處理以前重播相應的更改日誌主題,確保在失敗以前將其關聯的狀態存儲恢復到內容。所以,故障處理對最終用戶是徹底透明的。

編程實例

管道(輸入輸出)實例

就是控制檯輸入到kafka中,通過處理輸出。

package com.example.kafkastreams.demo;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class PipeDemo {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }

}
分詞實例

就是將你輸入的字符串進行分詞輸出。

package com.example.kafkastreams.demo;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplitDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
                .to("streams-linesplit-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);


        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);

    }
}
詞彙統計實例

將你輸入的字符串進行按單詞統計輸出。

package com.example.kafkastreams.demo;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCountDemo {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                .groupBy((key, value) -> value)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
                .toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);


        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

最後

本人水平有限,歡迎各位建議以及指正。順便關注一下公衆號唄,會常常更新文章的哦。
n平方

相關文章
相關標籤/搜索