Kafka流式處理

Kafka Streams

初識流式處理

什麼是數據流

數據流(也叫事件流)是無邊界數據集的抽象表示。無邊界意味着無限和持續增加。無邊界數據集之因此是無限的,是由於隨着時間的推移,新記錄會不斷加入進來。數據流除了無邊界還有如下特性:html

  1. 數據流是有序的。事件的發生老是有前後順序的,如先下單再發貨java

  2. 數據記錄不可變。事件一旦發生,就不能被改變,以下單後,想要取消只會新產生一個事件git

  3. 數據流是可重播的。github

什麼是流式處理

持續的從一個無邊界的數據集讀取數據,而後對它們進行處理並生成結果,就是流式處理。spring

流式處理是一種編程範式,就像請求和響應範式、批處理範式那樣,下面是三種範式的比較:sql

  • 請求和響應,這種範式的特色是延遲最小數據庫

  • 批處理,這種範式的特色是高延遲和高吞吐量apache

  • 流式處理,這種範式介於上面兩種之間編程

什麼是Kafka Streams

Kafka Streams是一個用於構建流式處理應用程序的客戶端庫,其中輸入和輸出數據是存儲在Kafka集羣中的。一個用Kakfa Streams搭建的流處理程序,它的架構以下圖:bootstrap

Kafka Streams的優勢:

  1. 簡單輕量級的客戶端庫,能夠輕鬆嵌入到任何Java應用程序中;除了Kafka以外沒有其餘外部的依賴。全部能夠輕鬆的整合到本身的應用中,也不須要爲流式處理需求額外的的部署一個應用集羣。

  2. 使用Kafka做爲內部消息通信存儲介質,不須要從新加入其它外部組件來作消息通信。值得注意的是,它使用Kafka的分區模型來水平擴展處理,同時保持強大的排序保證。

  3. 支持本地狀態容錯,可實現很是快速有效的有狀態操做,如窗口鏈接和聚合。本地狀態被保存在Kafka中,在機器故障的時候,其餘機器能夠自動恢復這些狀態繼續處理。

  4. 能夠保證每一個記錄只處理一次,即便在處理過程當中Streams客戶端或Kafka代理髮生故障時也只處理一次。

  5. 採用一條記錄一次處理以實現毫秒處理延遲,並支持基於事件時間的窗口操做以及記錄的延遲到達。

  6. 提供豐富的流式處理API,包括高級的Streams DSL和低級的Processor API。

核心概念

時間

時間在流式處理中很是的重要,由於大部分的流式處理都是基於時間窗的,如計算5分鐘內用戶的訪問量,那麼對於五分鐘前的數據就不該該參與計算。

  • 事件時間

    事件時間是指事件的發生時間或事件的建立時間,如商品的出售時間,用戶的訪問時間。在Kakfa 0.10以後的版本,生產者會自動在記錄中添加建立時間,若是與業務的事件時間不一致,那就須要手動設置這個時間。

  • 日誌追加時間

    日誌追加時間是指時間保存到kafka broker上的時間。

  • 處理時間

    處理時間是指咱們的應用在收到事件後對其處理的時間。同一個事件的處理時間可能不一樣,這取決於不一樣的應用什麼時候讀取這個時間

狀態

若是隻是單獨的處理每個事件,那麼這個流式處理就很簡單,如從數據流中過濾出交易金額大於10000的數據,而後給這些交易人發個優惠券,這種需求咱們用kafka的消費者客戶端就徹底能知足,但一般狀況下咱們的操做中會包含多個事件,如統計總數、平均數、最大值等。事件與事件之間的信息被稱爲狀態。如咱們賣了1雙鞋,而後又買了2雙鞋,通過這兩個事件後,在如今這個時間,咱們鞋數量的狀態就是一雙。

流表的二元性

在業務系統中,有時咱們關注變化的過程,有時咱們關注結果。流是一系列事件,每一個事件就是一個變動;表包含了當前的狀態,是多個變動所產生的結果。將流轉換爲表,叫作流的物化。咱們捕獲到表所發生的變動(insert、update、delete)事件,這些事件就組成了流。

