This article walks through Kafka Streams' KStreamBuilder and then shows, by example, how to write a custom Kafka Streams processor.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("demo-topic");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
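The props passed to KafkaStreams here are the usual Streams configuration. A minimal sketch, assuming a local broker on localhost:9092 and String keys/values (the same settings appear again in the word-count example further down; the application id is made up):

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kstream-builder-demo"); // hypothetical application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());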
A topology is hiding inside KStreamBuilder.
kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/kstream/KStreamBuilder.java
public class KStreamBuilder extends TopologyBuilder {

    public <K, V> KStream<K, V> stream(final String... topics) {
        return stream(null, null, null, topics);
    }

    public <K, V> KStream<K, V> stream(final AutoOffsetReset offsetReset,
                                       final Serde<K> keySerde,
                                       final Serde<V> valSerde,
                                       final String... topics) {
        final String name = newName(KStreamImpl.SOURCE_NAME);

        addSource(offsetReset, name,
                  keySerde == null ? null : keySerde.deserializer(),
                  valSerde == null ? null : valSerde.deserializer(),
                  topics);

        return new KStreamImpl<>(this, name, Collections.singleton(name), false);
    }
}
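Since KStreamBuilder extends TopologyBuilder, stream() is just a convenience wrapper around addSource(). The overload above can also be called directly when you want to pin the offset-reset policy and serdes explicitly; a sketch, assuming the demo-topic topic from before and the nested TopologyBuilder.AutoOffsetReset enum from this version:

// read demo-topic from the earliest offset with explicit String serdes
KStream<String, String> source = builder.stream(
        TopologyBuilder.AutoOffsetReset.EARLIEST,
        Serdes.String(),
        Serdes.String(),
        "demo-topic");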
The addSource call here invokes the method inherited from TopologyBuilder:
kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/processor/TopologyBuilder.java
public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset,
                                                    final String name,
                                                    final Deserializer keyDeserializer,
                                                    final Deserializer valDeserializer,
                                                    final String... topics) {
    if (topics.length == 0) {
        throw new TopologyBuilderException("You must provide at least one topic");
    }
    Objects.requireNonNull(name, "name must not be null");
    if (nodeFactories.containsKey(name))
        throw new TopologyBuilderException("Processor " + name + " is already added.");

    for (String topic : topics) {
        Objects.requireNonNull(topic, "topic names cannot be null");
        validateTopicNotAlreadyRegistered(topic);
        maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic);
        sourceTopicNames.add(topic);
    }

    nodeFactories.put(name, new SourceNodeFactory(name, topics, null, keyDeserializer, valDeserializer));
    nodeToSourceTopics.put(name, Arrays.asList(topics));
    nodeGrouper.add(name);
    return this;
}
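So addSource registers a SourceNodeFactory under the node name and records the source topics. When you use the Processor API directly you call this yourself; a sketch of calling exactly this overload, with the node name "SOURCE" and topic "demo-topic" as made-up examples:

TopologyBuilder builder = new TopologyBuilder();
// register a source node that reads demo-topic with explicit String deserializers
builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "SOURCE",
        new StringDeserializer(), new StringDeserializer(), "demo-topic");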
Does this topology look familiar? Storm has topologies too:
TopologyBuilder builder = new TopologyBuilder();
// parallelism of 10
builder.setSpout("spout", new TestWordSpout(), 10);
builder.setBolt("count", new WordCountBolt(), 5).fieldsGrouping("spout", new Fields("word"));
builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("count");

String topologyName = "DemoTopology";
Config conf = new Config();
conf.setDebug(true);

try {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topologyName, conf, builder.createTopology());
    Thread.sleep(60 * 1000);
    cluster.shutdown();
} catch (Exception e) {
    e.printStackTrace();
}
A processor topology defines how data flows between the individual processing units (called Processors in Kafka Streams); in other words, it defines the data-processing logic.
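To make that concrete, here is a minimal DSL pipeline sketch; the topic names demo-topic and demo-output are made up, and String serdes are assumed as the defaults. Each chained call adds one processor node to the underlying topology:

KStreamBuilder builder = new KStreamBuilder();
builder.<String, String>stream("demo-topic")            // source node
       .filter((key, value) -> value != null && !value.isEmpty()) // adds a KStreamFilter processor
       .mapValues(String::trim)                          // adds a KStreamMapValues processor
       .to("demo-output");                               // sink node writing to demo-output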
For the details, see kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
@Override
public KStream<K, V> filter(Predicate<? super K, ? super V> predicate) {
    Objects.requireNonNull(predicate, "predicate can't be null");
    String name = topology.newName(FILTER_NAME);
    topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name);
    return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
}

@Override
public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
    Objects.requireNonNull(predicate, "predicate can't be null");
    String name = topology.newName(FILTER_NAME);
    topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name);
    return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
}

@Override
public <K1> KStream<K1, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends K1> mapper) {
    Objects.requireNonNull(mapper, "mapper can't be null");
    return new KStreamImpl<>(topology, internalSelectKey(mapper), sourceNodes, true);
}

@Override
public <K1, V1> KStream<K1, V1> map(KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends K1, ? extends V1>> mapper) {
    Objects.requireNonNull(mapper, "mapper can't be null");
    String name = topology.newName(MAP_NAME);
    topology.addProcessor(name, new KStreamMap<K, V, K1, V1>(mapper), this.name);
    return new KStreamImpl<>(topology, name, sourceNodes, true);
}

@Override
public <V1> KStream<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper) {
    Objects.requireNonNull(mapper, "mapper can't be null");
    String name = topology.newName(MAPVALUES_NAME);
    topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name);
    return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
}

@Override
public <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends K1, ? extends V1>>> mapper) {
    Objects.requireNonNull(mapper, "mapper can't be null");
    String name = topology.newName(FLATMAP_NAME);
    topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name);
    return new KStreamImpl<>(topology, name, sourceNodes, true);
}

@Override
public <V1> KStream<K, V1> flatMapValues(ValueMapper<? super V, ? extends Iterable<? extends V1>> mapper) {
    Objects.requireNonNull(mapper, "mapper can't be null");
    String name = topology.newName(FLATMAPVALUES_NAME);
    topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name);
    return new KStreamImpl<>(topology, name, sourceNodes, this.repartitionRequired);
}

@Override
public void foreach(ForeachAction<? super K, ? super V> action) {
    Objects.requireNonNull(action, "action can't be null");
    String name = topology.newName(FOREACH_NAME);
    topology.addProcessor(name, new KStreamForeach<>(action), this.name);
}
As you can see, every streaming operation simply adds a processor to the topology.
kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/processor/ProcessorSupplier.java
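The interface at this path is tiny; in 0.10.2.1 it is essentially just a factory for Processor instances (reproduced from memory, so treat the exact shape as approximate):

public interface ProcessorSupplier<K, V> {
    Processor<K, V> get();
}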
The processor behind each streaming operation is an implementation of this ProcessorSupplier, for example kafka-streams-0.10.2.1-sources.jar!/org/apache/kafka/streams/kstream/internals/KStreamFilter.java:
class KStreamFilter<K, V> implements ProcessorSupplier<K, V> {

    private final Predicate<K, V> predicate;
    private final boolean filterNot;

    public KStreamFilter(Predicate<K, V> predicate, boolean filterNot) {
        this.predicate = predicate;
        this.filterNot = filterNot;
    }

    @Override
    public Processor<K, V> get() {
        return new KStreamFilterProcessor();
    }

    private class KStreamFilterProcessor extends AbstractProcessor<K, V> {
        @Override
        public void process(K key, V value) {
            if (filterNot ^ predicate.test(key, value)) {
                context().forward(key, value);
            }
        }
    }
}
Following the same pattern, here is a custom word-count processor:

public class WordCountProcessorSupplier implements ProcessorSupplier<String, String> {

    @Override
    public Processor<String, String> get() {
        return new WordCountProcessor();
    }

    private class WordCountProcessor implements Processor<String, String> {

        private ProcessorContext context;
        private KeyValueStore<String, Integer> kvStore;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
            // request a punctuate() call every 1000 milliseconds
            this.context.schedule(1000);
            this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
        }

        @Override
        public void process(String key, String value) {
            String[] words = value.toLowerCase(Locale.getDefault()).split(" ");
            for (String word : words) {
                Integer oldValue = this.kvStore.get(word);
                if (oldValue == null) {
                    this.kvStore.put(word, 1);
                } else {
                    this.kvStore.put(word, oldValue + 1);
                }
            }
        }

        /**
         * Perform any periodic operations
         * @param timestamp
         */
        @Override
        public void punctuate(long timestamp) {
            try (KeyValueIterator<String, Integer> itr = this.kvStore.all()) {
                while (itr.hasNext()) {
                    KeyValue<String, Integer> entry = itr.next();
                    System.out.println("[" + entry.key + ", " + entry.value + "]");
                    context.forward(entry.key, entry.value.toString());
                }
                context.commit();
            }
        }

        @Override
        public void close() {
            this.kvStore.close();
        }
    }
}
Then wire it into a TopologyBuilder together with the "Counts" state store it uses, and start the application:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("SOURCE", "wc-input");
builder.addProcessor("PROCESSOR1", new WordCountProcessorSupplier(), "SOURCE");
builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "PROCESSOR1");
// builder.addSink("SINK", OUTPUT_TOPIC_NAME, "PROCESSOR1");

final KafkaStreams streams = new KafkaStreams(builder, props);
final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    e.printStackTrace();
}
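If you also want the counts forwarded to a topic instead of only printed, the commented-out addSink line can be enabled; a sketch, with "wc-output" as a made-up topic name and the default String serdes assumed for the forwarded key/value strings:

// forward whatever PROCESSOR1 emits via context.forward(...) to the topic "wc-output"
builder.addSink("SINK", "wc-output", "PROCESSOR1");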
Running the demo prints output like this:

[aaa, 2]
[bb, 1]
[ccc, 1]
[ddd, 1]
[hello, 2]
[kafka, 1]
[world, 2]
2017-10-16 22:33:01.835 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-16 22:33:08.775 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-10-16 22:33:08.776 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-10-16 22:33:08.776 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_1
2017-10-16 22:33:08.776 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_2
[aaa, 1]
2017-10-16 22:33:11.621 INFO --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_2
[bb, 1]