kafka 0.9 以前的版本偏移量信息是經過 zookeeper 管理的;爲了不對 zookeeper 的過分依賴,每次從 kafka 上讀取 topic 偏移量信息,鏈接消耗仍是比較大的,從 kafka 0.9 開始,kafka 已接管了偏移量信息管理功能,並將各消費組的偏移量寫入了 __consumer_offsets 主題(默認50個分區);
api 方式獲取某消費組的消費偏移量信息:
經過 ConsumerGroupCommand 做爲入口,而後調用 ConsumerGroupService接口,該接口有以下兩個實現:bootstrap
該接口以下方法:api
val listStrZk = "--zookeeper 192.168.xx.xx:2181 --list" //列出全部的消費組 val listStr = "--bootstrap-server 192.168.xx.xx:9092 --list --new-consumer" val listArgs = listStr.split(" ") val describeStrzk = "--zookeeper 192.168.xx.xx:2181 --describe --group 3" // 描述消費組信息 val describeStr = "--bootstrap-server 192.168.xx.xx:9092 --describe --group 6 --new-consumer" val describeArgs = describeStr.split(" ") val args = Array[String](topic,bootstrap,group,describe) // ConsumerGroupCommand.main(listArgs) ConsumerGroupCommand.main(describeArgs) /* 主題 分區 消費位移 最高日誌位移 消費滯後offset * TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 2282954 2743178 460224 - - - test 1 1500 860230 858730 - - - test 4 500 860231 859731 - - - test 3 0 860228 860228 - - - test 2 0 860226 860226 - - - * */