Assume that you have installed the Confluent Platform quickstart and have already started all of its services.
In this section we will use built-in CLI tools to manually write some example data to Kafka. In practice, you would rather rely on other means to feed your data into Kafka, for instance via Kafka Connect if you want to move data from other data systems into Kafka, or via Kafka Clients from within your own applications.
We will now send some input data to a Kafka topic, which will be subsequently processed by a Kafka Streams application.
First, we need to create the input topic, named streams-plaintext-input, and the output topic, named streams-wordcount-output:
# Create the input topic
kafka-topics --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input

# Create the output topic
kafka-topics --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output
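If you would rather create the topics from Java instead of the CLI, a minimal sketch with the AdminClient from kafka-clients could look like the following (the class name CreateTopics is just a placeholder; partitions and replication factor mirror the commands above):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateTopics {

    public static void main(String[] args) throws Exception {
        Properties config = new Properties();
        config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(config)) {
            // Same settings as the CLI commands above: 1 partition, replication factor 1
            NewTopic input = new NewTopic("streams-plaintext-input", 1, (short) 1);
            NewTopic output = new NewTopic("streams-wordcount-output", 1, (short) 1);
            // createTopics() is asynchronous; all().get() blocks until both topics exist
            admin.createTopics(Arrays.asList(input, output)).all().get();
        }
    }
}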
Next, we generate some input data and store it in a local file at /tmp/file-input.txt:
echo -e "all streams lead to kafka\nhello kafka streams\njoin kafka summit" > /tmp/file-input.txt
The resulting file will have the following contents:
all streams lead to kafka
hello kafka streams
join kafka summit
Lastly, we send this input data to the input topic:
cat /tmp/file-input.txt | ./bin/kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input
The Kafka console producer reads the data from STDIN line-by-line, and publishes each line as a separate Kafka message to the topic streams-plaintext-input, where the message key is null and the message value is the respective line, such as all streams lead to kafka, encoded as a string.
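In your own applications you would produce such data with a Kafka client rather than the console tool. A minimal sketch with the Java producer, replicating the same null-key, string-value messages (the class name ProduceLines is a placeholder):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProduceLines {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        String[] lines = {"all streams lead to kafka", "hello kafka streams", "join kafka summit"};

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
            for (String line : lines) {
                // key is null, value is the line -- same as the console producer
                producer.send(new ProducerRecord<>("streams-plaintext-input", null, line));
            }
        } // close() flushes any buffered records
    }
}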
You can verify that the messages arrived by reading the input topic back:

kafka-console-consumer --bootstrap-server localhost:9092 --topic streams-plaintext-input --from-beginning
Create a new Maven project, assign a GroupId and an ArtifactId, and assign a project name. Then edit the Maven pom.xml to configure the build and download the dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.bai</groupId>
    <artifactId>kafka-streams-v1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <avro.version>1.8.2</avro.version>
        <kafka.version>2.1.1-cp1</kafka.version>
        <confluent.version>5.1.2</confluent.version>
        <slf4j.version>1.7.25</slf4j.version>
        <java.version>1.8</java.version>
        <maven.compiler.version>3.6.1</maven.compiler.version>
        <maven.assembly.version>3.1.0</maven.assembly.version>
        <build-helper-maven.version>3.0.0</build-helper-maven.version>
    </properties>

    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- project dependencies -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- for logging purposes -->
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- Dependencies below are required/recommended only when using Apache Avro. -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>${avro.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- force java 8 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.compiler.version}</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- package as one fat jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.assembly.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>com.example.kafka_stream.WordCountApp</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>assemble-all</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Under the resources folder, create a new file named log4j.properties and add:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%p %m (%c:%L) %n
Create a new class named WordCountApp:
package com.example.kafka_stream;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

public class WordCountApp {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        config.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        // Records should be flushed every 10 seconds. This is less than the default
        // in order to keep this example interactive.
        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        // For illustrative purposes we disable record caches
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        // Set up serializers and deserializers, which we will use for overriding the default serdes
        // specified above.
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        StreamsBuilder builder = new StreamsBuilder();

        // stream from kafka
        KStream<String, String> wordCountInput = builder.stream("streams-plaintext-input");

        // wordCountInput.foreach(new ForeachAction<String, String>() {
        //     @Override
        //     public void apply(String s, String s2) {
        //         System.out.println(s2);
        //     }
        // });

        final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

        final KTable<String, Long> wordCounts = wordCountInput
                .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
                .groupBy((k, v) -> v)
                .count();

        // write results back to kafka
        wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);
        kafkaStreams.cleanUp();
        kafkaStreams.start();

        // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }
}
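The commented-out foreach block is handy for printing records while developing, but foreach is a terminal operation. An alternative worth knowing (not part of the original example) is KStream#peek, which observes each record and passes the stream through unchanged:

// peek() observes each record without ending the stream,
// so the rest of the topology can still be attached
KStream<String, String> inspected = wordCountInput
        .peek((key, value) -> System.out.println("key=" + key + ", value=" + value));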
Here are five important Kafka Streams configurations that you will need in every Kafka Streams project:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // consume from start
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
APPLICATION_ID_CONFIG (very important) will be used as the consumer group ID, as the prefix for the application's internal topic names (changelog and repartition topics), and as the default client ID prefix.
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG set the default serdes used for serialization and deserialization of record keys and values.
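These defaults can be overridden per operation. A small sketch (not part of the original WordCountApp code) using Consumed.with and Produced.with to set serdes explicitly at the source and sink:

// Requires: import org.apache.kafka.streams.kstream.Consumed;
// Explicit serdes at the source override DEFAULT_KEY/VALUE_SERDE_CLASS_CONFIG
KStream<String, String> input = builder.stream("streams-plaintext-input",
        Consumed.with(Serdes.String(), Serdes.String()));

// Explicit serdes at the sink, needed here because the counts are Longs
wordCounts.toStream().to("streams-wordcount-output",
        Produced.with(Serdes.String(), Serdes.Long()));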
The WordCount topology transforms each record step by step (a Java sketch of these steps follows below):
Stream from Kafka: <null, "Kafka Kafka Streams">
MapValues -> lowercase: <null, "kafka kafka streams">
FlatMapValues -> split by space: <null, "kafka">, <null, "kafka">, <null, "streams">
SelectKey -> apply a key: <"kafka", "kafka">, <"kafka", "kafka">, <"streams", "streams">
GroupByKey -> before aggregation: (<"kafka", "kafka">, <"kafka", "kafka">), (<"streams", "streams">)
Count -> occurrences in each group: <"kafka", 2>, <"streams", 1>
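Note that the walkthrough uses SelectKey followed by GroupByKey, while WordCountApp collapses both into a single groupBy((k, v) -> v) and splits on the \W+ pattern rather than a plain space. A minimal sketch of the topology written exactly as the steps describe (splitting on a single space for simplicity, to be placed inside WordCountApp#main):

// Word count written step by step, mirroring the walkthrough above
KTable<String, Long> stepByStepCounts = wordCountInput
        .mapValues(value -> value.toLowerCase())                  // lowercase the line
        .flatMapValues(value -> Arrays.asList(value.split(" ")))  // one record per word
        .selectKey((key, word) -> word)                           // use the word as the key
        .groupByKey()                                             // group records by key
        .count();                                                 // <"kafka", 2>, <"streams", 1>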
In this section we will use built-in CLI tools to manually read data from Kafka. In practice, you would rather rely on other means to retrieve data from Kafka, for instance via Kafka Connect if you want to move data from Kafka to other data systems, or via Kafka Clients from within your own applications.
Open three terminals and run the following:
# in the first terminal, in the project folder
mvn package
# then, in the target folder
java -cp kafka-streams-v1-1.0-SNAPSHOT-jar-with-dependencies.jar com.example.kafka_stream.WordCountApp

# in the second terminal
kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input
>hallo blabla hallo

# in the third terminal
kafka-console-consumer --topic streams-wordcount-output --from-beginning \
    --bootstrap-server localhost:9092 \
    --property print.key=true \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
hallo   2
blabla  1
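Instead of the console consumer in the third terminal, you could read the results with a plain Java consumer. A minimal sketch, assuming a hypothetical group ID wordcount-output-reader (any unused group ID works):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ReadWordCounts {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-output-reader");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Keys are strings, values are the Long counts produced by the Streams app
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);

        try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(config)) {
            consumer.subscribe(Collections.singletonList("streams-wordcount-output"));
            while (true) {
                ConsumerRecords<String, Long> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, Long> record : records) {
                    System.out.println(record.key() + "\t" + record.value());
                }
            }
        }
    }
}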
Running a Kafka Streams application may eventually create internal intermediary topics (repartition and changelog topics).
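These internal topics are regular Kafka topics prefixed with the application ID; a small sketch (assuming the AdminClient setup from the topic-creation example above) that lists them:

// Internal Streams topics (repartition/changelog) are regular topics
// named with the application ID prefix, e.g. "wordcount-application-..."
try (AdminClient admin = AdminClient.create(config)) {
    admin.listTopics().names().get().stream()
         .filter(name -> name.startsWith("wordcount-application"))
         .forEach(System.out::println);
}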
Once you are done with the quickstart you can shut down the Kafka cluster in the following order: first stop the Kafka broker with Ctrl-C in the terminal it is running in (alternatively, you can kill the broker process); then stop ZooKeeper with Ctrl-C in its respective terminal (alternatively, you can kill the ZooKeeper process). Congratulations, you have now run your first Kafka Streams application!
To scale out your Kafka Streams application, simply start more instances with java -jar <your app jar with dependencies>.
Just make sure that you have enough topic partitions, since the number of input partitions limits the maximum parallelism!