一、數據清洗業務類LogProcessor
package com.css.kafka.kafka_stream; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; /** * 數據清洗*/ public class LogProcessor implements Processor<byte[], byte[]>{ private ProcessorContext context; //初始化 public void init(ProcessorContext context) { //傳輸 this.context = context; } //具體業務邏輯 public void process(byte[] key, byte[] value) { //1.拿到消息數據,轉成字符串 String message = new String(value); //2.若是包含- 去除 if (message.contains("-")) { //3.把- 去掉 以後去掉左側數據 message = message.split("-")[1]; } //4.發送數據 context.forward(key, message.getBytes()); } //釋放資源 public void close() { } }
二、Application類
package com.css.kafka.kafka_stream; import java.util.Properties; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; /** * 需求:對數據進行清洗操做 * * 思路:wo-henshuai 把-和wo清洗掉*/ public class Application { public static void main(String[] args) { //1.定義主題 發送到 另一個主題中 數據清洗 String oneTopic = "t1"; String twoTopic = "t2"; //2.設置屬性 Properties prop = new Properties(); prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor"); prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092,192.168.146.133:9092,192.168.146.134:9092"); //3.實例對象 StreamsConfig config = new StreamsConfig(prop); //4.流計算 拓撲 Topology builder = new Topology(); //5.定義kafka組件數據源 builder.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() { public Processor<byte[], byte[]> get() { return new LogProcessor(); } //從哪裏來 }, "Source") //到哪裏去 .addSink("Sink", twoTopic, "Processor"); //6.實例化kafkaStream KafkaStreams kafkaStreams = new KafkaStreams(builder, prop); kafkaStreams.start(); } }
三、運行Application類的main方法
四、在hd09-1機器上建立主題t1
bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partitions 1 --topic t1
五、在hd09-2機器上啓動消費者
bin/kafka-console-consumer.sh --bootstrap-server hd09-2:9092 --topic t2 --from-beginning --consumer.config config/consumer.properties
六、在hd09-1機器上啓動生產者
bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic t1
七、此時在hd09-1機器kafka生產者上輸入 wo-henshuai,在hd09-2消費者機器上會顯示henshuai,即完成了數據清洗,如下圖所示。