使用 Stream API 獲取 student 這個 topic 當中的數據,而後將數據全部轉爲大寫,寫入到 teacher 這個 topic 當中去。
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic student
-------------------------------------------------------------
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --replication-factor 2 --partitions 3 --topic teacher
public class SteamApi { public static void main(String[] args) { Properties props = new Properties(); //設置程序的惟一標識 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); //設置kafka集羣 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092"); //設置序列化與反序列化 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //實例一個計算邏輯 StreamsBuilder streamsBuilder = new StreamsBuilder(); //設置計算邏輯 streamsBuilder.stream("student").mapValues(line->line.toString().toUpperCase()).to("teacher"); //構建Topology對象(拓撲,流程) final Topology topology = streamsBuilder.build(); //實例 kafka流 KafkaStreams streams = new KafkaStreams(topology, props); //啓動流計算 streams.start(); } }
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic student --from-beginning
--------------------------------------------------------------------------------------------
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic teacher --from-beginning
方式一:
使用代碼的形式:
public class Producer { public static void main(String[] args) { //一、配置kafka集羣 Properties props = new Properties(); //kafka服務器地址 props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092"); //消息確認機制 props.put("acks", "all"); //重試機制 props.put("retries", 0); //批量發送的大小 props.put("batch.size", 16384); //消息延遲 props.put("linger.ms", 1); //批量的緩衝區大小 props.put("buffer.memory", 33554432); //kafka數據中key value的序列化 props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { ProducerRecord record = new ProducerRecord("student", "bbbb_" + i); kafkaProducer.send(record); } kafkaProducer.close(); } }
結果:
方式二:
使用Linux窗口命令的形式:
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic student