kafka的一些經常使用工具

環境

如下的操做都是基於kafka_2.11-2.2.0java

工具

新建topic

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 2 --topic spring-kafka-demo2
  • replication-factor: 指定副本數量
  • partitions:指定分區

查看topic列表

/bin/kafka-topics.sh --zookeeper localhost:2181 --list

刪除某個topic

/bin/kafka-topics.sh --zookeeper localhost:2181 --delete -topic spring-kafka-demo

查看有哪些消費組

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

老版本是指定zk的地址,相似這樣:spring

kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list

新版本使用bootstrap,這個是區別。bootstrap

查看某個消費組的詳情

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092  --group test-consumer-group --describe

這個是查看組名爲test-consumer-group的消費組的狀況。工具

查詢的結果相似下面這樣,spa

屏幕快照 2020-01-29 下午8.43.17.png

簡單對每列進行說明:code

  • TOPIC:

  消費者的topic名稱  server

  • PARTITION:

  分區數的名稱  rem

  • CURRENT-OFFSET:

  consumer group最後一次提交的offsetkafka

  • LOG-END-OFFSET:

  最後提交的生產消息offsetit

  • LAG

  消費offset與生產offset之間的差值

  • CONSUMER-ID

  消費者的ID編號,咱們知道消費者組裏面能夠有最少要有一個消費者,固然也能夠有多個消費者。

  • HOST

  消費者的主機IP地址。

  • CLIENT-ID

  連接的ID編號。

關於offset補充一些知識點。

kafka有個經常使用的設置是 auto.offset.reset,它的意義是,

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下(因消費者長
時間失效,包含偏移量的記錄已通過時井被刪除)該做何處理。它的默認值是 latest , 意
思是說,在偏移量無效的狀況下,消費者將從最新的記錄開始讀取數據(在消費者啓動之
後生成的記錄)。另外一個值是 earliest ,意思是說,在偏移量無效的狀況下,消費者將從
起始位置讀取分區的記錄。

這個屬性有如下幾個值,

  • earliest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
  • latest: 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
  • none: topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常

我要強調的是,這個設置只有在咱們的消費者(或者消費者羣組)在分區內找不到有效的offset時纔會生效。

舉個例子,

咱們使用java kafka客戶端來操做kafak。

好比咱們在消費組group1有個消費者,消費了5條消息而後節點掛了。

而後咱們重啓這個消費節點,那麼我來問你,這個消費者會從哪裏開始消費?

若是你回答根據auto.offset.reset的配置來決定那就說明你沒理解我上面所說的。

正確的答案是,消費者會繼續從上次掛掉的offset(kafka broker保存)那裏繼續消費,根本不理會auto.offset.reset

再舉個例子,

生產者在某個topic生產了一些消息,而後咱們啓動一個消費組group2,裏面有一個消費者。

若是這個時候kafka沒有這個topic消息的offset信息,那麼auto.offset.reset的值就決定從哪裏消費。

相關文章
相關標籤/搜索