Kafka 透過 KafkaConsumer 構造器初始化消費者客戶端的配置。
經常使用的重要配置,詳見官網。
// 基礎配置 Map<String, Object> configs = new HashMap<>(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094"); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "my_test"); configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
Kafka 消費者提供4種方式訂閱主題,1種方式指定分區。
// 指定主題 public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) public void subscribe(Collection<String> topics) public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) public void subscribe(Pattern pattern) // 指定分區 public void assign(Collection<TopicPartition> partitions)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
consumer.subscribe(Collections.singletonList("test")); // subscribe to the topic
// Fetch records, waiting at most 3 seconds
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
TopicPartition tp = new TopicPartition("test", 0);
consumer.assign(Collections.singletonList(tp)); // manually assign the partition
consumer.seek(tp, 4L); // start reading this partition from offset 4
// Fetch records, waiting at most 3 seconds
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
TopicPartition tp = new TopicPartition("test", 0);
consumer.assign(Collections.singletonList(tp)); // manually assign the partition
Map<TopicPartition, Long> tpTime = new HashMap<>();
tpTime.put(tp, 1563027475113L); // target timestamp
// Look up the earliest offset whose record timestamp is >= the target timestamp
Map<TopicPartition, OffsetAndTimestamp> tpOffsetAndTime = consumer.offsetsForTimes(tpTime);
// The map holds null for a partition that has no record at or after the
// timestamp, so guard the lookup instead of dereferencing it directly.
OffsetAndTimestamp offsetAndTime = tpOffsetAndTime.get(tp);
if (offsetAndTime != null) {
    consumer.seek(tp, offsetAndTime.offset()); // position at the resolved offset
} else {
    // Nothing at or after the timestamp yet: only records produced from now on qualify
    consumer.seekToEnd(Collections.singletonList(tp));
}
// Fetch records, waiting at most 3 seconds
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
參數說明
// Synchronous offset-commit overloads, as listed for KafkaConsumer.
// No arguments
public void commitSync()
// With a timeout
public void commitSync(Duration timeout)
// With explicit per-partition offsets
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)
// With explicit per-partition offsets and a timeout
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout)
參數說明
// Asynchronous offset-commit overloads, as listed for KafkaConsumer.
// No arguments
public void commitAsync()
// With a callback
public void commitAsync(OffsetCommitCallback callback)
// With explicit per-partition offsets and a callback
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
// 獲取分配給當前消費者的分區集合 public Set<TopicPartition> assignment() // 取消訂閱 public void unsubscribe() // 找到指定分區的第一個偏移量 public void seekToBeginning(Collection<TopicPartition> partitions) // 找到指定分區的最後一個偏移量 public void seekToEnd(Collection<TopicPartition> partitions) // 獲取指定分區即將消費的下一個偏移量 public long position(TopicPartition partition) // 獲取指定分區最後提交的偏移量 public OffsetAndMetadata committed(TopicPartition partition) // 獲取指定主題的分區列表 public List<PartitionInfo> partitionsFor(String topic) // 獲取全部主題的信息 public Map<String, List<PartitionInfo>> listTopics() // 暫停消費 public void pause(Collection<TopicPartition> partitions) // 恢復被暫停的消費 public void resume(Collection<TopicPartition> partitions) // 獲取暫停的分區列表 public Set<TopicPartition> paused() // 獲取指定分區第一個偏移量 public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) // 獲取指定分區最後一個偏移量 public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) // 喚醒消費者 public void wakeup()