窗口

在流處理中,咱們的數據處理大部分都是基於窗操做的,如咱們在分析股價的走勢時,咱們須要統計出的天天或者每一個小時的內股價的平均價格,而後查看價格的一個走勢,而不是直接統計從股票發行到如今的平均價,這個是沒多大意義的,這裏的天天或每一個小時就是一個時間窗。

在Kafka streams中窗口有兩種,時間窗口和會話窗口(其實會話窗口也是基於時間窗的)。

時間窗有兩個重要的屬性:窗口大小和步長(移動間隔)。

  • 滾動窗口:步長等於窗口大小,滾動窗口是沒有沒有記錄的重疊。

  • 跳躍窗口:步長不等於窗口大小

  • 滑動窗口:窗口隨着每一條記錄移動,滑動窗口不與時間對齊,而是與數據記錄時間戳對齊。

  • 會話窗口:會話窗口與時間窗最大的不一樣是,他的大小是不肯定的(由於它的大小是由數據自己決定的)

    看下圖,這是一個時間間隔爲5分鐘的session窗口 ,先忽略圖中遲到的兩個記錄,假設他們沒遲到。

Processing Topology(拓撲)和Stream processor(流處理器)

在kakfa streams中,計算邏輯被定義爲拓撲,它是一個操做和變換的集合,每一個事件從輸入到輸出都會流經它。每一個流式處理的應用至少會有一個拓撲。

流處理器是處理拓撲中的各個節點,表明拓撲中的每一個處理步驟,用來完成數據轉換功能,如過濾、映射、分組、聚合等。一個流處理器同一時間從上游接收一條輸入數據,產生一個或多個輸出記錄到下個流式處理器。

一個拓撲中有兩種特殊的的處理器:

  1. Source Processor,沒有上游處理器,從一個或多個Kafka topic爲拓撲生成輸入流。

  2. Slink Processor,沒有下游處理器,將從上游處理器接收的記錄發送到指定的Kafka主題。

從三個示例來了解Kafka Streams 的應用

單詞統計

將數據按空格拆分紅單個單詞,過濾掉不須要的單詞,統計每一個單詞出現的次數

// 配置信息
Properties props = new Properties();
//Streams應用Id
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount");
//Kafka集羣地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.195.88:9092");
//指定序列化和反序列化類型
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

//建立一個topology構建器,在kakfa中計算邏輯被定義爲鏈接的處理器節點的拓撲。
StreamsBuilder builder = new StreamsBuilder();
//使用topology構建器建立一個源流,指定源topic
KStream<String, String> source = builder.stream("wordCountInput");
// 構建topology
KStream<String, Long> wordCounts = source
    //把數據按空格拆分紅單個單詞
    .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
    //過濾掉the這個單詞,不統計這個單詞
    .filter((key, value) -> (!value.equals("the")))
    //分組
    .groupBy((key, word) -> word)
    //計數,其中'countsStore'是狀態存儲的名字
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("countsStore"))
    .toStream();

//將stream寫回到Kafka的topic
wordCounts.to("wordCountOutput", Produced.with(Serdes.String(), Serdes.String()));
//建立Streams客戶端
KafkaStreams streams = new KafkaStreams(builder.build(), props);
//啓動Streams客戶端
streams.start();

股價統計

統計5秒鐘內股票交易的最高價、最低價、交易次數及平均價格,統計信息每隔一秒鐘更新一次。這是一個典型的時間窗的應用。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Trade> source = builder.stream("stockStatsInput");
KStream<Windowed<String>, TradeStats> stats =
    // 按key分組,這裏的key是股票代碼    
    source.groupByKey()
    // 建立一個跳躍時間窗,窗口大小5s,步長1s
    .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).advanceBy(Duration.ofSeconds(1)))
    // 進行聚合操做,用TradeStats對象存儲每一個窗口的統計信息——最高價、最低價、交易次數及總交易額
    .aggregate(TradeStats :: new, (k, v, tradeStats) -> tradeStats.add(v),
            Materialized.<String, TradeStats, 
            WindowStore<Bytes, byte[]>>as("tradeAggregates").withValueSerde(new TradeStatsSerde()))
    .toStream()
    //計算平均股價
    .mapValues(TradeStats :: computeAvgPrice);
