【Kafka】實戰(二)Kafka使用 Producer 發送消息及 Consumer 接收消息消費

1、Kafka的Producer

Producer就是用於向Kafka發送數據。以下:
在這裏插入圖片描述shell

一、添加依賴:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.0.0</version>
</dependency>

二、發送消息

2.1 啓動kafka(Kafka shell命令)

[root@hadoop001 ~]# kafka-server-start.sh -daemon /opt/lns/server.properties

2.2 建立topic:kb09two,3個分區,每一個分區有1個副本

[root@hadoop001 ~]# kafka-topics.sh --create --zookeeper 192.168.247.201:2181 --topic kb09two --partitions 3 --replication-factor 1
[root@hadoop001 ~]# kafka-topics.sh --zookeeper 192.168.247.201:2181 --list

在這裏插入圖片描述

2.3 建立生產者(Java代碼)

public class MyProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.247.201:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        prop.put(ProducerConfig.ACKS_CONFIG,"-1");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

        for (int i = 0; i < 200; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("kb09two", "hello world" + i);
            producer.send(producerRecord);
            try {
               Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("game over");
    }
}

執行Java代碼,發送消息apache

2.4 查看消息隊列中每一個分區中的數量(Kafka shell命令)

[root@hadoop001 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.247.201:9092 --topic kb09two --time -1 --offsets 1

在這裏插入圖片描述

2.5 消費消息(Kafka shell命令)

[root@hadoop001 ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kb09two --from-beginning

在這裏插入圖片描述

補充內容

kafka 客戶端 producer 配置參數
在這裏插入圖片描述
在這裏插入圖片描述bootstrap

2、Consumer概要

consumer中的關鍵術語:
消費者(consumer):從kafka中拉取數據並進行處理
消費者組(consumer group):一個消費者組由一個或者多個consumer實例組成
位移(offset):記錄當前分區消費數據的位置
位移提交(offset commit):將消費完成的消息的最大offset提交確認
位移topic(_consumer_offset):保存消費位移的topic服務器

1.消費方式:(poll/push)

Kafka Consumer採用的是主動拉取broker數據進行消費的。通常消息中間件存在推送(server推送數據給consumer)和拉取(consumer主動取服務器取數據)兩種方式,這兩種方式各有優劣。多線程

若是是選擇推送的方式最大的阻礙就是服務器不清楚consumer的消費速度,若是consumer中執行的操做又是比較耗時的,那麼consumer可能會不堪重負,甚至會致使系統掛掉。ide

而採用拉取的方式則能夠解決這種狀況,consumer根據本身的狀態來拉取數據,能夠對服務器的數據進行延遲處理。可是這種方式也有一個劣勢就是服務器沒有數據的時候可能會一直輪詢,不過還好Kafka在poll()有參數容許消費者請求在「長輪詢」中阻塞,等待數據到達(而且可選地等待直到給定數量的字節可用以確保傳輸大小)。oop

2.Consumer經常使用參數說明

在這裏插入圖片描述

3.Consumer程序開發

構建Consumer
Consumer有三種消費交付語義
一、至少一次:消息不會丟失,但可能被重複處理(實現簡單)
二、最多一次:消息可能丟失可能會被處理,但最多隻會被處理一次(實現簡單)
三、精確一次:消息被處理而且只會被處理一次(比較難實現)spa

一個消費者組G1裏只有一個消費者(單線程)線程

public class MyConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.247.201:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 消息key反序列化器
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);// 消息value反序列化器
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000"); // 消費者和羣組協調器的最大心跳時間
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); // 設置手動提交方式
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // 自動提交的時間

        // 當消費者讀取偏移量無效的狀況下,須要重置消費起始位置,默認爲latest(從消費者啓動後生成的記錄),另一個選項值是 earliest,將從有效的最小位移位置開始消費
        // (1)earliest ,會從該分區當前最開始的offset消息開始消費(即從頭消費),若是最開始的消息offset是0,那麼消費者的offset就會被更新爲0.
        // (2)latest,只消費當前消費者啓動完成後生產者新生產的數據。舊數據不會再消費。offset被重置爲分區的HW。
        // (3)none,啓動消費者時,該消費者所消費的主題的分區沒有被消費過,就會拋異常。
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1"); // 標識一個consumer組的名稱

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("kb09two"));

        // 一個消費者組G1裏只有一個消費者(單線程)
        while (true){
            ConsumerRecords<String, String> poll = consumer.poll(100);
            for (ConsumerRecord<String, String> record : poll) {
                System.out.println(record.offset()+"t"+record.key()+"t"+record.value());
            }
        }

模擬多消費者在同一個消費組G2(多線程)scala

public class MyConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.247.201:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 消息key反序列化器
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);// 消息value反序列化器
        prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000"); // 消費者和羣組協調器的最大心跳時間
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); // 設置手動提交方式
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); // 自動提交的時間

        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G2");

        // 模擬多消費者在同一個消費組G2(多線程)
        for (int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
                    consumer.subscribe(Collections.singleton("kb09two"));
                    while (true){
                        ConsumerRecords<String, String> poll = consumer.poll(100);
                        for (ConsumerRecord<String, String> record : poll) {
                            System.out.println(Thread.currentThread().getName()+"t"+
                                    record.offset()+"t"+ record.key()+"t"+ record.value());
                        }
                    }
                }
            }).start();
        }
    }
}
相關文章
相關標籤/搜索