歡迎關注公衆號:n平方
若有問題或建議,請後臺留言,我會盡力解決你的問題。
本文主要介紹【Kafka Streams的架構和使用】java
Kafka流經過構建Kafka生產者和消費者庫,並利用Kafka的本地功能來提供數據並行性、分佈式協調、容錯和操做簡單性,從而簡化了應用程序開發。
下圖展現了一個使用Kafka Streams庫的應用程序的結構。apache
Kafka的消息傳遞層對數據進行分區,以存儲和傳輸數據。Kafka流劃分數據進行處理。在這兩種狀況下,這種分區都支持數據局部性、靈活性、可伸縮性、高性能和容錯性。Kafka流使用分區和任務的概念做爲基於Kafka主題分區的並行模型的邏輯單元。Kafka流與Kafka在並行性上下文中有着緊密的聯繫:編程
應用程序的處理器拓撲經過將其分解爲多個任務進行擴展。
更具體地說,Kafka流基於應用程序的輸入流分區建立固定數量的任務,每一個任務分配一個來自輸入流的分區列表(例如,kafka的topic)。分配給任務的分區永遠不會改變,所以每一個任務都是應用程序並行性的固定單元。
而後,任務能夠基於分配的分區實例化本身的處理器拓撲;它們還爲每一個分配的分區維護一個緩衝區,並從這些記錄緩衝區一次處理一條消息。
所以,流任務能夠獨立並行地處理,而無需人工干預。api
理解Kafka流不是一個資源管理器,而是一個「運行」其流處理應用程序運行的任何地方的庫。應用程序的多個實例要麼在同一臺機器上執行,要麼分佈在多臺機器上,庫能夠自動將任務分配給運行應用程序實例的那些實例。分配給任務的分區從未改變;若是應用程序實例失敗,它分配的全部任務將在其餘實例上自動從新啓動,並繼續從相同的流分區使用。安全
下圖顯示了兩個任務,每一個任務分配一個輸入流分區。
架構
Kafka流容許用戶配置庫用於在應用程序實例中並行處理的線程數。每一個線程能夠獨立地使用其處理器拓撲執行一個或多個任務。
例如,下圖顯示了一個流線程運行兩個流任務。分佈式
啓動更多的流線程或應用程序實例僅僅至關於複製拓撲並讓它處理Kafka分區的不一樣子集,從而有效地並行處理。值得注意的是,線程之間不存在共享狀態,所以不須要線程間的協調。這使得跨應用程序實例和線程並行運行拓撲變得很是簡單。Kafka主題分區在各類流線程之間的分配是由Kafka流利用Kafka的協調功能透明地處理的。ide
如上所述,使用Kafka流擴展您的流處理應用程序很容易:您只須要啓動應用程序的其餘實例,Kafka流負責在應用程序實例中運行的任務之間分配分區。您能夠啓動與輸入Kafka主題分區同樣多的應用程序線程,以便在應用程序的全部運行實例中,每一個線程(或者更確切地說,它運行的任務)至少有一個輸入分區要處理。性能
Kafka流提供了所謂的狀態存儲,流處理應用程序可使用它來存儲和查詢數據,這是實現有狀態操做時的一項重要功能。例如,Kafka Streams DSL在調用有狀態操做符(如join()或aggregate())或打開流窗口時自動建立和管理這樣的狀態存儲。ui
Kafka Streams應用程序中的每一個流任務均可以嵌入一個或多個本地狀態存儲,這些存儲能夠經過api訪問,以存儲和查詢處理所需的數據。Kafka流爲這種本地狀態存儲提供容錯和自動恢復功能。
下圖顯示了兩個流任務及其專用的本地狀態存儲。
Kafka流構建於Kafka中本地集成的容錯功能之上。Kafka分區是高度可用和複製的;所以,當流數據持久化到Kafka時,即便應用程序失敗並須要從新處理它,流數據也是可用的。Kafka流中的任務利用Kafka消費者客戶端提供的容錯功能來處理失敗。若是任務在失敗的機器上運行,Kafka流將自動在應用程序的一個剩餘運行實例中從新啓動該任務。
此外,Kafka流還確保本地狀態存儲對於故障也是健壯的。對於每一個狀態存儲,它維護一個複製的changelog Kafka主題,其中跟蹤任何狀態更新。這些變動日誌主題也被分區,這樣每一個本地狀態存儲實例,以及訪問該存儲的任務,都有本身專用的變動日誌主題分區。在changelog主題上啓用了日誌壓縮,這樣能夠安全地清除舊數據,防止主題無限增加。若是任務在一臺失敗的機器上運行,並在另外一臺機器上從新啓動,Kafka流經過在恢復對新啓動的任務的處理以前重播相應的更改日誌主題,確保在失敗以前將其關聯的狀態存儲恢復到內容。所以,故障處理對最終用戶是徹底透明的。
就是控制檯輸入到kafka中,通過處理輸出。
package com.example.kafkastreams.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class PipeDemo { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); builder.stream("streams-plaintext-input").to("streams-pipe-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
就是將你輸入的字符串進行分詞輸出。
package com.example.kafkastreams.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class LineSplitDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))) .to("streams-linesplit-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
將你輸入的字符串進行按單詞統計輸出。
package com.example.kafkastreams.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class WordCountDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
本人水平有限,歡迎各位建議以及指正。順便關注一下公衆號唄,會常常更新文章的哦。