如下的操做都是基於kafka_2.11-2.2.0java
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic spring-kafka-demo2
/bin/kafka-topics.sh --zookeeper localhost:2181 --list
/bin/kafka-topics.sh --zookeeper localhost:2181 --delete -topic spring-kafka-demo
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
老版本是指定zk的地址,相似這樣:spring
kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
新版本使用bootstrap,這個是區別。bootstrap
kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test-consumer-group --describe
這個是查看組名爲test-consumer-group
的消費組的狀況。工具
查詢的結果相似下面這樣,spa
簡單對每列進行說明:code
消費者的topic名稱 server
分區數的名稱 rem
consumer group最後一次提交的offsetkafka
最後提交的生產消息offsetit
消費offset與生產offset之間的差值
消費者的ID編號,咱們知道消費者組裏面能夠有最少要有一個消費者,固然也能夠有多個消費者。
消費者的主機IP地址。
連接的ID編號。
關於offset補充一些知識點。
kafka有個經常使用的設置是 auto.offset.reset
,它的意義是,
該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下(因消費者長
時間失效,包含偏移量的記錄已通過時井被刪除)該做何處理。它的默認值是 latest , 意
思是說,在偏移量無效的狀況下,消費者將從最新的記錄開始讀取數據(在消費者啓動之
後生成的記錄)。另外一個值是 earliest ,意思是說,在偏移量無效的狀況下,消費者將從
起始位置讀取分區的記錄。
這個屬性有如下幾個值,
我要強調的是,這個設置只有在咱們的消費者(或者消費者羣組)在分區內找不到有效的offset時纔會生效。
舉個例子,
咱們使用java kafka客戶端來操做kafak。
好比咱們在消費組group1
有個消費者,消費了5條消息而後節點掛了。
而後咱們重啓這個消費節點,那麼我來問你,這個消費者會從哪裏開始消費?
若是你回答根據auto.offset.reset
的配置來決定那就說明你沒理解我上面所說的。
正確的答案是,消費者會繼續從上次掛掉的offset(kafka broker保存)那裏繼續消費,根本不理會auto.offset.reset
。
再舉個例子,
生產者在某個topic生產了一些消息,而後咱們啓動一個消費組group2
,裏面有一個消費者。
若是這個時候kafka沒有這個topic消息的offset信息,那麼auto.offset.reset
的值就決定從哪裏消費。