How Kafka stores a consumer's consumed position

Kafka provides a built-in topic, __consumer_offsets, which stores the offsets that consumers commit.

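  • __consumer_offsets has 50 partitions by default.
  • Create a consumer group with the ID consumer_offsets_group; it is used in the steps below.
    • The partition that holds this group's offsets can be computed with the following formula:
      • Math.abs("consumer_offsets_group".hashCode()) % 50
      • The result is partition 11.
  • View the offsets committed by this consumer group: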
    • bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 11 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
  • Result (one record per commit, per partition):
    • [consumer_offsets_group/*consumer group ID*/,consumer_offsets_test/*topic*/,1/*partition*/]::OffsetAndMetadata(offset=4/*committed offset*/, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567461031, expireTimestamp=None)
    • [consumer_offsets_group,consumer_offsets_test,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567461031, expireTimestamp=None)
    • [consumer_offsets_group,consumer_offsets_test,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567461031, expireTimestamp=None)
    • [consumer_offsets_group,consumer_offsets_test,1]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567466027, expireTimestamp=None)
    • [consumer_offsets_group,consumer_offsets_test,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567466027, expireTimestamp=None)
    • [consumer_offsets_group,consumer_offsets_test,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567466027, expireTimestamp=None)
    • [consumer_offsets_group,consumer_offsets_test,1]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567468638, expireTimestamp=None)
    • [consumer_offsets_group,consumer_offsets_test,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567468638, expireTimestamp=None)
    • [consumer_offsets_group,consumer_offsets_test,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567468638, expireTimestamp=None)
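
The partition formula from the steps above can be wrapped in a small helper so that it works for any group ID. This is only a sketch: the class name OffsetsPartition and the method partitionFor are hypothetical, not part of Kafka, and the partition count of 50 assumes the default mentioned above has not been changed on the broker. Math.abs returns a negative value for Integer.MIN_VALUE, so the sketch maps that single case to 0 to keep the result in range.

```java
public class OffsetsPartition {

    // Hypothetical helper (not a Kafka API): maps a consumer group ID to the
    // __consumer_offsets partition that would hold its committed offsets.
    static int partitionFor(String groupId, int numPartitions) {
        int hash = groupId.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so handle it explicitly.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        // Assumption: 50 partitions, the default stated in the text.
        int numPartitions = 50;
        String groupId = args.length > 0 ? args[0] : "consumer_offsets_group";
        System.out.println(groupId + " -> partition " + partitionFor(groupId, numPartitions));
    }
}
```

Running it without arguments prints the partition for consumer_offsets_group, which can then be passed to --partition in the kafka-console-consumer.sh command shown earlier.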