關於怎麼獲取kafka指定位置offset消息

1.在kafka中若是不設置消費的信息的話,一個消息只能被一個group.id消費一次,而新加如的group.id則會被「消費管理」記錄,並指定從當前記錄的消息位置開始向後消費。若是有段時間消費者關閉了,並有發送者發送消息那麼下次這個消費者啓動時也會接收到,可是咱們若是想要從這個topic的第一條消息消費呢?java

public class SimpleConsumerPerSonIndex2 {
public static void main(String[] args) throws Exception {
 
 
      //Kafka consumer configuration settings
      String topicName = "mypartition001";
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "partitiontest112");
      props.put("enable.auto.commit", "true"); 
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      
      //要發送自定義對象,須要指定對象的反序列化類
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka");
        KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
        Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
      hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(0));
      consumer.commitSync(hashMaps);
      consumer.subscribe(Arrays.asList(topicName));
      while (true) {
      ConsumerRecords<String, Object> records = consumer.poll(100);
         for (ConsumerRecord<String, Object> record : records){
        System.out.println(record.toString());
         }
      }
      
   }
}

首先咱們在consumer.subscribe(Arrays.asList(topicName));訂閱一個topic以前要設置從這個topic的offset爲0的地方獲取。
注意:這樣的方法要保證這個group.id是新加入,若是是之前存在的,那麼會拋異常。
2.若是之前就存在的groupid想要獲取指定的topic的offset爲0開始以後的消息:apache

public class SimpleConsumerPerSonIndex2 {
public static void main(String[] args) throws Exception {
 
 
      //Kafka consumer configuration settings
      String topicName = "mypartition001";
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "partitiontest002");
      props.put("enable.auto.commit", "true"); 
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      
      //要發送自定義對象,須要指定對象的反序列化類
      props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka");
    KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props);
// consumer.subscribe(Arrays.asList(topicName));
      consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));
      consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改變當前offset
//       consumer.seek(new TopicPartition(topicName, 0), 10);//不改變當前offset
     
      while (true) {
      ConsumerRecords<String, Object> records = consumer.poll(100);
         for (ConsumerRecord<String, Object> record : records){
        System.out.println(record.toString());
         }
      }
      
   }
}

使用 consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));來分配topic和partition,
 而consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));指定從這個topic和partition的開始位置獲取。bootstrap

3.存在的groupid獲取指定的topic任意的offset

上面的代碼放開 consumer.seek(new TopicPartition(topicName, 0), 10);//不改變當前offset
並註釋 consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改變當前offset;
其中consumer.seek(new TopicPartition(topicName, 0), 10)中的10是表示從這個topic的partition中的offset爲10的開始獲取消息。session

須要注意的是 consumer.assign()是不會被消費者的組管理功能管理的,他相對因而一個臨時的,不會改變當前group.id的offset,好比:
在使用consumer.subscribe(Arrays.asList(topicName));時offset爲20,若是經過2和3,已經獲取了最新的消息offset是最新的,
在下次經過 consumer.subscribe(Arrays.asList(topicName));來獲取消息時offset仍是20.仍是會獲取20之後的消息。
其實在二、3的結果截圖中咱們也能夠發現沒有1中結果圖的joining group的日誌輸出,表示沒有加入到group中。3d

相關文章
相關標籤/搜索