Kafka Streams data cleaning

1. The data-cleaning business class LogProcessor

package com.css.kafka.kafka_stream;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * Data cleaning processor: strips the prefix before "-" from each record.
 */
public class LogProcessor implements Processor<byte[], byte[]>{

    private ProcessorContext context;
    
    // Initialization
    @Override
    public void init(ProcessorContext context) {
        // Keep a reference to the processor context so records can be forwarded later
        this.context = context;
    }

    // Core business logic
    @Override
    public void process(byte[] key, byte[] value) {
        // 1. Convert the message value to a String
        String message = new String(value);
        
        // 2. If the message contains "-", clean it
        if (message.contains("-")) {
            // 3. Drop the "-" and the data to its left, keeping the part after it
            message = message.split("-")[1];
        }
        // 4. Forward the cleaned record downstream
        context.forward(key, message.getBytes());
    }

    // Release resources
    @Override
    public void close() {
    }
}
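
The cleaning rule relies on String.split("-")[1], which keeps only the segment between the first and second "-"; an input with several "-" characters therefore also loses everything after the second one. A minimal standalone sketch for checking the rule locally; the class name is illustrative and not part of the pipeline:

public class SplitRuleCheck {

    public static void main(String[] args) {
        String[] samples = {"wo-henshuai", "henshuai"};
        for (String message : samples) {
            String cleaned = message;
            if (message.contains("-")) {
                // same rule as LogProcessor.process: keep the segment after the first "-"
                cleaned = message.split("-")[1];
            }
            System.out.println(message + " -> " + cleaned);
        }
    }
}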

2. The Application class

package com.css.kafka.kafka_stream;

import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

/**
 * Requirement: clean the incoming data.
 *
 * Idea: given "wo-henshuai", strip out the "-" and the "wo" before it.
 */
public class Application {

    public static void main(String[] args) {
        // 1. Source topic and target topic for the cleaned data
        String oneTopic = "t1";
        String twoTopic = "t2";
        
        // 2. Set the configuration properties
        Properties prop = new Properties();
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "logProcessor");
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.146.132:9092,192.168.146.133:9092,192.168.146.134:9092");
        
        // 3. Build the processing topology (the Properties object is passed to
        //    KafkaStreams directly below, so a separate StreamsConfig instance is not needed)
        Topology builder = new Topology();
        
        // 4. Wire the source, the processor, and the sink
        builder.addSource("Source", oneTopic).addProcessor("Processor", new ProcessorSupplier<byte[], byte[]>() {

            @Override
            public Processor<byte[], byte[]> get() {
                return new LogProcessor();
            }
            // parent node: where the data comes from
        }, "Source")
        // sink: where the cleaned data goes
        .addSink("Sink", twoTopic, "Processor");

        // 5. Create and start the KafkaStreams instance
        KafkaStreams kafkaStreams = new KafkaStreams(builder, prop);
        kafkaStreams.start();
    }
}
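
The main method above starts the stream and then simply leaves the JVM running. Two optional lines, not in the original code and shown only as a sketch (assuming the Kafka Streams 1.0+ API and Java 8+), could be appended at the end of main to print the wired topology and to shut the application down cleanly:

        // print the Source -> Processor -> Sink topology as a sanity check
        System.out.println(builder.describe());

        // close the streams instance when the JVM is terminated (e.g. Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));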

3. Run the main method of the Application class

4. Create topic t1 on machine hd09-1

bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partitions 1 --topic t1
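
Only t1 is created here, yet the sink writes to t2 and the consumer in the next step reads from t2. If the brokers do not auto-create topics (auto.create.topics.enable=false), t2 presumably needs to be created the same way, for example:

bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partitions 1 --topic t2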

5. Start a console consumer on machine hd09-2

bin/kafka-console-consumer.sh --bootstrap-server hd09-2:9092 --topic t2 --from-beginning --consumer.config config/consumer.properties

6. Start a console producer on machine hd09-1

bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic t1

7. Now type wo-henshuai into the Kafka producer on hd09-1; the consumer on hd09-2 will print henshuai, which confirms that the data cleaning works.
