KTable:
KGroupedStream / KGroupedTable:
KStreams:
KGroupedStream comes after a groupBy() or groupByKey() call on a KStream.
count() counts the number of records per grouped key.
If used on a KGroupedStream: records with a null key or a null value are ignored.
If used on a KGroupedTable: records with a null key are ignored; a null value is treated as a delete and decrements the count.
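A minimal count() sketch on a grouped stream, assuming a hypothetical input topic "words" with String keys and values (and String default serdes configured):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> words = builder.stream("words");
KTable<String, Long> counts = words
    .groupByKey() // -> KGroupedStream<String, String>
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")); // counts records per key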
After grouping with groupByKey() or groupBy(), you get back a KGroupedStream or a KGroupedTable, on which aggregation operations can be performed. Aggregation is always key-based. One caveat: Kafka Streams requires that all topics involved in the same join operation have the same number of partitions, and that the join key is the partitioning key. As for why, think about the join problem you face after sharding a database across instances.
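As an illustration, here is a minimal groupBy() sketch (the topic name "events" is hypothetical; Grouped assumes Kafka Streams 2.1 or later). Because groupBy() changes the key, Kafka Streams repartitions the stream so that equal keys land in the same partition:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> events = builder.stream("events");
KGroupedStream<String, String> grouped = events.groupBy(
    (key, value) -> value, // the value becomes the new grouping key (triggers a repartition)
    Grouped.with(Serdes.String(), Serdes.String()));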
Aggregate performs a rolling aggregation by the grouping key.
When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0) and an "adder" aggregator (e.g., aggValue + curValue).
When aggregating a grouped table, you must additionally provide a "subtractor" aggregator (e.g., aggValue - oldValue).
KGroupedStream<byte[], String> groupedStream = ...;
KGroupedTable<byte[], String> groupedTable = ...;

// Aggregating a grouped stream (note how the value type changes from String to Long)
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    () -> 0L, // initializer
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), // adder
    Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") // local state store name
        .withValueSerde(Serdes.Long()));

// Aggregating a grouped table
KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    () -> 0L, // initializer
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), // adder
    (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), // subtractor
    Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store")
        .withValueSerde(Serdes.Long()));
KGroupedStream:
Records with a null key are ignored.
The initializer is called the first time a record key is received (and it is called before the adder).
The adder is called whenever a record with a non-null value is received.
KGroupedTable:
Below is the reference documentation for the aggregate function on KGroupedTable. I will translate it in detail later, because this function is very powerful when used well, yet also hard to master.
<VR> KTable<K,VR> aggregate(Initializer<VR> initializer,
                            Aggregator<? super K,? super V,VR> adder,
                            Aggregator<? super K,? super V,VR> subtractor,
                            Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)

Aggregate the value of records of the original KTable that got mapped to the same key into a new instance of KTable. Records with null key are ignored. Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values. The result is written into a local KeyValueStore (which is basically an ever-updating materialized view) that can be queried using the provided queryableStoreName. Furthermore, updates to the store are sent downstream into a KTable changelog stream.

The specified Initializer is applied once directly before the first input record is processed to provide an initial intermediate aggregation result that is used to process the first record. Each update to the original KTable results in a two-step update of the result KTable. The specified adder is applied for each update record and computes a new aggregate using the current aggregate (or for the very first record using the intermediate aggregation result provided via the Initializer) and the record's value by adding the new record to the aggregate. The specified subtractor is applied for each "replaced" record of the original KTable and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" record from the aggregate. Thus, aggregate(Initializer, Aggregator, Aggregator, Materialized) can be used to compute aggregate functions like sum. For sum, the initializer, adder, and subtractor would work as follows:

// in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
public class SumInitializer implements Initializer<Long> {
    public Long apply() {
        return 0L;
    }
}

public class SumAdder implements Aggregator<String, Integer, Long> {
    public Long apply(String key, Integer newValue, Long aggregate) {
        return aggregate + newValue;
    }
}

public class SumSubtractor implements Aggregator<String, Integer, Long> {
    public Long apply(String key, Integer oldValue, Long aggregate) {
        return aggregate - oldValue;
    }
}

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the same key. The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

To query the local KeyValueStore it must be obtained via KafkaStreams#store(...):

KafkaStreams streams = ...; // counting words
ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
String key = "some-word";
Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)

For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application.

For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'.
The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the provided store name defined in Materialized, and "-changelog" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().

Type Parameters:
VR - the value type of the aggregated KTable

Parameters:
initializer - an Initializer that provides an initial aggregate result value
adder - an Aggregator that adds a new record to the aggregate result
subtractor - an Aggregator that removes an old record from the aggregate result
materialized - the instance of Materialized used to materialize the state store. Cannot be null

Returns:
a KTable that contains "update" records with unmodified keys, and values that represent the latest (rolling) aggregate for each key
For KGroupedStream aggregate() you need an initializer, an adder, a Serde and a StateStore name (name of your aggregation)
For KGroupedTable aggregate() you need an initializer, an adder, a subtractor, a Serde and a StateStore name (name of your aggregation)
The subtractor keeps the aggregate in sync with the current state of the data: if (key1, value1) is updated to (key1, value2), the old record is subtracted first, e.g., for a count, the count that included value1 is decremented by 1 before value2 is added.
Why do we need a subtractor in KGroupedTable but not in KGroupedStream? Because a stream only ever performs inserts, which aggregate() can handle with the adder alone. A KGroupedTable performs not only inserts but also updates and deletes, which is why it needs a subtractor. The sketch below illustrates this.
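A minimal sketch of the update semantics, assuming a hypothetical topic "user-regions" with String keys (user) and String values (region), and String default serdes: counting users per region from a table. When "alice" moves from "CA" to "NY", the subtraction step decrements "CA"'s count and the addition step increments "NY"'s count.

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> userRegions = builder.table("user-regions");
KTable<String, Long> usersPerRegion = userRegions
    .groupBy((user, region) -> KeyValue.pair(region, user)) // re-key by region -> KGroupedTable
    .count(); // an update (user, oldRegion -> newRegion) subtracts from oldRegion's count and adds to newRegion's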
Reduce is a simplified version of aggregate(), but the result type has to be the same as the input's:
(int, int) => int, or (String, String) => String (e.g., concat(a, b))
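A minimal reduce() sketch, assuming a hypothetical topic "purchases" with String keys and Long amounts:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, Long> totals = builder
    .stream("purchases", Consumed.with(Serdes.String(), Serdes.Long()))
    .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
    .reduce(Long::sum); // (Long, Long) -> Long: the output type matches the input type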
peek() allows you to apply a side-effect operation to a KStream and get the same KStream back as a result.
For example, you can use it for debugging, printing records to the console as they flow through; see the sketch below.
Warning: because peek() performs a side effect, it could be executed multiple times for the same record in case of failures (due to the at-least-once processing guarantee).
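A minimal peek() sketch for console debugging (the stream variable is hypothetical):

import org.apache.kafka.streams.kstream.KStream;

KStream<String, Long> stream = ...;
KStream<String, Long> unchanged = stream.peek(
    (key, value) -> System.out.println("key=" + key + ", value=" + value)); // side effect only; records pass through unchanged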
Trust me, you won't need them!!!
This diagram explains how the different Kafka objects interact and transform:
I found a good article on this:
https://my.oschina.net/u/2424727/blog/2989115