//將stream寫回到Kafka
stats.to("stockStatsOutput", Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class)));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

網站點擊事件分析

根據用戶的搜索信息、點擊信息、及我的信息,把這些信息鏈接在一塊兒,動態的分析出用戶行爲。如一個用戶搜索了"奶粉",並在一分鐘內點擊了「貝因美」,我的信息是年齡20-30歲之間的女性,咱們把這些事件流鏈接起來後,就能夠獲得一條用戶分析的數據,之後貝因美的奶粉搞活動了就能夠直接向該用戶推薦。這個例子主要演示了數據流間的鏈接。

// 搜索事件
KStream<Integer, Search> searches =builder.stream(Constants.SEARCH_TOPIC,
                                    Consumed.with(Serdes.Integer(), new SearchSerde()));
// 點擊事件
KStream<Integer, PageView> views = builder.stream(Constants.PAGE_VIEW_TOPIC,
                                    Consumed.with(Serdes.Integer(), new PageViewSerde()));
// 用戶信息
KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC,
             Consumed.with(Serdes.Integer(), new ProfileSerde()), Materialized.as("profileStore"));
//將點擊事件與用戶信息鏈接,用UserActivity對象來存儲狀態
KStream<Integer, UserActivity> viewsWithProfile = views.leftJoin(profiles,
        (page, profile) -> {
            if (profile != null){
                return new UserActivity(profile.getUserId(), profile.getUserName(), 
                profile.getPhone(),  "", page.getPage());
            }else {
                return new UserActivity(-1, "", "", "", page.getPage());
            }
        });
//將用戶點擊事件與搜索信息鏈接
KStream<Integer, UserActivity> userActivityKStream = viewsWithProfile.leftJoin(searches,
        (userActivity, search) -> {
            if (search != null) {
                userActivity.updateSearch(search.getSearchTerms());
            }else {
                userActivity.updateSearch("");
            }
            return userActivity;
        },
        // 搜索事後的10s秒鐘內的數據才被認爲是相關聯的
        JoinWindows.of(Duration.ofSeconds(10)), 
                    Joined.with(Serdes.Integer(), new UserActivitySerde(), new SearchSerde()));

userActivityKStream.to(Constants.USER_ACTIVITY_TOPIC, 
                        Produced.with(Serdes.Integer(), new UserActivitySerde()) );
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Kafka Streams常見的API

Kafka Streams的API有兩種,Kafka Streams DSL和Processor API。

  1. Kafka Streams DSL是高級API,它提供最多見的數據轉換操做,諸如map,filter,join等。

  2. Processor API一種低級API,容許您添加和鏈接處理器以及直接與狀態存儲進行交互,Processor API爲您提供比DSL更多的靈活性,但代價是須要在應用程序開發人員方面進行更多的手動工做(例如,更多行代碼)。所以接下來全部的工做都是基於DSL。

