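The previous post covered what the merge operator does. This post shows how to filter repeated events out of a Kafka Streams stream so that only the unique events remain.
Suppose the events we want to deduplicate look like this:
{"ip":"10.0.0.1","url":"https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html","timestamp":"2019-09-16T14:53:43+00:00"}
As before, each event is serialized with Protocol Buffers and has three fields: ip, url, and timestamp.
First, create the project directory:
$ mkdir distinct-events && cd distinct-events
Then create the Gradle build file build.gradle in the distinct-events directory with the following content: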
buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
    }
}

plugins {
    id 'java'
    id "com.google.protobuf" version "0.8.10"
}
apply plugin: 'com.github.johnrengelman.shadow'

repositories {
    mavenCentral()
    jcenter()
    maven { url 'http://packages.confluent.io/maven' }
}

group 'huxihx.kafkastreams'

sourceCompatibility = 1.8
targetCompatibility = '1.8'
version = '0.0.1'

dependencies {
    implementation 'org.slf4j:slf4j-simple:1.7.26'
    implementation 'org.apache.kafka:kafka-streams:2.3.0'
    // protobuf-java is declared once; the original listed 3.0.0 as well, which Gradle resolved to 3.9.1 anyway
    implementation 'com.google.protobuf:protobuf-java:3.9.1'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

protobuf {
    generatedFilesBaseDir = "$projectDir/src/"
    protoc {
        artifact = 'com.google.protobuf:protoc:3.0.0'
    }
}

jar {
    manifest {
        attributes(
            'Class-Path': configurations.compile.collect { it.getName() }.join(' '),
            'Main-Class': 'huxihx.kafkastreams.FindDistinctEvents'
        )
    }
}

shadowJar {
    archiveName = "kstreams-transform-standalone-${version}.${extension}"
}
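Note that the main class is set to huxihx.kafkastreams.FindDistinctEvents.
Save the file, then run the following command to download the Gradle wrapper:
$ gradle wrapper
Once that is done, create a subdirectory named configuration under distinct-events. It holds our configuration file dev.properties, whose content is listed after the command: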
$ mkdir configuration
application.id=find-distinct-app
bootstrap.servers=localhost:9092

input.topic.name=clicks
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=distinct-clicks
output.topic.partitions=1
output.topic.replication.factor=1
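Here we configure one input topic and one output topic. The input topic holds the incoming event stream, and the output topic holds the new stream after deduplication.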
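As a rough illustration of how these settings are consumed later, the sketch below loads a few of the same keys with java.util.Properties and converts the partition count and replication factor to the int and short values that the NewTopic constructor takes. The class name DevPropertiesSketch, its helper methods, and the inline SAMPLE string are assumptions made for this example; the real application reads configuration/dev.properties from disk.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class DevPropertiesSketch {
    // Inline copy of part of dev.properties (assumption: the real application reads the file from disk).
    static final String SAMPLE = String.join("\n",
            "input.topic.name=clicks",
            "input.topic.partitions=1",
            "input.topic.replication.factor=1");

    // Parses key=value lines into a Properties object.
    static Properties load(String text) {
        Properties props = new Properties();
        try (StringReader reader = new StringReader(text)) {
            props.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // prefix is "input" or "output", matching the key names in dev.properties.
    static int partitions(Properties props, String prefix) {
        return Integer.parseInt(props.getProperty(prefix + ".topic.partitions"));
    }

    static short replicationFactor(Properties props, String prefix) {
        return Short.parseShort(props.getProperty(prefix + ".topic.replication.factor"));
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.printf("%s: partitions=%d, replication=%d%n",
                props.getProperty("input.topic.name"),
                partitions(props, "input"),
                replicationFactor(props, "input"));
    }
}
```

Every value in a properties file is a string, so a typo such as a non-numeric partition count only surfaces as a NumberFormatException when the value is parsed.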
Next, create the schema for the topics. Run the following command in distinct-events to create the folder that holds the schema:
$ mkdir -p src/main/proto
Then create a file named click.proto in the proto folder with the following content:
syntax = "proto3";

package huxihx.kafkastreams.proto;

message Click {
    string ip = 1;
    string url = 2;
    string timestamp = 3;
}
After saving it, run the gradlew command in the distinct-events directory:
$ ./gradlew build
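You should now see the generated Java class ClickOuterClass under distinct-events/src/main/java/huxihx/kafkastreams/proto.
In this step we create the Serdes for the topic messages. First, run the following command in distinct-events to create the package directory: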
$ mkdir -p src/main/java/huxihx/kafkastreams/serdes
Then create ProtobufSerializer.java in the new serdes folder:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import org.apache.kafka.common.serialization.Serializer;

public class ProtobufSerializer<T extends MessageLite> implements Serializer<T> {
    @Override
    public byte[] serialize(String topic, T data) {
        // A null value is written as an empty byte array.
        return data == null ? new byte[0] : data.toByteArray();
    }
}
Next is ProtobufDeserializer.java:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class ProtobufDeserializer<T extends MessageLite> implements Deserializer<T> {

    private Parser<T> parser;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The protobuf parser is passed in through the config map under the "parser" key.
        parser = (Parser<T>) configs.get("parser");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return parser.parseFrom(data);
        } catch (InvalidProtocolBufferException e) {
            throw new SerializationException("Failed to deserialize from a protobuf byte array.", e);
        }
    }
}
Finally, ProtobufSerdes.java:
package huxihx.kafkastreams.serdes;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.util.HashMap;
import java.util.Map;

public class ProtobufSerdes<T extends MessageLite> implements Serde<T> {

    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;

    public ProtobufSerdes(Parser<T> parser) {
        serializer = new ProtobufSerializer<>();
        deserializer = new ProtobufDeserializer<>();
        // Hand the parser to the deserializer through its configure() method.
        Map<String, Parser<T>> config = new HashMap<>();
        config.put("parser", parser);
        deserializer.configure(config, false);
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}
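One detail of this serializer/deserializer pair is easy to miss: a null value is written as an empty byte array, so it does not come back as null on the reading side. The plain-Java sketch below mimics that behavior without any Kafka or protobuf dependency. The class RoundTripSketch and its String codec are hypothetical stand-ins; the decoder function plays the role of the injected protobuf Parser.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class RoundTripSketch<T> {
    private final Function<T, byte[]> encoder;
    private final Function<byte[], T> decoder; // stands in for the injected protobuf Parser

    RoundTripSketch(Function<T, byte[]> encoder, Function<byte[], T> decoder) {
        this.encoder = encoder;
        this.decoder = decoder;
    }

    // Same null handling as ProtobufSerializer: null becomes an empty byte array.
    byte[] serialize(T data) {
        return data == null ? new byte[0] : encoder.apply(data);
    }

    // Like ProtobufDeserializer, this simply decodes whatever bytes it is given.
    T deserialize(byte[] bytes) {
        return decoder.apply(bytes);
    }

    // Hypothetical String codec used only for this demonstration.
    static RoundTripSketch<String> forStrings() {
        return new RoundTripSketch<String>(
                s -> s.getBytes(StandardCharsets.UTF_8),
                b -> new String(b, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        RoundTripSketch<String> codec = forStrings();
        // A normal value survives the round trip unchanged.
        System.out.println(codec.deserialize(codec.serialize("10.0.0.1")));
        // A null value comes back as an empty value, not as null.
        System.out.println(codec.deserialize(codec.serialize(null)).isEmpty());
    }
}
```

With protobuf the effect is analogous: parsing an empty byte array yields a message whose fields all hold their default values. That is harmless for this tutorial, which never sends null values, but it is worth keeping in mind if the Serdes are reused with topics that rely on null values.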
First, create DeduplicationTransformer.java under src/main/java/huxihx/kafkastreams. This class implements the deduplication logic:
package huxihx.kafkastreams;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

/**
 * Deduplicates records by an event ID extracted from each record
 * (the IP address in this example).
 *
 * @param <K> record key type
 * @param <V> record value type
 * @param <E> event ID type
 */
public class DeduplicationTransformer<K, V, E> implements Transformer<K, V, KeyValue<K, V>> {

    private static final String storeName = "eventId-store";

    private ProcessorContext context;
    private WindowStore<E, Long> eventIdStore;

    private final long leftDurationMs;
    private final long rightDurationMs;
    private final KeyValueMapper<K, V, E> idExtractor;

    DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
        if (maintainDurationPerEventInMs < 1) {
            throw new IllegalArgumentException("maintain duration per event must be >= 1");
        }
        // Split the duration into the parts searched before and after the record timestamp.
        leftDurationMs = maintainDurationPerEventInMs / 2;
        rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
        this.idExtractor = idExtractor;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName);
    }

    @Override
    public KeyValue<K, V> transform(K key, V value) {
        final E eventId = idExtractor.apply(key, value);
        if (eventId == null) {
            // Records without an ID cannot be deduplicated, so forward them as-is.
            return KeyValue.pair(key, value);
        } else {
            final KeyValue<K, V> output;
            if (isDuplicate(eventId)) {
                // Returning null drops the record.
                output = null;
                updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
            } else {
                output = KeyValue.pair(key, value);
                rememberNewEvent(eventId, context.timestamp());
            }
            return output;
        }
    }

    private boolean isDuplicate(final E eventId) {
        final long eventTime = context.timestamp();
        final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
                eventId,
                eventTime - leftDurationMs,
                eventTime + rightDurationMs);
        final boolean isDuplicate = timeIterator.hasNext();
        timeIterator.close();
        return isDuplicate;
    }

    private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) {
        eventIdStore.put(eventId, newTimestamp, newTimestamp);
    }

    private void rememberNewEvent(final E eventId, final long timestamp) {
        eventIdStore.put(eventId, timestamp, timestamp);
    }

    @Override
    public void close() {
    }
}
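To make the interval arithmetic in isDuplicate concrete, here is a small plain-Java sketch with no Kafka dependency. It repeats the constructor's split of the maintain duration into a left and a right part and computes the bounds that are passed to fetch. The class name DedupIntervalSketch, the from and to helpers, and the sample timestamp are assumptions for illustration only.

```java
final class DedupIntervalSketch {
    final long leftMs;
    final long rightMs;

    DedupIntervalSketch(long maintainDurationMs) {
        if (maintainDurationMs < 1) {
            throw new IllegalArgumentException("maintain duration per event must be >= 1");
        }
        leftMs = maintainDurationMs / 2;        // integer division rounds down
        rightMs = maintainDurationMs - leftMs;  // the remainder goes to the right side
    }

    // Lower bound of the search interval for a record with the given timestamp.
    long from(long eventTime) {
        return eventTime - leftMs;
    }

    // Upper bound of the search interval for a record with the given timestamp.
    long to(long eventTime) {
        return eventTime + rightMs;
    }

    public static void main(String[] args) {
        DedupIntervalSketch twoMinutes = new DedupIntervalSketch(120_000L);
        long eventTime = 1_000_000L; // arbitrary record timestamp in milliseconds
        System.out.println(twoMinutes.from(eventTime) + " .. " + twoMinutes.to(eventTime));
    }
}
```

Read this way, a 2-minute maintain duration means the store is searched from one minute before to one minute after the record's timestamp, so an earlier occurrence is only treated as a duplicate if it was stored within the preceding minute.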
Then create FindDistinctEvents.java under src/main/java/huxihx/kafkastreams:
package huxihx.kafkastreams;

import huxihx.kafkastreams.proto.ClickOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerdes;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

public class FindDistinctEvents {

    private static final String storeName = "eventId-store";

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException("Config file path must be specified.");
        }

        FindDistinctEvents app = new FindDistinctEvents();
        Properties envProps = app.loadEnvProperties(args[0]);
        Properties streamProps = app.createStreamsProperties(envProps);
        Topology topology = app.buildTopology(envProps);

        app.preCreateTopics(envProps);

        final KafkaStreams streams = new KafkaStreams(topology, streamProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Close the streams application cleanly when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Exception e) {
            System.exit(1);
        }
        System.exit(0);
    }

    private Topology buildTopology(Properties envProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final ProtobufSerdes<ClickOuterClass.Click> clickSerdes = clickProtobufSerdes();

        final String inputTopic = envProps.getProperty("input.topic.name");
        final String outputTopic = envProps.getProperty("output.topic.name");

        final Duration windowSize = Duration.ofMinutes(2);

        // Window store that remembers which event IDs were seen, keyed by ID.
        final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
                Stores.persistentWindowStore(storeName, windowSize, windowSize, false),
                Serdes.String(),
                Serdes.Long());

        builder.addStateStore(dedupStoreBuilder);

        // Use the click's IP address as the event ID for deduplication.
        builder.stream(inputTopic, Consumed.with(Serdes.String(), clickSerdes))
                .transform(() -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value.getIp()), storeName)
                .to(outputTopic, Produced.with(Serdes.String(), clickSerdes));

        return builder.build();
    }

    private Properties createStreamsProperties(Properties envProps) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    // Creates the input and output topics if they do not exist yet.
    private void preCreateTopics(Properties envProps) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
        String inputTopic = envProps.getProperty("input.topic.name");
        String outputTopic = envProps.getProperty("output.topic.name");
        try (AdminClient client = AdminClient.create(config)) {
            Collection<TopicListing> existingTopics = client.listTopics().listings().get();

            List<NewTopic> topics = new ArrayList<>();
            List<String> topicNames = existingTopics.stream().map(TopicListing::name).collect(Collectors.toList());
            if (!topicNames.contains(inputTopic))
                topics.add(new NewTopic(
                        inputTopic,
                        Integer.parseInt(envProps.getProperty("input.topic.partitions")),
                        Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));

            if (!topicNames.contains(outputTopic))
                topics.add(new NewTopic(
                        outputTopic,
                        Integer.parseInt(envProps.getProperty("output.topic.partitions")),
                        Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));

            if (!topics.isEmpty())
                client.createTopics(topics).all().get();
        }
    }

    private Properties loadEnvProperties(String fileName) throws IOException {
        Properties envProps = new Properties();
        try (FileInputStream input = new FileInputStream(fileName)) {
            envProps.load(input);
        }
        return envProps;
    }

    private static ProtobufSerdes<ClickOuterClass.Click> clickProtobufSerdes() {
        return new ProtobufSerdes<>(ClickOuterClass.Click.parser());
    }
}
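The core logic is in the buildTopology method: we register a window store and use the custom DeduplicationTransformer, with each click's IP address as the event ID, to deduplicate within a 2-minute window.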
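The idea behind the windowed deduplication can be tried out in plain Java without a running cluster. The sketch below is a simplified model, not the Kafka Streams implementation: it replaces the WindowStore with a HashMap that keeps only the most recent timestamp per ID and never expires entries, so it can behave differently from the real store when records arrive out of order. The class WindowedDedupSketch, the accept method, and the one-second spacing between events are assumptions for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class WindowedDedupSketch {
    private final long leftMs;
    private final long rightMs;
    // Simplification: only the latest timestamp per ID is kept, with no expiry.
    private final Map<String, Long> lastSeen = new HashMap<>();

    WindowedDedupSketch(long windowMs) {
        leftMs = windowMs / 2;
        rightMs = windowMs - leftMs;
    }

    // Returns true if the event should be forwarded, false if it is a duplicate.
    boolean accept(String id, long eventTime) {
        Long seenAt = lastSeen.get(id);
        boolean duplicate = seenAt != null
                && seenAt >= eventTime - leftMs
                && seenAt <= eventTime + rightMs;
        // Remember a new ID, or refresh the timestamp of an existing one.
        lastSeen.put(id, eventTime);
        return !duplicate;
    }

    // Feeds six events (three IPs, each seen twice, one second apart) through a 2-minute window.
    static List<String> demo() {
        WindowedDedupSketch dedup = new WindowedDedupSketch(120_000L);
        String[] ips = {"10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.1", "10.0.0.2", "10.0.0.3"};
        List<String> forwarded = new ArrayList<>();
        long eventTime = 0L;
        for (String ip : ips) {
            if (dedup.accept(ip, eventTime)) {
                forwarded.add(ip);
            }
            eventTime += 1_000L;
        }
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [10.0.0.1, 10.0.0.2, 10.0.0.3]
    }
}
```

The second occurrence of each IP arrives a few seconds after the first, well inside the search interval, so only the first occurrence of each IP is forwarded.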
As in the earlier posts of this introductory series, we write a TestProducer class and a TestConsumer class. Create TestProducer.java and TestConsumer.java under src/main/java/huxihx/kafkastreams/tests with the following contents.
TestProducer.java:
package huxihx.kafkastreams.tests;

import huxihx.kafkastreams.proto.ClickOuterClass;
import huxihx.kafkastreams.serdes.ProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class TestProducer {

    // Six click events: each of the three IPs appears twice.
    private static final List<ClickOuterClass.Click> TEST_CLICK_EVENTS = Arrays.asList(
            ClickOuterClass.Click.newBuilder().setIp("10.0.0.1")
                    .setUrl("https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html")
                    .setTimestamp("2019-09-16T14:53:43+00:00").build(),
            ClickOuterClass.Click.newBuilder().setIp("10.0.0.2")
                    .setUrl("https://www.confluent.io/hub/confluentinc/kafka-connect-datagen")
                    .setTimestamp("2019-09-16T14:53:43+00:01").build(),
            ClickOuterClass.Click.newBuilder().setIp("10.0.0.3")
                    .setUrl("https://www.confluent.io/hub/confluentinc/kafka-connect-datagen")
                    .setTimestamp("2019-09-16T14:53:43+00:03").build(),
            ClickOuterClass.Click.newBuilder().setIp("10.0.0.1")
                    .setUrl("https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html")
                    .setTimestamp("2019-09-16T14:53:43+00:00").build(),
            ClickOuterClass.Click.newBuilder().setIp("10.0.0.2")
                    .setUrl("https://www.confluent.io/hub/confluentinc/kafka-connect-datagen")
                    .setTimestamp("2019-09-16T14:53:43+00:01").build(),
            ClickOuterClass.Click.newBuilder().setIp("10.0.0.3")
                    .setUrl("https://www.confluent.io/hub/confluentinc/kafka-connect-datagen")
                    .setTimestamp("2019-09-16T14:53:43+00:03").build()
    );

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class);

        // Records are sent without a key; closing the producer flushes pending sends.
        try (final Producer<String, ClickOuterClass.Click> producer = new KafkaProducer<>(props)) {
            TEST_CLICK_EVENTS.stream()
                    .map(click -> new ProducerRecord<String, ClickOuterClass.Click>("clicks", click))
                    .forEach(producer::send);
        }
    }
}
TestConsumer.java:
package huxihx.kafkastreams.tests;

import com.google.protobuf.Parser;
import huxihx.kafkastreams.proto.ClickOuterClass;
import huxihx.kafkastreams.serdes.ProtobufDeserializer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TestConsumer {

    public static void main(String[] args) {
        // Configure the deserializer by hand so that it receives the Click parser.
        Deserializer<ClickOuterClass.Click> deserializer = new ProtobufDeserializer<>();
        Map<String, Parser<ClickOuterClass.Click>> config = new HashMap<>();
        config.put("parser", ClickOuterClass.Click.parser());
        deserializer.configure(config, false);

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group01");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (final Consumer<String, ClickOuterClass.Click> consumer = new KafkaConsumer<>(props, new StringDeserializer(), deserializer)) {
            consumer.subscribe(Arrays.asList("distinct-clicks"));
            // Poll until the process is stopped manually.
            while (true) {
                ConsumerRecords<String, ClickOuterClass.Click> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, ClickOuterClass.Click> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
First, build the project with the following command:
$ ./gradlew shadowJar
Then start the Kafka cluster and run the Kafka Streams application:
$ java -jar build/libs/kstreams-transform-standalone-0.0.1.jar configuration/dev.properties
Now open two terminals and run the test Producer and Consumer, one in each:
$ java -cp build/libs/kstreams-transform-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestProducer
$ java -cp build/libs/kstreams-transform-standalone-0.0.1.jar huxihx.kafkastreams.tests.TestConsumer
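TestProducer sends six events in which each of the three IPs appears twice, so if everything works TestConsumer should print 3 messages: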
offset = 0, key = null, value = ip: "10.0.0.1"
url: "https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html"
timestamp: "2019-09-16T14:53:43+00:00"
offset = 1, key = null, value = ip: "10.0.0.2"
url: "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen"
timestamp: "2019-09-16T14:53:43+00:01"
offset = 2, key = null, value = ip: "10.0.0.3"
url: "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen"
timestamp: "2019-09-16T14:53:43+00:03"