4 Kafka Streams -> Stateful Operations of KStreams and KTables without Joins and Window


  • groupBy

KGroupedStream / KGroupedTable:apache

  • Count
  • Reduce
  • Aggregate


  • Peek
  • Transform / TransformValues

KTable GroupBy()

KGroupedStream / KGroupedTable Count()

KGroupedStream comes after groupBy() or groupByKey() call on KStreamide

Count counts the number of record by grouped key函數

If used on KGroupedStream:this

  • Null keys or values are ignored

If used on KGroupedTable:spa

  • Null keys are ignored
  • Null values are treated as "delete" (=tombstones)

KGroupedStream / KGroupedTable Aggregate()


經過groupByKeygroupBy分組後,返回KGroupedStreamKGroupedTable數據類型,它們能夠進行聚合的操做。聚合是基於key操做的。這裏有個注意點,kafka streams要求同一個鏈接操做所涉及的topic必需要有相同數量的分區,而且鏈接所用的key必須就是分區的key,至於爲何能夠想想分庫分表後的join問題。.net


聚合分組流時,必須提供初始值設定項(例如,aggValue = 0)和「加法」聚合器(例如,aggValue + curValue)。debug

聚合分組表時,必須提供「減法」聚合器(例如:aggValue - oldValue)。

KGroupedStream<byte[], String> groupedStream = ;
KGroupedTable<byte[], String> groupedTable = ;

// 聚合分組流 (注意值類型如何從String更改成Long)
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    () -> 0L, // 初始值
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), 
    Materialized.as("aggregated-stream-store") // 本地狀態名稱

// 聚合分組表
KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    () -> 0L, 
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), 
    (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), 
    Materialized.as("aggregated-table-store") .withValueSerde(Serdes.Long())



  1. key爲null的記錄會被忽略。

  2. 第一次收到記錄key時,將調用初始化(並在加法器以前調用)。

  3. 只要記錄的值爲非null時,就會調用加法器。


  1. key爲null的記錄會被忽略。
  2. 第一次收到記錄key時,將調用初始化(並在加法器和減法器以前調用)。
  3. 當一個key的第一個非null的值被接收,只調用加法器。
  4. 當接收到key的後續非空值(例如,UPDATE)時,則(1)使用存儲在表中的舊值調用減法器,以及(2)使用輸入記錄的新值調用加法器。那是剛收到的。未定義減法器和加法器的執行順序。
  5. 當爲一個key(例如,DELETE)接收到邏輯刪除記錄(即具備空值的記錄)時,則僅調用減法器。請注意,只要減法器自己返回空值,就會從生成的KTable中刪除相應的鍵。若是發生這種狀況,該key的任何下一個輸入記錄將再次觸發初始化程序。

下面是關於 KGroupedTable 中 aggregate 函數的介紹, 我會晚一些把它詳細的翻譯成中文,由於這個函數若是用好功能是十分強大,同時也是很難掌握的。

<VR> KTable<K,VR> aggregate(Initializer<VR> initializer,
                            Aggregator<? super K,? super V,VR> adder,
                            Aggregator<? super K,? super V,VR> subtractor,
                            Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Aggregate the value of records of the original KTable that got mapped to the same key into a new instance of KTable. Records with null key are ignored. Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values. The result is written into a local KeyValueStore (which is basically an ever-updating materialized view) that can be queried using the provided queryableStoreName. Furthermore, updates to the store are sent downstream into a KTable changelog stream.
The specified Initializer is applied once directly before the first input record is processed to provide an initial intermediate aggregation result that is used to process the first record. Each update to the original KTable results in a two step update of the result KTable. The specified adder is applied for each update record and computes a new aggregate using the current aggregate (or for the very first record using the intermediate aggregation result provided via the Initializer) and the record's value by adding the new record to the aggregate. The specified subtractor is applied for each "replaced" record of the original KTable and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" record from the aggregate. Thus, aggregate(Initializer, Aggregator, Aggregator, Materialized) can be used to compute aggregate functions like sum. For sum, the initializer, adder, and subtractor would work as follows:

 // in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
 public class SumInitializer implements Initializer<Long> {
   public Long apply() {
     return 0L;

 public class SumAdder implements Aggregator<String, Integer, Long> {
   public Long apply(String key, Integer newValue, Long aggregate) {
     return aggregate + newValue;

 public class SumSubtractor implements Aggregator<String, Integer, Long> {
   public Long apply(String key, Integer oldValue, Long aggregate) {
     return aggregate - oldValue;
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the same key. The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of parallel running Kafka Streams instances, and the configuration parameters for cache size, and commit intervall.
To query the local KeyValueStore it must be obtained via KafkaStreams#store(...):

 KafkaStreams streams = ... // counting words
 ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
 String key = "some-word";
 Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application.
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the provide store name defined in Materialized, and "-changelog" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().

Type Parameters:
VR - the value type of the aggregated KTable
initializer - an Initializer that provides an initial aggregate result value
adder - an Aggregator that adds a new record to the aggregate result
subtractor - an Aggregator that removed an old record from the aggregate result
materialized - the instance of Materialized used to materialize the state store. Cannot be null
a KTable that contains "update" records with unmodified keys, and values that represent the latest (rolling) aggregate for each key


For KGroupedStream aggregate() you need an initializer, an adder, a Serde and a StateStore name (name of your aggregation)

For KGroupedTable aggregate() you need an initializer, an adder, a substractor, a Serde and a StateStore name (name of your aggregation)

substractor is used for keep current state of data, if key1, value1 is updated to key1, value2 then by count we will got key1, (count of value1 - 1)

Why do we need  substractor in KGroupedTable, but not in KGroupedStream? Because in Stream we only will perform insert operation, which for aggregate() could be handled with add.  In KGroupedTable we perform not only insert, but also update/delete, that's why we need a substractor for it.


KGroupedStream / KGroupedTable Reduce()

Reduce is a simplified version of aggregate(), but the result type has to be the same as an input:

(int, int) => int or (String, String) => String (example concat(a,b))

KStream Peek()

Peek allows you to apply a side-effect operation to a KStream and get the same KStream as a result

For example:

  • Printing the stream to the consolo
  • Statistics collection

Warning: it could be executed multiple times (cause of at least once concept) as it is side effect (in case of failures)

Like for debugging on the console:

KStream Transform() / TransformValues()

Trust me, you won't need them !!!


This diagram explains, how different Kafka object interact and transform:



