Kakfa學習筆記(三)——Java API發送消費消息

上一篇Kakfa學習筆記(二)——體驗Kafkajava

此次咱們用Java API來發送和消費消息shell

一對一的發送消費

首先啓動zk和三個brokerapache

> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-start.sh config/server-1.properties
> bin/kafka-server-start.sh config/server-2.properties
複製代碼

建立chat主題,分區和副本都爲3bootstrap

> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chat
複製代碼

maven引入依賴bash

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>
複製代碼

編寫生產者代碼,指定topic爲chat,發送10條消息maven

public class Sender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("chat", ""+Integer.toString(i), Integer.toString(i)));

        producer.close();

    }
}
複製代碼

編寫消費者代碼,很簡單,把消費的內容打印出來post

public class Receiver {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "chat-room-1");//消費組id
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("chat"));//chat主題
      
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
複製代碼

咱們能夠前後啓動一個消費者和一個生產者,能夠看到消費者輸出10條數據學習

offset = 8, key = 0, value = 0
offset = 9, key = 2, value = 2
offset = 10, key = 3, value = 3
offset = 11, key = 9, value = 9
offset = 4, key = 4, value = 4
offset = 5, key = 6, value = 6
offset = 8, key = 1, value = 1
offset = 9, key = 5, value = 5
offset = 10, key = 7, value = 7
offset = 11, key = 8, value = 8
複製代碼

能夠看到消息並非有序的,由於消息分發到三個分區,這個消息者消費了三個分區spa

多消費者消費

由於咱們這個topic有三個分區,咱們能夠先掛起三個消費者(同一個消費組),再起一個生產者,此次能夠看到消息是分攤在三個消費者,不會出現重複消息code

consumer-0:
offset = 4, key = 0, value = 0
offset = 5, key = 2, value = 2
offset = 6, key = 3, value = 3
offset = 7, key = 9, value = 9

consumer-1:
offset = 4, key = 1, value = 1
offset = 5, key = 5, value = 5
offset = 6, key = 7, value = 7
offset = 7, key = 8, value = 8

consumer-2:
offset = 2, key = 4, value = 4
offset = 3, key = 6, value = 6
複製代碼

若是咱們起四個消費者,看一下結果

consumer-0:
offset = 16, key = 0, value = 0
offset = 17, key = 2, value = 2
offset = 18, key = 3, value = 3
offset = 19, key = 9, value = 9

consumer-1:

consumer-2:
offset = 16, key = 1, value = 1
offset = 17, key = 5, value = 5
offset = 18, key = 7, value = 7
offset = 19, key = 8, value = 8

consumer-3
offset = 8, key = 4, value = 4
offset = 9, key = 6, value = 6
複製代碼

會發現consumer-1是消費不到消息的,由於只有3個分區,因此最多隻有3個消費者同時工做,多出來的一個不會消費

有序消息

若是咱們想消息有序被送達並消費,在發送時能夠指定partition

producer.send(new ProducerRecord<String, String>("chat",1, Integer.toString(i), Integer.toString(i)));//第二個參數指定partition=1
複製代碼

咱們起兩個消費者

consumer-0:

consumer-1:
offset = 10, key = 0, value = 0
offset = 11, key = 1, value = 1
offset = 12, key = 2, value = 2
offset = 13, key = 3, value = 3
offset = 14, key = 4, value = 4
offset = 15, key = 5, value = 5
offset = 16, key = 6, value = 6
offset = 17, key = 7, value = 7
offset = 18, key = 8, value = 8
offset = 19, key = 9, value = 9
複製代碼

結果只有一個消費者消費了消息,而且能夠看到,消息是有序的

相關文章
相關標籤/搜索