上一篇Kakfa學習筆記(二)——體驗Kafkajava
此次咱們用Java API來發送和消費消息shell
首先啓動zk和三個brokerapache
> bin/zookeeper-server-start.sh config/zookeeper.properties
> bin/kafka-server-start.sh config/server.properties
> bin/kafka-server-start.sh config/server-1.properties
> bin/kafka-server-start.sh config/server-2.properties
複製代碼
建立chat主題,分區和副本都爲3bootstrap
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic chat
複製代碼
maven引入依賴bash
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
複製代碼
編寫生產者代碼,指定topic爲chat,發送10條消息maven
public class Sender {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>("chat", ""+Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
複製代碼
編寫消費者代碼,很簡單,把消費的內容打印出來post
public class Receiver {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "chat-room-1");//消費組id
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("chat"));//chat主題
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
複製代碼
咱們能夠前後啓動一個消費者和一個生產者,能夠看到消費者輸出10條數據學習
offset = 8, key = 0, value = 0
offset = 9, key = 2, value = 2
offset = 10, key = 3, value = 3
offset = 11, key = 9, value = 9
offset = 4, key = 4, value = 4
offset = 5, key = 6, value = 6
offset = 8, key = 1, value = 1
offset = 9, key = 5, value = 5
offset = 10, key = 7, value = 7
offset = 11, key = 8, value = 8
複製代碼
能夠看到消息並非有序的,由於消息分發到三個分區,這個消息者消費了三個分區spa
由於咱們這個topic有三個分區,咱們能夠先掛起三個消費者(同一個消費組),再起一個生產者,此次能夠看到消息是分攤在三個消費者,不會出現重複消息code
consumer-0:
offset = 4, key = 0, value = 0
offset = 5, key = 2, value = 2
offset = 6, key = 3, value = 3
offset = 7, key = 9, value = 9
consumer-1:
offset = 4, key = 1, value = 1
offset = 5, key = 5, value = 5
offset = 6, key = 7, value = 7
offset = 7, key = 8, value = 8
consumer-2:
offset = 2, key = 4, value = 4
offset = 3, key = 6, value = 6
複製代碼
若是咱們起四個消費者,看一下結果
consumer-0:
offset = 16, key = 0, value = 0
offset = 17, key = 2, value = 2
offset = 18, key = 3, value = 3
offset = 19, key = 9, value = 9
consumer-1:
consumer-2:
offset = 16, key = 1, value = 1
offset = 17, key = 5, value = 5
offset = 18, key = 7, value = 7
offset = 19, key = 8, value = 8
consumer-3
offset = 8, key = 4, value = 4
offset = 9, key = 6, value = 6
複製代碼
會發現consumer-1是消費不到消息的,由於只有3個分區,因此最多隻有3個消費者同時工做,多出來的一個不會消費
若是咱們想消息有序被送達並消費,在發送時能夠指定partition
producer.send(new ProducerRecord<String, String>("chat",1, Integer.toString(i), Integer.toString(i)));//第二個參數指定partition=1
複製代碼
咱們起兩個消費者
consumer-0:
consumer-1:
offset = 10, key = 0, value = 0
offset = 11, key = 1, value = 1
offset = 12, key = 2, value = 2
offset = 13, key = 3, value = 3
offset = 14, key = 4, value = 4
offset = 15, key = 5, value = 5
offset = 16, key = 6, value = 6
offset = 17, key = 7, value = 7
offset = 18, key = 8, value = 8
offset = 19, key = 9, value = 9
複製代碼
結果只有一個消費者消費了消息,而且能夠看到,消息是有序的