Kafka0.11發佈--流式計算新玩法

    Kafka streams的相關中文資料很是少,筆者但願借該代碼講述一下本身對kafka streams API的用法。java

    kafka streams從0.10.0開始引入,如今已經更新到0.11.0。首先它的使用成本很是低廉,僅需在代碼中依賴streams lib,編寫計算邏輯,啓動APP便可。其次它的負載均衡也很是簡單暴力,增長或者減小運行實例就能夠動態調整,無需人工干預。最後還有一個大殺器(0.11開始支持),提供 Exactly-once消息傳遞特性,它包含了producer冪等性,不會重複發送消息到broker;consumer exactly once,不會重複消費也不會丟失。在運算失敗的時候,重啓運算實例便可恢復。api

    下面用demo講解streams api用法。負載均衡

    WordCount:函數

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, textLinesTopic);

KStream<String, Long> wordCounts = textLines
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
      .map((key, word) -> new KeyValue<>(word, word))
      .groupByKey().count("counts").toStream();

wordCounts.to(stringSerde, longSerde, countTopic);

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();

        這段很是簡短的代碼包括了consumer頂閱主題並消費、統計詞頻、producer寫入到另外一個Topic。跟蹤代碼能夠發現flatMapValues,map函數本質上是處理單元processor,在函數調用時,API會建立特定的processor加入到拓撲中。網站

        這種消費單個主題進行運算的方式能夠作一些日誌的統計分析,例如網站的UV,PV等。若是須要處理更復雜的業務,那麼關聯操做不可避免。一樣的,kafka streams 提供了join函數。ui

        

KStreamBuilder streamBuilder = new KStreamBuilder();
        String userStore = "user_store";
        String driverStore = "driver_Store";
        KTable<String, UserOrder> userOrderKTable = streamBuilder.table(Serdes.String(),
                SerdeFactory.serdeFrom(UserOrder.class), TOPIC_USER_ORER, userStore);
        KTable<String, DriverOrder> driverOrderKTable= streamBuilder.table(Serdes.String(),
                SerdeFactory.serdeFrom(DriverOrder.class), TOPIC_DRIVER_ORDER, driverStore);

        userOrderKTable.leftJoin(driverOrderKTable,
                (userOrder,driverOrder)->join(userOrder,driverOrder))
                .toStream()
                .map((k,v)->new KeyValue<>(k,v))
                .to(Serdes.String(),SerdeFactory.serdeFrom(Travel.class),TOPIC_TRAVEL);

         join的語法本質上是join by partition and key。爲了獲得正確的Join結果,兩個不一樣的topic須要再同一個運行實例中被消費到。假設,Topic1 和Topic2各有4個partition,有兩個實例在運行,因而一個task僅消費Topic1和Topic2的兩個,這樣就須要保證,topic1中的兩個partition的key值在另外一個topic中能被找到。日誌

       上面的邏輯看起來很是繞口,在實際開發的過程當中,咱們僅需保證兩個topic擁有相同數量的partition,而且producer採用一樣的Paritioner。若是該條件不知足,須要經過through函數完成。            code

KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, 
         StreamPartitioner<K, V> partitioner, String topic);

           假設在join過程當中,咱們須要最新的數據作聚合。kafka streams 提供了windowed函數,在時間窗口內,後續的記錄會覆蓋同一個Key的記錄。窗口結束後,會觸發後續的計算邏輯獲得正確的結果。開發

KTable<Windowed<String>, MultiUserOrder> userOrderKTable = streamBuilder.stream(Serdes.String(),
                SerdeFactory.serdeFrom(UserOrder.class),
                TOPIC_USER_ORER, userStore)
                .groupByKey()
                .aggregate(
                        new MultiUserOrder(), (k, v, map) -> {
                                map.setOrderId(k);
                                map.getOrders().add(v);
                                return map;
                        },
                        TimeWindows.of(6 * 1000),
                        SerdeFactory.serdeFrom(MultiUserOrder.class), userAggStore);


        KTable<Windowed<String>, MultiDriverOrder> driverOrderKTable = streamBuilder.stream(Serdes.String(),
                SerdeFactory.serdeFrom(DriverOrder.class),
                TOPIC_DRIVER_ORDER, driverStore)
                .groupByKey()
                .aggregate(new MultiDriverOrder(), (k, v, map) -> {
                            map.setOrderId(k);
                            map.getOrders().add(v);
                            return map;
                        }, TimeWindows.of(6 * 1000),
                        SerdeFactory.serdeFrom(MultiDriverOrder.class), driverAggStore);

        userOrderKTable.leftJoin(driverOrderKTable,
                (multiUserOrder,multiDriverOrder)->join(multiUserOrder,multiDriverOrder))
                .toStream()
                .map((k,v)->new KeyValue<>(k.key(),v))
                .to(Serdes.String(),SerdeFactory.serdeFrom(Travel.class),TOPIC_TRAVEL);
相關文章
相關標籤/搜索