重要的抽象

  • KStream:數據流抽象。建立方法以下:

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Long> wordCounts = builder.stream(
        "word-counts-input-topic", // 輸入的topic 
        Consumed.with(Serdes.String(), Serdes.Long())   //key和value的序列化方式
        );
  • KTable:數據表抽象。建立方法以下:

    KStream<String, Long> wordCounts = builder.table(
     "word-counts-input-topic", // 輸入的topic
     Consumed.with(Serdes.String(), Serdes.Long(), //key和value的序列化方式
     Materialized.as("word-counts-store")    // 狀態存儲名
     );
  • GlobalKTable:同KTable,只不過是全局的,KTable是讀取的當前分區的數據,而GlobalKTable是讀取的所有分區的數據,這在進行join操做時是很是有用的。比較相似關係型數據庫在分庫分表後join的問題。

如何理解kStream和KTable的區別:

咱們能夠這樣看,咱們從topic前後讀取了兩條數據,("蘋果", 1) --> ("蘋果", 3),對於KStream來講,表示有一個蘋果,而後我又有3個蘋果,結果是我就有了4個蘋果 ;可是對於KTable來講,表示我如今有1蘋果,我如今有3個蘋果,結果是我有3個蘋果。由於對於KTable來講,第二條記錄是第一條記錄的更新。

因此官網對它們區別描述的很是好,KStream對於流中的記錄始終解釋爲insert,而KTable對流中的記錄解釋爲upsert

無狀態的轉換

只須要數據流過一遍就能夠,不依賴先後的狀態。

  • branch:將一個Kstream分紅多個

    KStream<String, Long>[] branches = stream.branch(
          (key, value) -> key.startsWith("A"), //branches[0]中只包含key以「A」開頭的全部記錄
          (key, value) -> key.startsWith("B"), //branches[1]中只包含key以「B」開頭的全部記錄
          (key, value) -> true                 //branches[2]中包含其餘記錄
      );
  • filter:過濾操做

    // 過濾掉value不大於0的記錄
    KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);
  • filterNot:反向過濾,與filter相反

  • flatMap:將一條記錄轉換成0條、1條或多條記錄

    // 把一條記錄轉換成了兩條記錄。如: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
    KStream<String, Integer> transformed = stream.flatMap((key, value) -> {
          List<KeyValue<String, Integer>> result = new LinkedList<>();
          result.add(KeyValue.pair(value.toUpperCase(), 1000));
          result.add(KeyValue.pair(value.toLowerCase(), 9000));
          return result;
    });
  • flatMapValues:做用和flatMap相同,可是隻是對value操做,轉換後記錄的key同原來的key

    // 經過空格拆分紅單個單詞
    KStream<byte[], String> words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
  • foreach:循環

    // 循環打印出每條記錄
    stream.foreach((key, value) -> System.out.println(key + " => " + value));
  • groupByKey:根據key分組

    KGroupedStream<byte[], String> groupedStream = stream.groupByKey();
  • GroupBy: 分組

    // 分組,並修改了key和value的類型
      KGroupedStream<String, String> groupedStream = stream.groupBy(
          (key, value) -> value, Serialized.with(Serdes.String(), Serdes.String())  
      );
      // 分組,並生成新的key,而且修改了key和value的類型
      KGroupedTable<String, Integer> groupedTable = table.groupBy(
          (key, value) -> KeyValue.pair(value, value.length()),       
          Serialized.with(Serdes.String(), Serdes.Integer()) 
      );
  • map:將一條記錄轉換成另外一條記錄

    KStream<String, Integer> transformed 
        = stream.map(key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
  • mapValues:做用同map,可是隻是對value操做,轉換後記錄的key同原來的key

    KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());
  • merge:合併兩個流

    KStream<byte[], String> merged = stream1.merge(stream2);
  • peek:對每條記錄執行無狀態操做,並返回未更改的流,也就是說peek中的任何操做,返回的都是之前的流,能夠用來調試

    KStream<byte[], String> unmodifiedStream 
          = stream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));
  • print:打印流,能夠用來調試

    stream.print();
  • SelectKey:從新構建key

    //將key值改成value的第一個單詞
      KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0])
  • toStream:將KTable轉換成KStream

    KStream<byte[], String> stream = table.toStream();

有狀態的轉換

有狀態的轉換包括:Aggregating、Joining、Windowing。他們間的關係以下圖:

Aggregating(聚合)

經過groupByKeygroupBy分組後,返回KGroupedStreamKGroupedTable數據類型,它們能夠進行聚合的操做。聚合是基於key操做的。這裏有個注意點,kafka streams要求同一個鏈接操做所涉及的topic必需要有相同數量的分區,而且鏈接所用的key必須就是分區的key,至於爲何能夠想想分庫分表後的join問題。

經常使用API
  • aggregate

    滾動聚合,按分組鍵進行聚合。

    聚合分組流時,必須提供初始值設定項(例如,aggValue = 0)和「加法」聚合器(例如,aggValue + curValue)。

    聚合分組表時,必須提供「減法」聚合器(例如:aggValue - oldValue)。

    KGroupedStream<byte[], String> groupedStream = ;
    KGroupedTable<byte[], String> groupedTable = ;
    
    // 聚合分組流 (注意值類型如何從String更改成Long)
    KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
        () -> 0L, // 初始值
        (aggKey, newValue, aggValue) -> aggValue + newValue.length(), 
        Materialized.as("aggregated-stream-store") // 本地狀態名稱
            .withValueSerde(Serdes.Long());
    
    // 聚合分組表
    KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
        () -> 0L, 
        (aggKey, newValue, aggValue) -> aggValue + newValue.length(), 
        (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), 
        Materialized.as("aggregated-table-store") .withValueSerde(Serdes.Long())

    KGroupedStream:

    1. key爲null的記錄會被忽略。

    2. 第一次收到記錄key時,將調用初始化(並在加法器以前調用)。

    3. 只要記錄的值爲非null時,就會調用加法器。

    KGroupedTable:

    1. key爲null的記錄會被忽略。
    2. 第一次收到記錄key時,將調用初始化(並在加法器和減法器以前調用)。
    3. 當一個key的第一個非null的值被接收,只調用加法器。
    4. 當接收到key的後續非空值(例如,UPDATE)時,則(1)使用存儲在表中的舊值調用減法器,以及(2)使用輸入記錄的新值調用加法器。那是剛收到的。未定義減法器和加法器的執行順序。
    5. 當爲一個key(例如,DELETE)接收到邏輯刪除記錄(即具備空值的記錄)時,則僅調用減法器。請注意,只要減法器自己返回空值,就會從生成的KTable中刪除相應的鍵。若是發生這種狀況,該key的任何下一個輸入記錄將再次觸發初始化程序。
  • aggregate (windowed)

    窗口聚合,按分組鍵聚合每一個窗口的記錄值。

    KGroupedStream<String, Long> groupedStream = ...;
    
    // 與基於時間的窗口進行聚合(此處:使用5分鐘的翻滾窗口)
    KTable<Windowed<String>, Long> timeWindowedAggregatedStream 
        = groupedStream.windowedBy(Duration.ofMinutes(5))
            .aggregate(
                () -> 0L, 
                (aggKey, newValue, aggValue) -> aggValue + newValue, 
                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store").withValueSerde(Serdes.Long())); 
    
    // 使用基於會話的窗口進行聚合(此處:不活動間隔爲5分鐘)
    KTable<Windowed<String>, Long> sessionizedAggregatedStream 
        = groupedStream.windowedBy(SessionWindows.with(Duration.ofMinutes(5))
            .aggregate(
                () -> 0L, 
                (aggKey, newValue, aggValue) -> aggValue + newValue, 
                (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, 
                Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store").withValueSerde(Serdes.Long()));
  • count

    滾動聚合,按分組鍵統計記錄數。

    // Counting a KGroupedStream
    KTable<String, Long> aggregatedStream = groupedStream.count();
    
    // Counting a KGroupedTable
    KTable<String, Long> aggregatedTable = groupedTable.count();

    對於KGroupedStream,會忽略具備空鍵或空值的記錄。

    對於KGroupedTable,會忽略具備空鍵的輸入記錄,具備空值的記錄,會從table中刪除該鍵。

  • count(windowed)

    窗口聚合。按分組鍵統計每一個窗口的記錄數,它會忽略具備空鍵或空值的記錄。

    KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy(
        TimeWindows.of(Duration.ofMinutes(5))) // 基於時間的窗口
        .count();
    
    KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy(
        SessionWindows.with(Duration.ofMinutes(5))) // session窗口
        .count();
  • Reduce

    滾動聚合,經過分組鍵組合(非窗口)記錄的值。當前記錄值與最後一個減小的值組合,並返回一個新的減小值。與聚合不一樣,結果值類型不能更改。

    KGroupedStream<String, Long> groupedStream = ...;
    KGroupedTable<String, Long> groupedTable = ...;
    
    KTable<String, Long> aggregatedStream = groupedStream.reduce(
        (aggValue, newValue) -> aggValue + newValue );
    
    KTable<String, Long> aggregatedTable = groupedTable.reduce(
        (aggValue, newValue) -> aggValue + newValue, 
        (aggValue, oldValue) -> aggValue - oldValue );
  • Reduce (windowed)

    窗口聚合。經過分組鍵將每一個窗口的記錄值組合在一塊兒。當前記錄值與最後一個減小的值組合,並返回一個新的減小值。使用null鍵或值的記錄將被忽略。與聚合不一樣,結果值類型不能更改。

    KGroupedStream<String, Long> groupedStream = ...;
    KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(
      TimeWindows.of(Duration.ofMinutes(5)))
      .reduce((aggValue, newValue) -> aggValue + newValue );
    
    KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream.windowedBy(
      SessionWindows.with(Duration.ofMinutes(5))) 
      .reduce((aggValue, newValue) -> aggValue + newValue );
聚合過程詳細分析

分組流的聚合:

KStream<String, Integer> wordCounts = ...;

KGroupedStream<String, Integer> groupedStream = wordCounts
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()));

KTable<String, Integer> aggregated = groupedStream.aggregate(
    () -> 0, 
    (aggKey, newValue, aggValue) -> aggValue + newValue, 
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("aggregated-stream-store" )
        .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer());

這段段代碼運行後,聚合會以下圖所示隨着時間的變化:

分組表的聚合:

KTable<String, String> userProfiles = ...;
KGroupedTable<String, Integer> groupedTable = userProfiles
    .groupBy((user, region) ->KeyValue.pair(region, user.length()), Serdes.String(), Serdes.Integer());

KTable<String, Integer> aggregated = groupedTable.aggregate(
    () -> 0, 
    (aggKey, newValue, aggValue) -> aggValue + newValue, 
    (aggKey, oldValue, aggValue) -> aggValue - oldValue, 
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("aggregated-table-store" )
        .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer());

這段段代碼運行後,聚合會以下圖所示隨着時間的變化:

Joining

所謂鏈接,就是將兩條記錄按照必定的規則鏈接爲一條記錄,其實和sql中的鏈接是同樣的做用。在Kafka stream中,join都是基於Key的,join的方式有三種:innerJoin、leftJoin和outerJoin。

  • join

    KStream<String, Long> left = ...;
    KStream<String, Double> right = ...;
    
    KStream<String, String> joined = left.join(right,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, 
    
        JoinWindows.of(Duration.ofMinutes(5)),
        Joined.with(Serdes.String(), Serdes.Long(), Serdes.Double())  
      );
  • leftJoin

    KStream<String, Long> left = ...;
    KStream<String, Double> right = ...;
    
    KStream<String, String> joined = left.leftJoin(right,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, 
    
        JoinWindows.of(Duration.ofMinutes(5)),
        Joined.with(Serdes.String(), Serdes.Long(), Serdes.Double()) 
      );
  • outerJoin

    KStream<String, Long> left = ...;
    KStream<String, Double> right = ...;
    
    KStream<String, String> joined = left.outerJoin(right,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
        JoinWindows.of(Duration.ofMinutes(5)),
        Joined.with(
          Serdes.String(), Serdes.Long(), Serdes.Double()) 
      );

KStream-KStream的join老是基於windowed的。空鍵或空值的輸入記錄將被忽略,而且不會觸發鏈接

KTable-KTable的join都是不基於windowed的。空鍵的輸入記錄將被忽略,而且不會觸發鏈接

KStream-KTable的join都是不基於windowed的。只有左側(流)的輸入記錄纔會觸發鏈接。右側(表)的輸入記錄僅更新內部右側鏈接狀態。空鍵或空值的輸入記錄將被忽略,而且不會觸發鏈接。

KStream-GlobalKTable的join都是不基於windowed的。只有左側(流)的輸入記錄纔會觸發鏈接。右側(表)的輸入記錄僅更新內部右側鏈接狀態。空鍵或空值的輸入記錄將被忽略,而且不會觸發鏈接。

爲何只有流與流的鏈接必須是基於窗口的呢?由於流的數據是無限的,因此流和流的鏈接是不能完成的。

Windowing

窗口化使您能夠控制如何將具備相同鍵的記錄分組,以進行有狀態操做,例如聚合或鏈接到所謂的窗口。根據記錄密鑰跟蹤Windows。

// 建立一個時間窗口:窗口大小5s,步長1s
TimeWindows.of(Duration.ofSeconds(5)).advanceBy(Duration.ofSeconds(1));

// 建立一個會話窗口:窗口大小5分鐘
SessionWindows.with(Duration.ofMinutes(5));

在實際狀況中,咱們不能保證,每條記錄都能準時的到達,因此就不能保證窗口的結果必定是正確的。例如,咱們要統計每個小時的每種產品的的銷售量,而後篩選出銷量大於3的產品,可是有一條銷售記錄的確是在這個小時內產生的,因爲某種緣由,在這個時間窗關閉的之後纔到達,這樣的話咱們這個時間窗的統計數據其實是不許確的,解決這個問題,能夠用以下的方法:那就是,時間窗在到時間時,先不着急關閉,等待一段時間。

KGroupedStream<UserId, Event> grouped = ...;
     //容許時間窗接受遲到10分鐘內的記錄
grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
    .count()
    //控制在窗口關閉前,下游接受不到任何記錄
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 3)
    .toStream();

自定義API

能夠繼承ProcessTransform來達到咱們自定義的目的。

Kafka Streams常見的配置

參數名稱 默認值 描述
application.id kafka集羣地址,必須參數
bootstrap.servers 應用id,必須參數,同一應用的全部實例的都應該一直。
commit.interval.ms 30000 ms 提交任務位置的頻率
replication.factor 1 應用程序建立的更改日誌主題和從新分區主題的複製因子
state.dir /tmp/kafka-streams 狀態存儲的物理位置,注意這個是保存的本地的

還有不少配置,等之後用到了再慢慢更新。詳細請參考官方的配置介紹

分佈式下的Kafka Streams

既然咱們選擇了kafka作應用,那麼只用單線程或單實例的處理咱們的業務那基本上是不太可能的,若是已經使用過kafka,咱們知道kafka的擴展能力那是很是出色的,對使用者也是很是的簡單,如kafka集羣自身的擴展,咱們僅僅須要集羣的配置文件複製到新節點中,修改一下broker id就好了。對於Kafka Streams的應用來講,經過啓動多個實例組建集羣來提升吞掉量,那也是很是的容易,由於kafka會自動幫咱們作好這些事情。

kafka能自動的根據咱們的實例數量和每一個實例的線程數量,將任務進行拆分,固然和topic的分區數也是直接相關的。和咱們的消費者客戶端同樣,kafka會自動的協調工做,爲每一個任務分配屬於任務本身的分區,這樣每一個任務獨自處理本身的分區,並維護與聚合相關的本地狀態。

若是咱們須要處理來自多個分區的結果,即對多個任務結果再進行處理,這時咱們就能夠根據新的key進行從新分區後寫入到重分區主題上,並啓動新的任務重新主題上讀取和處理事件。

容錯

Kafka Streams對故障的處理有很是好的支持,若是應用出現故障須要重啓,能夠自動的從Kafka上找到上一處理的位置,從該位置繼續開始處理。若是本地狀態丟失(如宕機),應用能夠自動從保存到kafka上的變動日誌新建本地狀態,由於本地狀態的全部數據都保存到了kafka中。若是集羣中的一個任務失敗,只要還有其餘任務實例可用,就能夠用其餘實例來繼續這個任務,由於Kafka有消費者的重平衡機制。

與Spring Cloud Stream整合

依賴:

<dependency>
       <groupId>org.springframework.cloud</groupId>
       <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
 </dependency>

流處理類:

@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessor {
    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCountDto> process(KStream<Object, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(30000))
                .count(Materialized.as("WordCounts-1"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCountDto(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }
}

配置文件:

spring.cloud.stream.kafka.streams.binder:
        brokers: 192.168.195.88
        applicationId: word-count
        configuration:
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          commit.interval.ms: 1000
          serdeError: logAndFail
spring.cloud.stream.bindings.output:
    destination: wordCountOutput
spring.cloud.stream.bindings.input:
    destination: wordCountInput

參考:

相關文章
相關標籤/搜索