There is very little Chinese-language material on Kafka Streams, so this post uses code examples to explain my understanding of the Kafka Streams API.
Kafka Streams was introduced in 0.10.0 and has since been updated to 0.11.0. First, its adoption cost is very low: you simply add the streams library as a dependency, write your processing logic, and start the application. Second, its load balancing is simple and blunt: add or remove running instances and the workload rebalances dynamically, with no manual intervention. Finally, there is a killer feature (supported since 0.11): exactly-once message delivery. It combines producer idempotence (no duplicate messages sent to the broker) with exactly-once consumption (no duplicates and no loss). When a computation fails, restarting the instance is enough to recover.
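Enabling exactly-once in 0.11 is a one-line configuration change. A minimal sketch, using plain string keys so it runs without the Kafka jars on the classpath (these strings match the StreamsConfig constants, e.g. PROCESSING_GUARANTEE_CONFIG resolves to "processing.guarantee"); the application id and broker address are placeholders:

```java
import java.util.Properties;

// Minimal sketch of a Streams configuration with exactly-once enabled.
public class ExactlyOnceConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "order-join-app");    // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        // Kafka 0.11+: turn on exactly-once processing semantics
        props.put("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("processing.guarantee"));
    }
}
```

This Properties object is what would be passed to the KafkaStreams constructor (as streamsConfiguration in the WordCount example below).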
The following demos illustrate how the Streams API is used.
WordCount:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, textLinesTopic);
KStream<String, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    .map((key, word) -> new KeyValue<>(word, word))
    .groupByKey()
    .count("counts")
    .toStream();
wordCounts.to(stringSerde, longSerde, countTopic);
KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();
This very short snippet covers the whole pipeline: a consumer subscribes to a topic and consumes it, word frequencies are counted, and a producer writes the result to another topic. Stepping through the code shows that flatMapValues and map are essentially processing units (processors): each call creates a specific processor and adds it to the topology.
Consuming a single topic like this is well suited to log analytics, such as a site's UV and PV counts. For more complex business logic, join operations become unavoidable, and Kafka Streams likewise provides join functions.
KStreamBuilder streamBuilder = new KStreamBuilder();
String userStore = "user_store";
String driverStore = "driver_Store";
KTable<String, UserOrder> userOrderKTable = streamBuilder.table(
    Serdes.String(), SerdeFactory.serdeFrom(UserOrder.class), TOPIC_USER_ORER, userStore);
KTable<String, DriverOrder> driverOrderKTable = streamBuilder.table(
    Serdes.String(), SerdeFactory.serdeFrom(DriverOrder.class), TOPIC_DRIVER_ORDER, driverStore);
userOrderKTable
    .leftJoin(driverOrderKTable, (userOrder, driverOrder) -> join(userOrder, driverOrder))
    .toStream()
    .map((k, v) -> new KeyValue<>(k, v))
    .to(Serdes.String(), SerdeFactory.serdeFrom(Travel.class), TOPIC_TRAVEL);
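The join(userOrder, driverOrder) helper referenced above is not shown in the post. A minimal, hypothetical sketch of what such a value joiner might look like; the UserOrder, DriverOrder, and Travel fields here are assumptions for illustration only. Note that in a left join the right-hand value can be null:

```java
// Hypothetical sketch of the join(...) value joiner used in the leftJoin above.
// Field names are invented for illustration; the original classes are not shown.
public class JoinSketch {
    static class UserOrder { String orderId; String userId; }
    static class DriverOrder { String orderId; String driverId; }
    static class Travel { String orderId; String userId; String driverId; }

    // Left join: driverOrder may be null when no driver record has arrived yet.
    static Travel join(UserOrder u, DriverOrder d) {
        Travel t = new Travel();
        t.orderId = u.orderId;
        t.userId = u.userId;
        t.driverId = (d == null) ? null : d.driverId;
        return t;
    }

    public static void main(String[] args) {
        UserOrder u = new UserOrder();
        u.orderId = "o1";
        u.userId = "u1";
        System.out.println(join(u, null).driverId == null);
    }
}
```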
The join semantics are essentially "join by partition and key". To get a correct join result, matching records from the two topics must be consumed within the same running instance. Suppose Topic1 and Topic2 each have 4 partitions and two instances are running: each task then consumes only two partitions from each topic, so you must guarantee that the keys in the two Topic1 partitions a task owns can be found in the Topic2 partitions it also owns.
This sounds convoluted, but in practice we only need to guarantee that the two topics have the same number of partitions and that the producers use the same partitioner. If that condition is not met, the data can be repartitioned via the through function:
KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic);
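The reason "same partition count plus same partitioner" is sufficient can be seen from the partitioning arithmetic itself. The sketch below is an illustration only, using a stand-in hash (Kafka's DefaultPartitioner actually murmur2-hashes the serialized key bytes); the point is the modulo step, which sends a given key to the same partition index in both topics:

```java
import java.nio.charset.StandardCharsets;

// Illustration of co-partitioning: identical key, identical partitioner
// logic, and identical partition counts yield identical partition indexes.
public class CoPartitionDemo {
    // Stand-in for a deterministic keyed partitioner (not Kafka's murmur2).
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = 0;
        for (byte b : bytes) hash = 31 * hash + b;
        return (hash & 0x7fffffff) % numPartitions; // non-negative modulo
    }

    public static void main(String[] args) {
        int partitions = 4; // both topics have 4 partitions
        String orderId = "order-42";
        // Same key routed identically for the "user" and "driver" topics.
        System.out.println(
            partitionFor(orderId, partitions) == partitionFor(orderId, partitions));
    }
}
```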
Suppose that during the join we want to aggregate over the most recent data. Kafka Streams provides windowed operations: within a time window, later records overwrite earlier records with the same key, and when the window closes, the downstream computation is triggered and produces the correct result.
KTable<Windowed<String>, MultiUserOrder> userOrderKTable = streamBuilder
    .stream(Serdes.String(), SerdeFactory.serdeFrom(UserOrder.class), TOPIC_USER_ORER)
    .groupByKey()
    .aggregate(
        MultiUserOrder::new,          // initializer must be a lambda, not an instance
        (k, v, agg) -> {
            agg.setOrderId(k);
            agg.getOrders().add(v);
            return agg;
        },
        TimeWindows.of(6 * 1000),
        SerdeFactory.serdeFrom(MultiUserOrder.class),
        userAggStore);

KTable<Windowed<String>, MultiDriverOrder> driverOrderKTable = streamBuilder
    .stream(Serdes.String(), SerdeFactory.serdeFrom(DriverOrder.class), TOPIC_DRIVER_ORDER)
    .groupByKey()
    .aggregate(
        MultiDriverOrder::new,
        (k, v, agg) -> {
            agg.setOrderId(k);
            agg.getOrders().add(v);
            return agg;
        },
        TimeWindows.of(6 * 1000),
        SerdeFactory.serdeFrom(MultiDriverOrder.class),
        driverAggStore);

userOrderKTable
    .leftJoin(driverOrderKTable,
        (multiUserOrder, multiDriverOrder) -> join(multiUserOrder, multiDriverOrder))
    .toStream()
    .map((k, v) -> new KeyValue<>(k.key(), v)) // unwrap the Windowed<String> key
    .to(Serdes.String(), SerdeFactory.serdeFrom(Travel.class), TOPIC_TRAVEL);
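To make the window boundaries concrete: TimeWindows.of(6 * 1000) defines tumbling 6-second windows aligned to the epoch, so the window a record lands in is determined purely by its timestamp. A small sketch of that alignment arithmetic (mirroring the tumbling case, where the window start is the timestamp rounded down to a multiple of the window size):

```java
// Sketch of tumbling-window alignment: for a window of size `sizeMs`,
// a record with timestamp `ts` falls into the window starting at
// ts - (ts % sizeMs), i.e. windows [0,6000), [6000,12000), [12000,18000), ...
public class TumblingWindowDemo {
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long size = 6_000L; // matches TimeWindows.of(6 * 1000) above
        System.out.println(windowStart(13_500L, size)); // 12000
    }
}
```

This is why records for the same key arriving within 6 seconds of each other end up in the same aggregate, while the final map step unwraps the Windowed key back to a plain String for the output topic.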