在kafka 中,提供了一個 ___consumer_ offsets_* 的一個內置topicbootstrap
- __consumer_offsets 默認分區50
- 建立consumer_offsets_group 消費組ID 稍後會用到
- 經過以下公式便可獲取:
- Math.abs("consumer_offsets_group".hashCode()) % 50
- 結果放在了11分區上
- 查看該消費羣offsets:
- bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 11 --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
- 結果:
- [consumer_offsets_group/*消費組ID*/,consumer_offsets_test/*topic*/,1/*partition*/]::OffsetAndMetadata(offset=4/*提交的消費位移信息*/, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567461031, expireTimestamp=None) [consumer_offsets_group,consumer_offsets_test,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567461031, expireTimestamp=None) [consumer_offsets_group,consumer_offsets_test,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567461031, expireTimestamp=None) [consumer_offsets_group,consumer_offsets_test,1]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567466027, expireTimestamp=None) [consumer_offsets_group,consumer_offsets_test,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567466027, expireTimestamp=None) [consumer_offsets_group,consumer_offsets_test,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567466027, expireTimestamp=None) [consumer_offsets_group,consumer_offsets_test,1]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567468638, expireTimestamp=None) [consumer_offsets_group,consumer_offsets_test,0]::OffsetAndMetadata(offset=3, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567468638, expireTimestamp=None) [consumer_offsets_group,consumer_offsets_test,2]::OffsetAndMetadata(offset=4, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1543567468638, expireTimestamp=None)