Mastering the Three Delivery Semantics of the Kafka Consumer

This article uses the Kafka 0.9 client as its example and walks through how to use the Kafka client, covering the three consumer delivery semantics (at-most-once, at-least-once, and exactly-once) as well as producer usage.

(一) Create the topic

bin/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1
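
If you are on a newer client (kafka-clients 0.11 or later; the rest of this article sticks to the 0.9 client), the same topic can also be created programmatically with AdminClient. A minimal sketch, assuming a broker at localhost:9092:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Same topic name, partition count and replication factor as the CLI command above.
            NewTopic topic = new NewTopic("normal-topic", 2, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}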

(二) Producer

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {
   public static void main(String[] str) throws InterruptedException, IOException {
           System.out.println("Starting ProducerExample ...");
           sendMessages();
   }
   private static void sendMessages() throws InterruptedException, IOException {
           Producer<String, String> producer = createProducer();
           sendMessages(producer);
           // Flush and close the producer so that all buffered messages are sent before the program exits.
           producer.flush();
           producer.close();
   }
   private static Producer<String, String> createProducer() {
       Properties props = new Properties();
       props.put("bootstrap.servers", "localhost:9092");
       props.put("acks", "all");
       props.put("retries", 0);
        // Controls how many bytes the sender batches up before publishing to Kafka.
       props.put("batch.size", 10);
       props.put("linger.ms", 1);
       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(props);
   }
   private static void sendMessages(Producer<String, String> producer) {
       String topic = "normal-topic";
       int partition = 0;
       long record = 1;
       for (int i = 1; i <= 10; i++) {
           producer.send(
                new ProducerRecord<String, String>(topic, partition,
                        Long.toString(record), Long.toString(record++)));
       }
   }
}

(三) Consumer

A consumer can register with Kafka in several ways:

subscribe: with this approach, when topics or partitions are added, or when consumers join or leave the group, the consumers within the group are rebalanced.

assign: consumers registered this way do not take part in rebalancing.

Either registration style can be used to implement all three delivery semantics; the concrete API usage is shown below.
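
For orientation, here is a minimal sketch contrasting the two registration styles. It assumes a props object configured like the ones in the consumer examples below, and the usual imports; only one style may be used on a given consumer instance:

// Choose ONE of the two styles per consumer instance; mixing them is not allowed.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Style 1: subscribe - partitions are assigned by the group coordinator and are
// rebalanced when topics/partitions are added or group members join or leave.
consumer.subscribe(Arrays.asList("normal-topic"));

// Style 2: assign - the application picks the partitions itself; no rebalancing happens.
// consumer.assign(Arrays.asList(new TopicPartition("normal-topic", 0)));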

1. At-most-once Kafka Consumer

At-most-once is the default behavior of the Kafka consumer. The simplest way to configure a consumer for it is:

1). Set enable.auto.commit to true.

2). Set auto.commit.interval.ms to a fairly small interval.

3). Do not call consumer.commitSync().

With this configuration, a thread inside the Kafka consumer commits offsets at the specified interval.

However, depending on when a failure occurs, this setup can actually exhibit either of two semantics:

a. At-most-once

The offset has already been committed but the messages are still being processed when the consumer crashes. After a restart it resumes from the last committed offset, so the messages that were in flight are partially lost.

b. At-least-once

The consumer has finished processing the messages but crashes before the offset is committed, so after a restart it consumes and processes those messages again. Because auto.commit.interval.ms is set to a small value, the window in which this can happen is reduced.

The code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceConsumer {
       public static void main(String[] str) throws InterruptedException {
           System.out.println("Starting  AtMostOnceConsumer ...");
           execute();
       }
       private static void execute() throws InterruptedException {
               KafkaConsumer<String, String> consumer = createConsumer();
                // Subscribe to all partitions of the topic. 'assign' could be used here
                // instead of 'subscribe' to subscribe to specific partitions.
               consumer.subscribe(Arrays.asList("normal-topic"));
               processRecords(consumer);
       }
       private static KafkaConsumer<String, String> createConsumer() {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               String consumeGroup = "cg1";
               props.put("group.id", consumeGroup);
                // Enable auto commit.
                props.put("enable.auto.commit", "true");
                // Auto commit interval; Kafka commits offsets at this interval.
                props.put("auto.commit.interval.ms", "101");
                // Limits how much data is fetched per partition on each poll (in bytes).
                props.put("max.partition.fetch.bytes", "135");
               // Set this if you want to always read from beginning.
               // props.put("auto.offset.reset", "earliest");
               props.put("heartbeat.interval.ms", "3000");
               props.put("session.timeout.ms", "6001");
               props.put("key.deserializer",
                       "org.apache.kafka.common.serialization.StringDeserializer");
               props.put("value.deserializer",
                       "org.apache.kafka.common.serialization.StringDeserializer");
               return new KafkaConsumer<String, String>(props);
       }
        private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
               while (true) {
                       ConsumerRecords<String, String> records = consumer.poll(100);
                       long lastOffset = 0;
                       for (ConsumerRecord<String, String> record : records) {
                                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(),
                                        record.key(), record.value());
                                lastOffset = record.offset();
                        }
                        System.out.println("lastOffset read: " + lastOffset);
                        process();
               }
       }
       private static void process() throws InterruptedException {
               // create some delay to simulate processing of the message.
               Thread.sleep(20);
       }
}

2. At-least-once Kafka Consumer

Implementing an at-least-once consumer is also straightforward:

1). Set enable.auto.commit to false (or, as in the code below, leave it set to true but make auto.commit.interval.ms so large that auto commit effectively never fires).

2). Manually call consumer.commitSync() after the messages have been processed.

In other words, after each poll() returns and its records have been processed, you call the synchronous commit method consumer.commitSync() yourself. It is recommended to make the processing idempotent so that reprocessing a message does not produce duplicate results. Duplicate processing happens when the consumer has processed the messages and written the results (possibly only partially) but crashes before the offset is committed; after a restart it will consume and process those messages again.

The code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {
   public static void main(String[] str) throws InterruptedException {
           System.out.println("Starting AtLeastOnceConsumer ...");
           execute();
    }
   private static void execute() throws InterruptedException {
           KafkaConsumer<String, String> consumer = createConsumer();
            // Subscribe to all partitions of the topic. 'assign' could be used here
            // instead of 'subscribe' to subscribe to specific partitions.
           consumer.subscribe(Arrays.asList("normal-topic"));
           processRecords(consumer);
    }
    private static KafkaConsumer<String, String> createConsumer() {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           String consumeGroup = "cg1";
           props.put("group.id", consumeGroup);
            // Enable auto commit.
            props.put("enable.auto.commit", "true");
            // Make the auto commit interval so large that auto commit never actually happens;
            // the offset commit is controlled explicitly via consumer.commitSync() after processing.
            props.put("auto.commit.interval.ms", "2147483647");
            // Limits how much data is fetched per partition on each poll (in bytes).
            props.put("max.partition.fetch.bytes", "135");
           props.put("heartbeat.interval.ms", "3000");
           props.put("session.timeout.ms", "6001");
           props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
           return new KafkaConsumer<String, String>(props);
   }
     private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
           while (true) {
                   ConsumerRecords<String, String> records = consumer.poll(100);
                   long lastOffset = 0;
                   for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(),
                                record.key(), record.value());
                        lastOffset = record.offset();
                    }
                    System.out.println("lastOffset read: " + lastOffset);
                   process();
                    // The call below controls the offset commit. Make this call after you have
                    // finished processing the records.
                   consumer.commitSync();
           }
   }
   private static void process() throws InterruptedException {
       // create some delay to simulate processing of the record.
       Thread.sleep(20);
   }
}
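
The consumer.commitSync() call above commits the offsets of everything returned by the last poll(). A finer-grained variant (a sketch, not part of the original example) commits exactly the offsets that have been processed, per partition, via commitSync(Map). Note that the committed value must be the offset of the next record to read, i.e. the last processed offset + 1. It assumes the same class as above plus imports for java.util.HashMap, java.util.Map, org.apache.kafka.clients.consumer.OffsetAndMetadata and org.apache.kafka.common.TopicPartition:

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                process(); // business logic for a single record
                // The commit position is the offset of the NEXT record to be read.
                toCommit.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            if (!toCommit.isEmpty()) {
                // Commit only what has actually been processed.
                consumer.commitSync(toCommit);
            }
        }
    }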

3. Exactly-once with subscribe

Implementing exactly-once with subscribe is straightforward. The idea is as follows:

1). Set enable.auto.commit to false.

2). Do not call consumer.commitSync().

3). Use subscribe to subscribe to the topic.

4). Implement a ConsumerRebalanceListener and, inside the listener, call consumer.seek(topicPartition, offset) so that consumption starts from the offset stored for each topic/partition.

5). While processing messages, keep track of each message's offset and store the offset together with the processing result in a single atomic transaction. With a traditional relational database this is straightforward; for non-transactional stores such as HDFS or NoSQL systems, the usual way is to store the offset in the same row/record as the result (see the JDBC sketch after this list).

6). Make the processing idempotent as an extra layer of protection.
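
As a concrete illustration of step 5, here is a minimal sketch of writing the processing result and the offset in one relational-database transaction. The JDBC URL, the table name processed_records and its columns are hypothetical, and error handling is reduced to a rollback:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class TransactionalResultStore {
    private final String jdbcUrl; // e.g. "jdbc:postgresql://localhost:5432/app" (hypothetical)

    public TransactionalResultStore(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    /** Writes the processed value and its source offset in a single transaction. */
    public void save(String topic, int partition, long offset, String value) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            conn.setAutoCommit(false);
            try (PreparedStatement ps = conn.prepareStatement(
                    "INSERT INTO processed_records (topic, kafka_partition, msg_offset, msg_value) VALUES (?, ?, ?, ?)")) {
                ps.setString(1, topic);
                ps.setInt(2, partition);
                ps.setLong(3, offset);
                ps.setString(4, value);
                ps.executeUpdate();
                conn.commit();   // the result and the offset become visible together
            } catch (SQLException e) {
                conn.rollback(); // neither the result nor the offset is persisted
                throw e;
            }
        }
    }
}

On restart, reading the start position would then amount to a SELECT MAX(msg_offset) + 1 per topic/partition, and consumer.seek() is pointed at that value.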

The code is as follows:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExactlyOnceDynamicConsumer {
      private static OffsetManager offsetManager = new OffsetManager("storage2");
       public static void main(String[] str) throws InterruptedException {
               System.out.println("Starting ExactlyOnceDynamicConsumer ...");
               readMessages();
       }
       private static void readMessages() throws InterruptedException {
               KafkaConsumer<String, String> consumer = createConsumer();
                // Offsets are controlled manually, but the consumer subscribes to the topic so that
                // partitions are assigned dynamically. Inside MyConsumerRebalancerListener,
                // consumer.seek(topicPartition, offset) controls from which offset messages are read.
               consumer.subscribe(Arrays.asList("normal-topic"),
                               new MyConsumerRebalancerListener(consumer));
               processRecords(consumer);
       }
       private static KafkaConsumer<String, String> createConsumer() {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               String consumeGroup = "cg3";
               props.put("group.id", consumeGroup);
               // Below is a key setting to turn off the auto commit.
               props.put("enable.auto.commit", "false");
               props.put("heartbeat.interval.ms", "2000");
               props.put("session.timeout.ms", "6001");
                // Controls the maximum data fetched per partition on each poll; make sure this value
                // is bigger than the maximum size of a single message.
                props.put("max.partition.fetch.bytes", "140");
                props.put("key.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
               return new KafkaConsumer<String, String>(props);
       }
       private static void processRecords(KafkaConsumer<String, String> consumer) {
           while (true) {
                   ConsumerRecords<String, String> records = consumer.poll(100);
                   for (ConsumerRecord<String, String> record : records) {
                            System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(),
                                    record.key(), record.value());
                            // Save processed offset in external storage.
                            offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),
                                    record.offset());
                   }
              }
       }
}
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerRebalancerListener implements ConsumerRebalanceListener {
       private OffsetManager offsetManager = new OffsetManager("storage2");
       private Consumer<String, String> consumer;
       public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
               this.consumer = consumer;
       }
       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
               for (TopicPartition partition : partitions) {
                    offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(),
                            consumer.position(partition));
               }
       }
       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
               for (TopicPartition partition : partitions) {
                        consumer.seek(partition,
                                offsetManager.readOffsetFromExternalStore(partition.topic(),
                                        partition.partition()));
               }
       }
}
/**
 * The partition offsets are stored in an external store; in this case, a file on the local
 * file system where the program runs.
 */

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OffsetManager {
       private String storagePrefix;
       public OffsetManager(String storagePrefix) {
               this.storagePrefix = storagePrefix;
       }
   /**
       * Overwrite the offset for the topic in an external storage.
       *
       * @param topic - Topic name.
       * @param partition - Partition of the topic.
       * @param offset - offset to be stored.
       */

       void saveOffsetInExternalStore(String topic, int partition, long offset) {
           try {
               FileWriter writer = new FileWriter(storageName(topic, partition), false);
               BufferedWriter bufferedWriter = new BufferedWriter(writer);
               bufferedWriter.write(offset + "");
               bufferedWriter.flush();
               bufferedWriter.close();
           } catch (Exception e) {
                   e.printStackTrace();
                   throw new RuntimeException(e);
           }
       }
       /**
            * @return the last offset + 1 for the provided topic and partition.
       */

       long readOffsetFromExternalStore(String topic, int partition) {
               try {
                       Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
                       return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return 0;
       }
       private String storageName(String topic, int partition) {
           return storagePrefix + "-" + topic + "-" + partition;
       }
}

4. Exactly-once with assign

Implementing exactly-once with assign is also straightforward. The idea is as follows:

1). Set enable.auto.commit to false.

2). Do not call consumer.commitSync().

3). Register the consumer with Kafka by calling assign.

4). On initial startup, call consumer.seek(topicPartition, offset) to position the consumer at the stored offset.

5). As in the previous section, keep track of each message's offset while processing and store the offset together with the processing result in a single atomic transaction. With a traditional relational database this is straightforward; for non-transactional stores such as HDFS or NoSQL systems, store the offset in the same row/record as the result (see the JDBC sketch in section 3).

6). Make the processing idempotent as an extra layer of protection (a minimal sketch follows the consumer code below).

The code is as follows:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceStaticConsumer {
       private static OffsetManager offsetManager = new OffsetManager("storage1");
       public static void main(String[] str) throws InterruptedException, IOException {
               System.out.println("Starting ExactlyOnceStaticConsumer ...");
               readMessages();
       }
       private static void readMessages() throws InterruptedException, IOException {
               KafkaConsumer<String, String> consumer = createConsumer();
               String topic = "normal-topic";
               int partition = 1;
               TopicPartition topicPartition =
                               registerConsumerToSpecificPartition(consumer, topic, partition);
               // Read the offset for the topic and partition from external storage.
               long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
               // Use seek and go to exact offset for that topic and partition.
               consumer.seek(topicPartition, offset);
               processRecords(consumer);
       }
       private static KafkaConsumer<String, String> createConsumer() {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               String consumeGroup = "cg2";
               props.put("group.id", consumeGroup);
               // Below is a key setting to turn off the auto commit.
               props.put("enable.auto.commit", "false");
               props.put("heartbeat.interval.ms", "2000");
               props.put("session.timeout.ms", "6001");
                // Controls the maximum data fetched per partition on each poll; make sure this value
                // is bigger than the maximum size of a single message.
                props.put("max.partition.fetch.bytes", "140");
                props.put("key.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
               return new KafkaConsumer<String, String>(props);
       }
        /**
         * Manually listens to a specific topic partition. If you are looking for an example of how to
         * dynamically listen to partitions while manually controlling the offset, see
         * ExactlyOnceDynamicConsumer.java.
         */

        private static TopicPartition registerConsumerToSpecificPartition(
                   KafkaConsumer<String, String> consumer, String topic, int partition) {
                   TopicPartition topicPartition = new TopicPartition(topic, partition);
                   List<TopicPartition> partitions = Arrays.asList(topicPartition);
                   consumer.assign(partitions);
                   return topicPartition;
         }
           /**
               * Process data and store offset in external store. Best practice is to do these operations
               * atomically.
               */

            private static void processRecords(KafkaConsumer<String, String> consumer) {
                   while (true) {
                          ConsumerRecords<String, String> records = consumer.poll(100);
                           for (ConsumerRecord<String, String> record : records) {
                                    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(),
                                            record.key(), record.value());
                                    offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),
                                            record.offset());
                           }
                   }
           }
}
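
Step 6 of both exactly-once recipes (and the at-least-once discussion earlier) recommends idempotent processing as a safety net. A minimal sketch of one common approach, deduplicating on (topic, partition, offset); in a real system this state would live in the external result store (for example as a unique key), not in memory:

import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class IdempotentProcessor {
    // In production this state belongs in the external result store (e.g. a unique
    // constraint on topic/partition/offset), not in an in-memory set.
    private final Set<String> seen = new HashSet<>();

    /** Processes the record only if this (topic, partition, offset) has not been seen before. */
    public void process(ConsumerRecord<String, String> record) {
        String key = record.topic() + "-" + record.partition() + "-" + record.offset();
        if (!seen.add(key)) {
            return; // duplicate delivery: skip reprocessing
        }
        // ... business logic for the record goes here ...
        System.out.println("Processed " + key + " -> " + record.value());
    }
}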

[End]


This article was originally shared via the WeChat public account 浪尖聊大數據 (bigdatatip).
