kafka 消費者offset記錄位置和方式

 

咱們你們都知道,kafka消費者在會保存其消費的進度,也就是offset,存儲的位置根據選用的kafka api不一樣而不一樣。java

首先來講說消費者若是是根據javaapi來消費,也就是【kafka.javaapi.consumer.ConsumerConnector,咱們會配置參數【zookeeper.connect】來消費。這種狀況下,消費者的offset會更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目錄下,例如:apache

 

[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
5662
cZxid = 0x20006d28a
ctime = Wed Apr 12 18:20:51 CST 2017
mZxid = 0x30132b0ed
mtime = Tue Aug 22 18:53:22 CST 2017
pZxid = 0x20006d28a
cversion = 0
dataVersion = 5758
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0

 

若是是根據kafka默認的api來消費,即【org.apache.kafka.clients.consumer.KafkaConsumer】,咱們會配置參數【bootstrap.servers】來消費。而其消費者的offset會更新到一個kafka自帶的topic【__consumer_offsets】下面,查看當前group的消費進度,則要依靠kafka自帶的工具【kafka-consumer-offset-checker】,例如:bootstrap

[root@localhost data]# kafka-consumer-offset-checker --zookeeper localhost :2181/kafka --group test-consumer-group  --topic stable-test
[2017-08-22 19:24:24,222] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group stable-test                    0   601808          601808          0               none
test-consumer-group stable-test                    1   602826          602828          2               none
test-consumer-group stable-test                    2   602136          602136          0               none

上面結果的說明:api

  • Group : 消費者組
  • Topic : topic的名字
  • Pid : partition的ID
  • Offset : kafka消費者在對應分區上已經消費的消息數【位置】
  • logSize : 已經寫到該分區的消息數【位置】
  • Lag : 還有多少消息未讀取(Lag = logSize - Offset)
  • Owner : 分區建立在哪一個broker

offset更新的方式,不區分是用的哪一種api,大體分爲兩類:工具

  1. 自動提交,設置enable.auto.commit=true,更新的頻率根據參數【auto.commit.interval.ms】來定。這種方式也被稱爲【at most once】,fetch到消息後就能夠更新offset,不管是否消費成功。
  2. 手動提交,設置enable.auto.commit=false,這種方式稱爲【at least once】。fetch到消息後,等消費完成再調用方法【consumer.commitSync()】,手動更新offset;若是消費失敗,則offset也不會更新,此條消息會被重複消費一次。
相關文章
相關標籤/搜索