Kafka生產者保證數據可靠傳輸

Kafka是衆多MQ(Message Queue)中的一種,MQ廣泛都會面臨消息丟失的問題,形成消息丟失的緣由有不少種,例如:java

  1. 生產者將消息發送,可是不確保消息到達MQ中
  2. MQ接收到消息,可是消息丟失了
  3. ...

本文實驗採用的Kafka是kafka_2.11-1.1.1版本node

Kafka發送消息模型

考慮有一個Topic,只有一個分區(num.partitions=1),副本因子是2(replication.factor=2).shell

簡單發送消息模型

在Kafka中,生產者生產的消息要分配到不一樣的partition中,而該Topic只有一個分區,因此只會生產進這個partition.apache

Note: 在Kafka中,生產者與消費者只與Partition leader 交互.測試

當ack=0時

生產者只管發送數據到Kafka中,不在乎Kafka是否接收到. 這樣丟數據的機率是比較高的.spa

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
// 發送1000W條數據
for (int i = 0; i < 10000000; i++) {
    kafkaProducer.send(new ProducerRecord<>("MillionMessage", i + ""));
}

經過kafka-console-consumer.sh進行消費,最後消費到9982846條數據(Processed a total of 9982846 messages). 丟失17154條數據,丟失的機率大概在0.017%左右..net

數據丟失的緣由就是生產者發送數據的時候,不會對數據進行確認. 在生產者發送數據的過程當中已經丟失. 數據沒有到達Kafka中. 對應到上圖中就是 生產者到parition leader的過程當中.日誌

當Ack=1時

當ack=1時,只保證數據到達partition leader中,可是不會保證數據必定會所有傳輸到partition replicas中. 在上面代碼的基礎上,只須要修改第三行代碼爲props.put(ProducerConfig.ACKS_CONFIG, "1");.code

測試發送100W條數據,最後也接收到100W條數據. 圖片

Ack=1,發送100W條數據

這裏測試接收到100W條數據,並非100%每次都能接收到100W條數據,按照第一張圖片解釋,數據只是100%到達了partition leader中,若是此時partition leader所在的broker掛掉了,會從partition replicas中選舉出一個replica變爲leader來繼續對外提供服務.(Kafka保證每一個副本都處於不一樣的broker中). 選舉後的partition leader中可能沒有最新寫入的數據,這樣就是形成數據丟失的問題.

當Ack=all時

當Ack=all時,生產者寫入的效率會變慢,這裏測試1000W條數寫入Kafka中. 仍是改動上面第三行代碼props.put(ProducerConfig.ACKS_CONFIG, "all");. 寫入1000W條數據耗時16147,約等於16s. 而ack=1時,耗時會在12s左右(在本地起的兩個實例,同步會比較快). 雖然相差不大,可是在生產環境中,不一樣的broker部署在不一樣的機器中,數據同步耗時相對比較長.

當發送數據的時候,有broker掛掉了

目前Topic的partition的分佈狀況

➜  kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181
Topic:MillionMessage    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: MillionMessage    Partition: 0    Leader: 1    Replicas: 1,0    Isr: 1,0

設置ack=1,掛掉broker=1

當設置ack=1時,只保證partition leader接收到,因此作一下實驗. 寫入代碼仍是按照前面操做邏輯,模式broker宕機 (使用kill命令).

在模擬宕機後,生產者打出日誌以下:

WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (/192.168.0.251:9093) could not be established. Broker may not be available.

說明broker(192.168.0.251:9093)發生宕機, 最後消費數據爲9997816. 數據丟失!!!

ack=1,partition leader宕機

設置ack=all,掛掉broker=0

在進行上面這個實驗後,重啓掛掉的broker,查看主題:

➜  kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181
Topic:MillionMessage    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: MillionMessage    Partition: 0    Leader: 0    Replicas: 1,0    Isr: 0
➜  kafka bin/kafka-topics.sh --describe --topic MillionMessage --zookeeper localhost:2181
Topic:MillionMessage    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: MillionMessage    Partition: 0    Leader: 0    Replicas: 1,0    Isr: 0,1

最後broker成功上線,而且已經同步完成,可是該Topic的partition leader由broker=0中的副本擔當. 因此此次實驗須要掛掉broker=0.

仍是修改第三行代碼props.put(ProducerConfig.ACKS_CONFIG, "all");. 而且也是發送1000W條數據.

經過kafka-console-consumer.sh消費到10000867條數據. 說明數據確定發生了重複消費,可是不能保證數據沒有丟失. 這裏本身寫消費程序.

9999997
9999998
9999999
Processed a total of 10000867 messages

消費程序代碼以下:

public static void main(String[] args) {
    final Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "NewConsumer");
    final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    kafkaConsumer.subscribe(Collections.singleton("MillionMessage"));
    final HashSet<String> total = new HashSet<>();
    final HashSet<Long> totalOffset = new HashSet<>();
    while (true) {
        final ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(3));
        for (ConsumerRecord<String, String> record : records) {
            total.add(record.key());
            totalOffset.add(record.offset());
        }
        System.out.println("total ==>" + total.size());
        System.out.println("totalOffset ==>" + total.size());
    }
}

最後經過觀察位移和key,均可以看出,確定沒有消息丟失,可是根據實驗,說明發生了消息重複.那麼怎麼解決消費重複呢?

消費者輸出

相關文章
相關標籤/搜索