Kafka是衆多MQ(Message Queue)中的一種,MQ廣泛都會面臨消息丟失的問題,形成消息丟失的緣由有不少種,例如:java
本文實驗採用的Kafka是kafka_2.11-1.1.1版本node
考慮有一個Topic,只有一個分區(num.partitions=1),副本因子是2(replication.factor=2).shell
在Kafka中,生產者生產的消息要分配到不一樣的partition中,而該Topic只有一個分區,因此只會生產進這個partition.apache
Note: 在Kafka中,生產者與消費者只與Partition leader 交互.測試
生產者只管發送數據到Kafka中,不在乎Kafka是否接收到. 這樣丟數據的機率是比較高的.spa
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.ACKS_CONFIG, "0"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); // 發送1000W條數據 for (int i = 0; i < 10000000; i++) { kafkaProducer.send(new ProducerRecord<>("MillionMessage", i + "")); }
經過kafka-console-consumer.sh
進行消費,最後消費到9982846條數據(Processed a total of 9982846 messages). 丟失17154條數據,丟失的機率大概在0.017%左右..net
數據丟失的緣由就是生產者發送數據的時候,不會對數據進行確認. 在生產者發送數據的過程當中已經丟失. 數據沒有到達Kafka中. 對應到上圖中就是 生產者到parition leader的過程當中.日誌
當ack=1時,只保證數據到達partition leader中,可是不會保證數據必定會所有傳輸到partition replicas中. 在上面代碼的基礎上,只須要修改第三行代碼爲props.put(ProducerConfig.ACKS_CONFIG, "1");
.code
測試發送100W條數據,最後也接收到100W條數據. 圖片
這裏測試接收到100W條數據,並非100%每次都能接收到100W條數據,按照第一張圖片解釋,數據只是100%到達了partition leader中,若是此時partition leader所在的broker掛掉了,會從partition replicas中選舉出一個replica變爲leader來繼續對外提供服務.(Kafka保證每一個副本都處於不一樣的broker中). 選舉後的partition leader中可能沒有最新寫入的數據,這樣就是形成數據丟失的問題.
當Ack=all時,生產者寫入的效率會變慢,這裏測試1000W條數寫入Kafka中. 仍是改動上面第三行代碼props.put(ProducerConfig.ACKS_CONFIG, "all");
. 寫入1000W條數據耗時16147,約等於16s. 而ack=1時,耗時會在12s左右(在本地起的兩個實例,同步會比較快). 雖然相差不大,可是在生產環境中,不一樣的broker部署在不一樣的機器中,數據同步耗時相對比較長.
目前Topic的partition的分佈狀況
➜ kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181 Topic:MillionMessage PartitionCount:1 ReplicationFactor:2 Configs: Topic: MillionMessage Partition: 0 Leader: 1 Replicas: 1,0 Isr: 1,0
當設置ack=1時,只保證partition leader接收到,因此作一下實驗. 寫入代碼仍是按照前面操做邏輯,模式broker宕機 (使用kill命令).
在模擬宕機後,生產者打出日誌以下:
WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (/192.168.0.251:9093) could not be established. Broker may not be available.
說明broker(192.168.0.251:9093)發生宕機, 最後消費數據爲9997816. 數據丟失!!!
在進行上面這個實驗後,重啓掛掉的broker,查看主題:
➜ kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181 Topic:MillionMessage PartitionCount:1 ReplicationFactor:2 Configs: Topic: MillionMessage Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0 ➜ kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181 Topic:MillionMessage PartitionCount:1 ReplicationFactor:2 Configs: Topic: MillionMessage Partition: 0 Leader: 0 Replicas: 1,0 Isr: 0,1
最後broker成功上線,而且已經同步完成,可是該Topic的partition leader由broker=0中的副本擔當. 因此此次實驗須要掛掉broker=0.
仍是修改第三行代碼props.put(ProducerConfig.ACKS_CONFIG, "all");
. 而且也是發送1000W條數據.
經過kafka-console-consumer.sh
消費到10000867條數據. 說明數據確定發生了重複消費,可是不能保證數據沒有丟失. 這裏本身寫消費程序.
9999997 9999998 9999999 Processed a total of 10000867 messages
消費程序代碼以下:
public static void main(String[] args) { final Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "NewConsumer"); final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties); kafkaConsumer.subscribe(Collections.singleton("MillionMessage")); final HashSet<String> total = new HashSet<>(); final HashSet<Long> totalOffset = new HashSet<>(); while (true) { final ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(3)); for (ConsumerRecord<String, String> record : records) { total.add(record.key()); totalOffset.add(record.offset()); } System.out.println("total ==>" + total.size()); System.out.println("totalOffset ==>" + total.size()); } }
最後經過觀察位移和key,均可以看出,確定沒有消息丟失,可是根據實驗,說明發生了消息重複.那麼怎麼解決消費重複呢?