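Strictly speaking, Kafka is not really a message queue: at its core it is a log storage system, and stream processing is a feature it has recently been promoting heavily. This article briefly walks through a word count example.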
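To make the "log storage" view concrete, here is a toy in-memory model. It is an illustration only, not Kafka's actual storage code, and the class `ToyLog` and its methods are hypothetical. Records are only ever appended, each receives an increasing offset, and reading from an offset removes nothing, so several readers can replay the same data independently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy in-memory append-only log (illustration only, not how Kafka stores data on disk).
class ToyLog {
    private final List<String> records = new ArrayList<>();

    // Append a record and return the offset it was written at (0, 1, 2, ...).
    synchronized long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Return up to maxRecords records starting at fromOffset.
    // Reading never deletes anything, so any reader can re-read from any offset.
    synchronized List<String> read(long fromOffset, int maxRecords) {
        int start = (int) Math.max(0, Math.min(fromOffset, records.size()));
        int end = Math.min(start + maxRecords, records.size());
        return Collections.unmodifiableList(new ArrayList<>(records.subList(start, end)));
    }

    // Offset that the next appended record will receive.
    synchronized long endOffset() {
        return records.size();
    }

    public static void main(String[] args) {
        ToyLog log = new ToyLog();
        log.append("hello kafka");   // offset 0
        log.append("hello streams"); // offset 1
        // Two reads from different positions in the same, unchanged log.
        System.out.println(log.read(0, 10)); // [hello kafka, hello streams]
        System.out.println(log.read(1, 10)); // [hello streams]
    }
}
```

In Kafka terms, each consumer group keeps its own committed offset into the log instead of the broker deleting a message once it has been delivered.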
Add the `kafka-streams` dependency (Maven):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.2.1</version>
</dependency>
```
Create the input topic:

```sh
sh kafka-topics.sh --create --topic wc-input \
    --replication-factor 1 --partitions 1 \
    --zookeeper localhost:2181
```
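This creates the input topic, `wc-input`. For output, to keep things simple, the results are printed to the console here; they could just as well be written to another topic.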
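Configure the Streams application:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// application.id also serves as the consumer group id and the prefix of internal topics
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// default serdes: both record keys and values are strings
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
```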
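The `auto.offset.reset` setting only applies when the application has no usable committed offset. That is why the comment says a re-run needs the reset tool: after the first run, offsets are committed and processing resumes from them. A minimal sketch of that decision for one partition, assuming a hypothetical helper `startingOffset` (not part of the Kafka API) and the consumer's three policy values `earliest`, `latest` and `none`:

```java
// Hypothetical helper, not a Kafka API: models which offset a consumer starts from
// for one partition, given its committed offset (null if none) and auto.offset.reset.
class OffsetResetSketch {

    static long startingOffset(Long committedOffset, long logStartOffset,
                               long logEndOffset, String resetPolicy) {
        // A committed offset that is still inside the log is used as-is;
        // the reset policy is only consulted when there is none (or it is out of range).
        if (committedOffset != null
                && committedOffset >= logStartOffset
                && committedOffset <= logEndOffset) {
            return committedOffset;
        }
        switch (resetPolicy) {
            case "earliest":
                return logStartOffset; // replay everything still retained in the log
            case "latest":
                return logEndOffset;   // only see records produced from now on
            default:
                // mirrors "none": no automatic reset, an error is reported instead
                throw new IllegalStateException("no valid committed offset, reset policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(null, 0, 42, "earliest")); // 0  (first run)
        System.out.println(startingOffset(null, 0, 42, "latest"));   // 42
        System.out.println(startingOffset(42L, 0, 42, "earliest"));  // 42 (re-run: committed offset wins)
    }
}
```

The third call shows why simply restarting the demo does not reprocess the input: the committed offset takes precedence over `earliest`.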
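Build the word count topology:

```java
import java.util.Arrays;
import java.util.Locale;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueMapper;

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("wc-input");

KTable<String, Long> counts = source
        // split each line into lowercase words, one output record per word
        .flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
            }
        })
        // re-key each record by the word itself so identical words end up in the same group
        .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String key, String value) {
                return new KeyValue<>(value, value);
            }
        })
        // map() changed the key, so records are repartitioned by word before counting
        .groupByKey()
        // running count per word, backed by a state store named "Counts"
        .count("Counts");

// print the table's updates to the console
counts.print();

// final so the anonymous shutdown hook below can reference it (required on Java 7)
final KafkaStreams streams = new KafkaStreams(builder, props);
```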
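The topology can be read as a per-record computation. Below is a plain-Java sketch with no Kafka dependency; the class `WordCountSketch` is hypothetical and a `HashMap` stands in for the `Counts` state store. Each line is split into lowercase words, each word becomes its own key, and that key's running count is incremented and emitted. Because the split is on a single space, punctuation stays attached to words.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Plain-Java sketch of the word-count topology (no Kafka involved).
class WordCountSketch {
    // Stand-in for the "Counts" state store: word -> current count.
    private final Map<String, Long> store = new HashMap<>();

    // Processes one input line and returns the count updates it causes, as "word=count".
    List<String> process(String line) {
        List<String> updates = new ArrayList<>();
        // flatMapValues: one line -> many lowercase words (split on single spaces, like the topology)
        for (String word : line.toLowerCase(Locale.getDefault()).split(" ")) {
            // map + groupByKey: the word itself is the grouping key
            // count: increment that key's running total and emit the new value
            long newCount = store.merge(word, 1L, Long::sum);
            updates.add(word + "=" + newCount);
        }
        return updates;
    }

    // Current count for a word, 0 if it has never been seen.
    long count(String word) {
        return store.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        WordCountSketch wc = new WordCountSketch();
        System.out.println(wc.process("hello kafka streams")); // [hello=1, kafka=1, streams=1]
        System.out.println(wc.process("hello kafka"));         // [hello=2, kafka=2]
    }
}
```

Unlike this sketch, the real `KTable` may buffer and coalesce updates in its record cache before `print()` shows them, so not every intermediate count necessarily reaches the console.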
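Start the application and keep it running until Ctrl-C:

```java
import java.util.concurrent.CountDownLatch;

final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    // start() returns immediately; processing runs on background stream threads
    streams.start();
    // block the main thread until the shutdown hook releases the latch
    latch.await();
} catch (Throwable e) {
    e.printStackTrace();
}
```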
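`streams.start()` does not block, so `main` has to wait on something, and the latch is released only after the shutdown hook has closed the streams instance. The same pattern can be tried without Kafka in the sketch below, where `FakeService` is a hypothetical stand-in for `KafkaStreams` and a timer thread plays the role of the Ctrl-C shutdown hook so the program exits on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Stdlib-only sketch of the start / await / close pattern; no Kafka involved.
class LatchShutdownSketch {

    // Hypothetical stand-in for KafkaStreams: start() returns at once, work runs on another thread.
    static class FakeService {
        private final AtomicBoolean running = new AtomicBoolean(false);
        private Thread worker;

        void start() {
            running.set(true);
            worker = new Thread(() -> {
                while (running.get()) {
                    try {
                        Thread.sleep(10); // pretend to poll and process records
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }, "fake-stream-thread");
            worker.start();
        }

        void close() throws InterruptedException {
            running.set(false); // ask the worker loop to stop
            worker.join();      // wait until it has actually finished
        }

        boolean isRunning() {
            return running.get();
        }
    }

    // Starts the service, lets a timer thread shut it down after stopAfterMillis,
    // and returns whether the service is still running once the caller is released.
    static boolean runAndStop(long stopAfterMillis) {
        FakeService service = new FakeService();
        CountDownLatch latch = new CountDownLatch(1);

        // Plays the role of the shutdown hook: close first, then release the latch.
        Thread stopper = new Thread(() -> {
            try {
                Thread.sleep(stopAfterMillis);
                service.close();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                latch.countDown();
            }
        }, "fake-shutdown-hook");

        service.start(); // returns immediately
        stopper.start();
        try {
            latch.await(); // the calling thread blocks here until the "hook" has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return service.isRunning();
    }

    public static void main(String[] args) {
        System.out.println("still running after shutdown: " + runAndStop(100)); // false
    }
}
```

Counting down only after `close()` returns means the main thread cannot finish while records are still being processed.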
Then start a console producer on the input topic:

```sh
sh kafka-console-producer.sh --broker-list localhost:9092 --topic wc-input
```

and type a line of text:

```
Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology.
```
The console output of the application:

```
[KSTREAM-AGGREGATE-0000000003]: streams , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: is , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: a , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: library , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: for , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: building , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: microservices, , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: where , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: input , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: output , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: data , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: are , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: stored , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: in , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: kafka , (2<-null)
[KSTREAM-AGGREGATE-0000000003]: clusters. , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: it , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: combines , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: simplicity , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: writing , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: deploying , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: standard , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: java , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: and , (5<-null)
[KSTREAM-AGGREGATE-0000000003]: scala , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: applications , (2<-null)
[KSTREAM-AGGREGATE-0000000003]: on , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: client , (2<-null)
[KSTREAM-AGGREGATE-0000000003]: side , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: with , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: the , (4<-null)
[KSTREAM-AGGREGATE-0000000003]: benefits , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: of , (2<-null)
[KSTREAM-AGGREGATE-0000000003]: kafka's , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: server-side , (1<-null)
[KSTREAM-AGGREGATE-0000000003]: cluster , (1<-null)
```

Each line shows a word followed by its count; the number before `<-` is the current count and the value after it, `null` here, is the previous value. Because lines are split only on spaces, punctuation stays attached to words, which is why entries such as `microservices,` and `clusters.` appear.