數據流(也叫事件流)是無邊界數據集的抽象表示。無邊界意味着無限和持續增加。無邊界數據集之因此是無限的,是由於隨着時間的推移,新記錄會不斷加入進來。數據流除了無邊界還有如下特性:html
數據流是有序的。事件的發生老是有前後順序的,如先下單再發貨java
數據記錄不可變。事件一旦發生,就不能被改變,以下單後,想要取消只會新產生一個事件git
數據流是可重播的。github
持續的從一個無邊界的數據集讀取數據,而後對它們進行處理並生成結果,就是流式處理。spring
流式處理是一種編程範式,就像請求和響應範式、批處理範式那樣,下面是三種範式的比較:sql
請求和響應,這種範式的特色是延遲最小數據庫
批處理,這種範式的特色是高延遲和高吞吐量apache
流式處理,這種範式介於上面兩種之間編程
Kafka Streams是一個用於構建流式處理應用程序的客戶端庫,其中輸入和輸出數據是存儲在Kafka集羣中的。一個用Kakfa Streams搭建的流處理程序,它的架構以下圖:bootstrap
Kafka Streams的優勢:
簡單輕量級的客戶端庫,能夠輕鬆嵌入到任何Java應用程序中;除了Kafka以外沒有其餘外部的依賴。全部能夠輕鬆的整合到本身的應用中,也不須要爲流式處理需求額外的的部署一個應用集羣。
使用Kafka做爲內部消息通信存儲介質,不須要從新加入其它外部組件來作消息通信。值得注意的是,它使用Kafka的分區模型來水平擴展處理,同時保持強大的排序保證。
支持本地狀態容錯,可實現很是快速有效的有狀態操做,如窗口鏈接和聚合。本地狀態被保存在Kafka中,在機器故障的時候,其餘機器能夠自動恢復這些狀態繼續處理。
能夠保證每一個記錄只處理一次,即便在處理過程當中Streams客戶端或Kafka代理髮生故障時也只處理一次。
採用一條記錄一次處理以實現毫秒處理延遲,並支持基於事件時間的窗口操做以及記錄的延遲到達。
提供豐富的流式處理API,包括高級的Streams DSL和低級的Processor API。
時間在流式處理中很是的重要,由於大部分的流式處理都是基於時間窗的,如計算5分鐘內用戶的訪問量,那麼對於五分鐘前的數據就不該該參與計算。
事件時間
事件時間是指事件的發生時間或事件的建立時間,如商品的出售時間,用戶的訪問時間。在Kakfa 0.10以後的版本,生產者會自動在記錄中添加建立時間,若是與業務的事件時間不一致,那就須要手動設置這個時間。
日誌追加時間
日誌追加時間是指時間保存到kafka broker上的時間。
處理時間
處理時間是指咱們的應用在收到事件後對其處理的時間。同一個事件的處理時間可能不一樣,這取決於不一樣的應用什麼時候讀取這個時間
若是隻是單獨的處理每個事件,那麼這個流式處理就很簡單,如從數據流中過濾出交易金額大於10000的數據,而後給這些交易人發個優惠券,這種需求咱們用kafka的消費者客戶端就徹底能知足,但一般狀況下咱們的操做中會包含多個事件,如統計總數、平均數、最大值等。事件與事件之間的信息被稱爲狀態。如咱們賣了1雙鞋,而後又買了2雙鞋,通過這兩個事件後,在如今這個時間,咱們鞋數量的狀態就是一雙。
在業務系統中,有時咱們關注變化的過程,有時咱們關注結果。流是一系列事件,每一個事件就是一個變動;表包含了當前的狀態,是多個變動所產生的結果。將流轉換爲表,叫作流的物化。咱們捕獲到表所發生的變動(insert、update、delete)事件,這些事件就組成了流。
在流處理中,咱們的數據處理大部分都是基於窗操做的,如咱們在分析股價的走勢時,咱們須要統計出的天天或者每一個小時的內股價的平均價格,而後查看價格的一個走勢,而不是直接統計從股票發行到如今的平均價,這個是沒多大意義的,這裏的天天或每一個小時就是一個時間窗。
在Kafka streams中窗口有兩種,時間窗口和會話窗口(其實會話窗口也是基於時間窗的)。
時間窗有兩個重要的屬性:窗口大小和步長(移動間隔)。
滾動窗口:步長等於窗口大小,滾動窗口是沒有沒有記錄的重疊。
跳躍窗口:步長不等於窗口大小
滑動窗口:窗口隨着每一條記錄移動,滑動窗口不與時間對齊,而是與數據記錄時間戳對齊。
會話窗口:會話窗口與時間窗最大的不一樣是,他的大小是不肯定的(由於它的大小是由數據自己決定的)
看下圖,這是一個時間間隔爲5分鐘的session窗口 ,先忽略圖中遲到的兩個記錄,假設他們沒遲到。
在kakfa streams中,計算邏輯被定義爲拓撲,它是一個操做和變換的集合,每一個事件從輸入到輸出都會流經它。每一個流式處理的應用至少會有一個拓撲。
流處理器是處理拓撲中的各個節點,表明拓撲中的每一個處理步驟,用來完成數據轉換功能,如過濾、映射、分組、聚合等。一個流處理器同一時間從上游接收一條輸入數據,產生一個或多個輸出記錄到下個流式處理器。
一個拓撲中有兩種特殊的的處理器:
Source Processor,沒有上游處理器,從一個或多個Kafka topic爲拓撲生成輸入流。
Slink Processor,沒有下游處理器,將從上游處理器接收的記錄發送到指定的Kafka主題。
將數據按空格拆分紅單個單詞,過濾掉不須要的單詞,統計每一個單詞出現的次數
// 配置信息 Properties props = new Properties(); //Streams應用Id props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount"); //Kafka集羣地址 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.195.88:9092"); //指定序列化和反序列化類型 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //建立一個topology構建器,在kakfa中計算邏輯被定義爲鏈接的處理器節點的拓撲。 StreamsBuilder builder = new StreamsBuilder(); //使用topology構建器建立一個源流,指定源topic KStream<String, String> source = builder.stream("wordCountInput"); // 構建topology KStream<String, Long> wordCounts = source //把數據按空格拆分紅單個單詞 .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) //過濾掉the這個單詞,不統計這個單詞 .filter((key, value) -> (!value.equals("the"))) //分組 .groupBy((key, word) -> word) //計數,其中'countsStore'是狀態存儲的名字 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("countsStore")) .toStream(); //將stream寫回到Kafka的topic wordCounts.to("wordCountOutput", Produced.with(Serdes.String(), Serdes.String())); //建立Streams客戶端 KafkaStreams streams = new KafkaStreams(builder.build(), props); //啓動Streams客戶端 streams.start();
統計5秒鐘內股票交易的最高價、最低價、交易次數及平均價格,統計信息每隔一秒鐘更新一次。這是一個典型的時間窗的應用。
StreamsBuilder builder = new StreamsBuilder(); KStream<String, Trade> source = builder.stream("stockStatsInput"); KStream<Windowed<String>, TradeStats> stats = // 按key分組,這裏的key是股票代碼 source.groupByKey() // 建立一個跳躍時間窗,窗口大小5s,步長1s .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).advanceBy(Duration.ofSeconds(1))) // 進行聚合操做,用TradeStats對象存儲每一個窗口的統計信息——最高價、最低價、交易次數及總交易額 .aggregate(TradeStats :: new, (k, v, tradeStats) -> tradeStats.add(v), Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("tradeAggregates").withValueSerde(new TradeStatsSerde())) .toStream() //計算平均股價 .mapValues(TradeStats :: computeAvgPrice); //將stream寫回到Kafka stats.to("stockStatsOutput", Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
根據用戶的搜索信息、點擊信息、及我的信息,把這些信息鏈接在一塊兒,動態的分析出用戶行爲。如一個用戶搜索了"奶粉",並在一分鐘內點擊了「貝因美」,我的信息是年齡20-30歲之間的女性,咱們把這些事件流鏈接起來後,就能夠獲得一條用戶分析的數據,之後貝因美的奶粉搞活動了就能夠直接向該用戶推薦。這個例子主要演示了數據流間的鏈接。
// 搜索事件 KStream<Integer, Search> searches =builder.stream(Constants.SEARCH_TOPIC, Consumed.with(Serdes.Integer(), new SearchSerde())); // 點擊事件 KStream<Integer, PageView> views = builder.stream(Constants.PAGE_VIEW_TOPIC, Consumed.with(Serdes.Integer(), new PageViewSerde())); // 用戶信息 KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC, Consumed.with(Serdes.Integer(), new ProfileSerde()), Materialized.as("profileStore")); //將點擊事件與用戶信息鏈接,用UserActivity對象來存儲狀態 KStream<Integer, UserActivity> viewsWithProfile = views.leftJoin(profiles, (page, profile) -> { if (profile != null){ return new UserActivity(profile.getUserId(), profile.getUserName(), profile.getPhone(), "", page.getPage()); }else { return new UserActivity(-1, "", "", "", page.getPage()); } }); //將用戶點擊事件與搜索信息鏈接 KStream<Integer, UserActivity> userActivityKStream = viewsWithProfile.leftJoin(searches, (userActivity, search) -> { if (search != null) { userActivity.updateSearch(search.getSearchTerms()); }else { userActivity.updateSearch(""); } return userActivity; }, // 搜索事後的10s秒鐘內的數據才被認爲是相關聯的 JoinWindows.of(Duration.ofSeconds(10)), Joined.with(Serdes.Integer(), new UserActivitySerde(), new SearchSerde())); userActivityKStream.to(Constants.USER_ACTIVITY_TOPIC, Produced.with(Serdes.Integer(), new UserActivitySerde()) ); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Kafka Streams的API有兩種,Kafka Streams DSL和Processor API。
Kafka Streams DSL是高級API,它提供最多見的數據轉換操做,諸如map,filter,join等。
Processor API一種低級API,容許您添加和鏈接處理器以及直接與狀態存儲進行交互,Processor API爲您提供比DSL更多的靈活性,但代價是須要在應用程序開發人員方面進行更多的手動工做(例如,更多行代碼)。所以接下來全部的工做都是基於DSL。
KStream:數據流抽象。建立方法以下:
StreamsBuilder builder = new StreamsBuilder(); KStream<String, Long> wordCounts = builder.stream( "word-counts-input-topic", // 輸入的topic Consumed.with(Serdes.String(), Serdes.Long()) //key和value的序列化方式 );
KTable:數據表抽象。建立方法以下:
KStream<String, Long> wordCounts = builder.table( "word-counts-input-topic", // 輸入的topic Consumed.with(Serdes.String(), Serdes.Long(), //key和value的序列化方式 Materialized.as("word-counts-store") // 狀態存儲名 );
GlobalKTable:同KTable,只不過是全局的,KTable是讀取的當前分區的數據,而GlobalKTable是讀取的所有分區的數據,這在進行join操做時是很是有用的。比較相似關係型數據庫在分庫分表後join的問題。
如何理解kStream和KTable的區別:
咱們能夠這樣看,咱們從topic前後讀取了兩條數據,("蘋果", 1) --> ("蘋果", 3)
,對於KStream來講,表示有一個蘋果,而後我又有3個蘋果,結果是我就有了4個蘋果 ;可是對於KTable來講,表示我如今有1蘋果,我如今有3個蘋果,結果是我有3個蘋果。由於對於KTable來講,第二條記錄是第一條記錄的更新。
因此官網對它們區別描述的很是好,KStream對於流中的記錄始終解釋爲insert,而KTable對流中的記錄解釋爲upsert。
只須要數據流過一遍就能夠,不依賴先後的狀態。
branch:將一個Kstream分紅多個
KStream<String, Long>[] branches = stream.branch( (key, value) -> key.startsWith("A"), //branches[0]中只包含key以「A」開頭的全部記錄 (key, value) -> key.startsWith("B"), //branches[1]中只包含key以「B」開頭的全部記錄 (key, value) -> true //branches[2]中包含其餘記錄 );
filter:過濾操做
// 過濾掉value不大於0的記錄 KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);
filterNot:反向過濾,與filter相反
flatMap:將一條記錄轉換成0條、1條或多條記錄
// 把一條記錄轉換成了兩條記錄。如: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000) KStream<String, Integer> transformed = stream.flatMap((key, value) -> { List<KeyValue<String, Integer>> result = new LinkedList<>(); result.add(KeyValue.pair(value.toUpperCase(), 1000)); result.add(KeyValue.pair(value.toLowerCase(), 9000)); return result; });
flatMapValues:做用和flatMap相同,可是隻是對value操做,轉換後記錄的key同原來的key
// 經過空格拆分紅單個單詞 KStream<byte[], String> words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
foreach:循環
// 循環打印出每條記錄 stream.foreach((key, value) -> System.out.println(key + " => " + value));
groupByKey:根據key分組
KGroupedStream<byte[], String> groupedStream = stream.groupByKey();
GroupBy: 分組
// 分組,並修改了key和value的類型 KGroupedStream<String, String> groupedStream = stream.groupBy( (key, value) -> value, Serialized.with(Serdes.String(), Serdes.String()) ); // 分組,並生成新的key,而且修改了key和value的類型 KGroupedTable<String, Integer> groupedTable = table.groupBy( (key, value) -> KeyValue.pair(value, value.length()), Serialized.with(Serdes.String(), Serdes.Integer()) );
map:將一條記錄轉換成另外一條記錄
KStream<String, Integer> transformed = stream.map(key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
mapValues:做用同map,可是隻是對value操做,轉換後記錄的key同原來的key
KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());
merge:合併兩個流
KStream<byte[], String> merged = stream1.merge(stream2);
peek:對每條記錄執行無狀態操做,並返回未更改的流,也就是說peek中的任何操做,返回的都是之前的流,能夠用來調試
KStream<byte[], String> unmodifiedStream = stream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));
print:打印流,能夠用來調試
stream.print();
SelectKey:從新構建key
//將key值改成value的第一個單詞 KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0])
toStream:將KTable轉換成KStream
KStream<byte[], String> stream = table.toStream();
有狀態的轉換包括:Aggregating、Joining、Windowing。他們間的關係以下圖:
經過groupByKey
或groupBy
分組後,返回KGroupedStream
或KGroupedTable
數據類型,它們能夠進行聚合的操做。聚合是基於key操做的。這裏有個注意點,kafka streams要求同一個鏈接操做所涉及的topic必需要有相同數量的分區,而且鏈接所用的key必須就是分區的key,至於爲何能夠想想分庫分表後的join問題。
aggregate
滾動聚合,按分組鍵進行聚合。
聚合分組流時,必須提供初始值設定項(例如,aggValue = 0)和「加法」聚合器(例如,aggValue + curValue)。
聚合分組表時,必須提供「減法」聚合器(例如:aggValue - oldValue)。
KGroupedStream<byte[], String> groupedStream = ; KGroupedTable<byte[], String> groupedTable = ; // 聚合分組流 (注意值類型如何從String更改成Long) KTable<byte[], Long> aggregatedStream = groupedStream.aggregate( () -> 0L, // 初始值 (aggKey, newValue, aggValue) -> aggValue + newValue.length(), Materialized.as("aggregated-stream-store") // 本地狀態名稱 .withValueSerde(Serdes.Long()); // 聚合分組表 KTable<byte[], Long> aggregatedTable = groupedTable.aggregate( () -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(), (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), Materialized.as("aggregated-table-store") .withValueSerde(Serdes.Long())
KGroupedStream:
key爲null的記錄會被忽略。
第一次收到記錄key時,將調用初始化(並在加法器以前調用)。
只要記錄的值爲非null時,就會調用加法器。
KGroupedTable:
aggregate (windowed)
窗口聚合,按分組鍵聚合每一個窗口的記錄值。
KGroupedStream<String, Long> groupedStream = ...; // 與基於時間的窗口進行聚合(此處:使用5分鐘的翻滾窗口) KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(Duration.ofMinutes(5)) .aggregate( () -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue, Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store").withValueSerde(Serdes.Long())); // 使用基於會話的窗口進行聚合(此處:不活動間隔爲5分鐘) KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofMinutes(5)) .aggregate( () -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue, (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store").withValueSerde(Serdes.Long()));
count
滾動聚合,按分組鍵統計記錄數。
// Counting a KGroupedStream KTable<String, Long> aggregatedStream = groupedStream.count(); // Counting a KGroupedTable KTable<String, Long> aggregatedTable = groupedTable.count();
對於KGroupedStream,會忽略具備空鍵或空值的記錄。
對於KGroupedTable,會忽略具備空鍵的輸入記錄,具備空值的記錄,會從table中刪除該鍵。
count(windowed)
窗口聚合。按分組鍵統計每一個窗口的記錄數,它會忽略具備空鍵或空值的記錄。
KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy( TimeWindows.of(Duration.ofMinutes(5))) // 基於時間的窗口 .count(); KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy( SessionWindows.with(Duration.ofMinutes(5))) // session窗口 .count();
Reduce
滾動聚合,經過分組鍵組合(非窗口)記錄的值。當前記錄值與最後一個減小的值組合,並返回一個新的減小值。與聚合不一樣,結果值類型不能更改。
KGroupedStream<String, Long> groupedStream = ...; KGroupedTable<String, Long> groupedTable = ...; KTable<String, Long> aggregatedStream = groupedStream.reduce( (aggValue, newValue) -> aggValue + newValue ); KTable<String, Long> aggregatedTable = groupedTable.reduce( (aggValue, newValue) -> aggValue + newValue, (aggValue, oldValue) -> aggValue - oldValue );
Reduce (windowed)
窗口聚合。經過分組鍵將每一個窗口的記錄值組合在一塊兒。當前記錄值與最後一個減小的值組合,並返回一個新的減小值。使用null鍵或值的記錄將被忽略。與聚合不一樣,結果值類型不能更改。
KGroupedStream<String, Long> groupedStream = ...; KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy( TimeWindows.of(Duration.ofMinutes(5))) .reduce((aggValue, newValue) -> aggValue + newValue ); KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream.windowedBy( SessionWindows.with(Duration.ofMinutes(5))) .reduce((aggValue, newValue) -> aggValue + newValue );
分組流的聚合:
KStream<String, Integer> wordCounts = ...; KGroupedStream<String, Integer> groupedStream = wordCounts .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer())); KTable<String, Integer> aggregated = groupedStream.aggregate( () -> 0, (aggKey, newValue, aggValue) -> aggValue + newValue, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("aggregated-stream-store" ) .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer());
這段段代碼運行後,聚合會以下圖所示隨着時間的變化:
分組表的聚合:
KTable<String, String> userProfiles = ...; KGroupedTable<String, Integer> groupedTable = userProfiles .groupBy((user, region) ->KeyValue.pair(region, user.length()), Serdes.String(), Serdes.Integer()); KTable<String, Integer> aggregated = groupedTable.aggregate( () -> 0, (aggKey, newValue, aggValue) -> aggValue + newValue, (aggKey, oldValue, aggValue) -> aggValue - oldValue, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("aggregated-table-store" ) .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer());
這段段代碼運行後,聚合會以下圖所示隨着時間的變化:
所謂鏈接,就是將兩條記錄按照必定的規則鏈接爲一條記錄,其實和sql中的鏈接是同樣的做用。在Kafka stream中,join都是基於Key的,join的方式有三種:innerJoin、leftJoin和outerJoin。
join
KStream<String, Long> left = ...; KStream<String, Double> right = ...; KStream<String, String> joined = left.join(right, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, JoinWindows.of(Duration.ofMinutes(5)), Joined.with(Serdes.String(), Serdes.Long(), Serdes.Double()) );
leftJoin
KStream<String, Long> left = ...; KStream<String, Double> right = ...; KStream<String, String> joined = left.leftJoin(right, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, JoinWindows.of(Duration.ofMinutes(5)), Joined.with(Serdes.String(), Serdes.Long(), Serdes.Double()) );
outerJoin
KStream<String, Long> left = ...; KStream<String, Double> right = ...; KStream<String, String> joined = left.outerJoin(right, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */ JoinWindows.of(Duration.ofMinutes(5)), Joined.with( Serdes.String(), Serdes.Long(), Serdes.Double()) );
KStream-KStream的join老是基於windowed的。空鍵或空值的輸入記錄將被忽略,而且不會觸發鏈接
KTable-KTable的join都是不基於windowed的。空鍵的輸入記錄將被忽略,而且不會觸發鏈接
KStream-KTable的join都是不基於windowed的。只有左側(流)的輸入記錄纔會觸發鏈接。右側(表)的輸入記錄僅更新內部右側鏈接狀態。空鍵或空值的輸入記錄將被忽略,而且不會觸發鏈接。
KStream-GlobalKTable的join都是不基於windowed的。只有左側(流)的輸入記錄纔會觸發鏈接。右側(表)的輸入記錄僅更新內部右側鏈接狀態。空鍵或空值的輸入記錄將被忽略,而且不會觸發鏈接。
爲何只有流與流的鏈接必須是基於窗口的呢?由於流的數據是無限的,因此流和流的鏈接是不能完成的。
窗口化使您能夠控制如何將具備相同鍵的記錄分組,以進行有狀態操做,例如聚合或鏈接到所謂的窗口。根據記錄密鑰跟蹤Windows。
// 建立一個時間窗口:窗口大小5s,步長1s TimeWindows.of(Duration.ofSeconds(5)).advanceBy(Duration.ofSeconds(1)); // 建立一個會話窗口:窗口大小5分鐘 SessionWindows.with(Duration.ofMinutes(5));
在實際狀況中,咱們不能保證,每條記錄都能準時的到達,因此就不能保證窗口的結果必定是正確的。例如,咱們要統計每個小時的每種產品的的銷售量,而後篩選出銷量大於3的產品,可是有一條銷售記錄的確是在這個小時內產生的,因爲某種緣由,在這個時間窗關閉的之後纔到達,這樣的話咱們這個時間窗的統計數據其實是不許確的,解決這個問題,能夠用以下的方法:那就是,時間窗在到時間時,先不着急關閉,等待一段時間。
KGroupedStream<UserId, Event> grouped = ...; //容許時間窗接受遲到10分鐘內的記錄 grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10))) .count() //控制在窗口關閉前,下游接受不到任何記錄 .suppress(Suppressed.untilWindowCloses(unbounded())) .filter((windowedUserId, count) -> count < 3) .toStream();
能夠繼承Process和Transform來達到咱們自定義的目的。
參數名稱 | 默認值 | 描述 |
---|---|---|
application.id | 無 | kafka集羣地址,必須參數 |
bootstrap.servers | 無 | 應用id,必須參數,同一應用的全部實例的都應該一直。 |
commit.interval.ms | 30000 ms | 提交任務位置的頻率 |
replication.factor | 1 | 應用程序建立的更改日誌主題和從新分區主題的複製因子 |
state.dir | /tmp/kafka-streams | 狀態存儲的物理位置,注意這個是保存的本地的 |
還有不少配置,等之後用到了再慢慢更新。詳細請參考官方的配置介紹
既然咱們選擇了kafka作應用,那麼只用單線程或單實例的處理咱們的業務那基本上是不太可能的,若是已經使用過kafka,咱們知道kafka的擴展能力那是很是出色的,對使用者也是很是的簡單,如kafka集羣自身的擴展,咱們僅僅須要集羣的配置文件複製到新節點中,修改一下broker id就好了。對於Kafka Streams的應用來講,經過啓動多個實例組建集羣來提升吞掉量,那也是很是的容易,由於kafka會自動幫咱們作好這些事情。
kafka能自動的根據咱們的實例數量和每一個實例的線程數量,將任務進行拆分,固然和topic的分區數也是直接相關的。和咱們的消費者客戶端同樣,kafka會自動的協調工做,爲每一個任務分配屬於任務本身的分區,這樣每一個任務獨自處理本身的分區,並維護與聚合相關的本地狀態。
若是咱們須要處理來自多個分區的結果,即對多個任務結果再進行處理,這時咱們就能夠根據新的key進行從新分區後寫入到重分區主題上,並啓動新的任務重新主題上讀取和處理事件。
Kafka Streams對故障的處理有很是好的支持,若是應用出現故障須要重啓,能夠自動的從Kafka上找到上一處理的位置,從該位置繼續開始處理。若是本地狀態丟失(如宕機),應用能夠自動從保存到kafka上的變動日誌新建本地狀態,由於本地狀態的全部數據都保存到了kafka中。若是集羣中的一個任務失敗,只要還有其餘任務實例可用,就能夠用其餘實例來繼續這個任務,由於Kafka有消費者的重平衡機制。
依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId> </dependency>
流處理類:
@EnableBinding(KafkaStreamsProcessor.class) public class WordCountProcessor { @StreamListener("input") @SendTo("output") public KStream<?, WordCountDto> process(KStream<Object, String> input) { return input .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .map((key, value) -> new KeyValue<>(value, value)) .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(30000)) .count(Materialized.as("WordCounts-1")) .toStream() .map((key, value) -> new KeyValue<>(null, new WordCountDto(key.key(), value, new Date(key.window().start()), new Date(key.window().end())))); } }
配置文件:
spring.cloud.stream.kafka.streams.binder: brokers: 192.168.195.88 applicationId: word-count configuration: default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde commit.interval.ms: 1000 serdeError: logAndFail spring.cloud.stream.bindings.output: destination: wordCountOutput spring.cloud.stream.bindings.input: destination: wordCountInput
參考: