1.在kafka中若是不設置消費的信息的話,一個消息只能被一個group.id消費一次,而新加如的group.id則會被「消費管理」記錄,並指定從當前記錄的消息位置開始向後消費。若是有段時間消費者關閉了,並有發送者發送消息那麼下次這個消費者啓動時也會接收到,可是咱們若是想要從這個topic的第一條消息消費呢?java
public class SimpleConsumerPerSonIndex2 { public static void main(String[] args) throws Exception { //Kafka consumer configuration settings String topicName = "mypartition001"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "partitiontest112"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); //要發送自定義對象,須要指定對象的反序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka"); KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props); Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>(); hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(0)); consumer.commitSync(hashMaps); consumer.subscribe(Arrays.asList(topicName)); while (true) { ConsumerRecords<String, Object> records = consumer.poll(100); for (ConsumerRecord<String, Object> record : records){ System.out.println(record.toString()); } } } }
首先咱們在consumer.subscribe(Arrays.asList(topicName));訂閱一個topic以前要設置從這個topic的offset爲0的地方獲取。
注意:這樣的方法要保證這個group.id是新加入,若是是之前存在的,那麼會拋異常。
2.若是之前就存在的groupid想要獲取指定的topic的offset爲0開始以後的消息:apache
public class SimpleConsumerPerSonIndex2 { public static void main(String[] args) throws Exception { //Kafka consumer configuration settings String topicName = "mypartition001"; Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "partitiontest002"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); //要發送自定義對象,須要指定對象的反序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "com.ys.test.SpringBoot.zktest.encode.DecodeingKafka"); KafkaConsumer<String, Object> consumer = new KafkaConsumer<String, Object>(props); // consumer.subscribe(Arrays.asList(topicName)); consumer.assign(Arrays.asList(new TopicPartition(topicName, 0))); consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改變當前offset // consumer.seek(new TopicPartition(topicName, 0), 10);//不改變當前offset while (true) { ConsumerRecords<String, Object> records = consumer.poll(100); for (ConsumerRecord<String, Object> record : records){ System.out.println(record.toString()); } } } }
使用 consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));來分配topic和partition,
而consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));指定從這個topic和partition的開始位置獲取。bootstrap
3.存在的groupid獲取指定的topic任意的offset
上面的代碼放開 consumer.seek(new TopicPartition(topicName, 0), 10);//不改變當前offset
並註釋 consumer.seekToBeginning(Arrays.asList(new TopicPartition(topicName, 0)));//不改變當前offset;
其中consumer.seek(new TopicPartition(topicName, 0), 10)中的10是表示從這個topic的partition中的offset爲10的開始獲取消息。session
須要注意的是 consumer.assign()是不會被消費者的組管理功能管理的,他相對因而一個臨時的,不會改變當前group.id的offset,好比:
在使用consumer.subscribe(Arrays.asList(topicName));時offset爲20,若是經過2和3,已經獲取了最新的消息offset是最新的,
在下次經過 consumer.subscribe(Arrays.asList(topicName));來獲取消息時offset仍是20.仍是會獲取20之後的消息。
其實在二、3的結果截圖中咱們也能夠發現沒有1中結果圖的joining group的日誌輸出,表示沒有加入到group中。3d