Kafka:實時計算-實時生產-適時傳遞-實時存儲--實時展示

需求

使用StreamAPI獲取student這個topic當中的數據,而後將數據所有轉爲大寫,寫入到teacher這個topic當中去java

第一步:建立兩個Topic

bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181  --replication-factor 2 --partitions 3 --topic student
-------------------------------------------------------------
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181  --replication-factor 2 --partitions 3 --topic teacher

第二步:建立StreamApi(使用IDEA)

public class SteamApi {
    public static void main(String[] args) {

            Properties props = new Properties();
            //設置程序的惟一標識
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
            //設置kafka集羣
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
            //設置序列化與反序列化
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            //實例一個計算邏輯
            StreamsBuilder streamsBuilder = new StreamsBuilder();
            //設置計算邏輯
            streamsBuilder.stream("student").mapValues(line->line.toString().toUpperCase()).to("teacher");
            //構建Topology對象(拓撲,流程)
            final Topology topology = streamsBuilder.build();
            //實例 kafka流
            KafkaStreams streams = new KafkaStreams(topology, props);
            //啓動流計算
            streams.start();


    }
}

第三步:建立消費者Consumer

bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic student --from-beginning
--------------------------------------------------------------------------------------------
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --topic teacher --from-beginning

第四步:建立生產者Producer

方式一:
使用代碼的形式:node

public class Producer {
    public static void main(String[] args) {

        //一、配置kafka集羣
        Properties props = new Properties();

        //kafka服務器地址
        props.put("bootstrap.servers", "node01:9092,node02:9092,node03:9092");
        //消息確認機制
        props.put("acks", "all");
        //重試機制
        props.put("retries", 0);
        //批量發送的大小
        props.put("batch.size", 16384);
        //消息延遲
        props.put("linger.ms", 1);
        //批量的緩衝區大小
        props.put("buffer.memory", 33554432);
        //kafka數據中key value的序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
            ProducerRecord record = new ProducerRecord("student", "bbbb_" + i);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }
}

結果:
在這裏插入圖片描述
方式二:
使用Linux窗口命令的形式:web

bin/kafka-console-producer.sh  --broker-list node01:9092,node02:9092,node03:9092  --topic  student

在這裏插入圖片描述