How to get the consumer information stored on Kafka brokers?

浪院長 浪尖聊大數據
For Kafka 0.8.2, consumers come in two flavors:

the high-level API (example: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example)

the low-level SimpleConsumer API (example: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example)

The differences between the two can be seen by comparing the examples linked above.

A high-level API consumer has a dedicated background thread that, when auto.commit.enable=true, periodically commits offsets to ZooKeeper at the interval given by auto.commit.interval.ms.
The offset information stored in ZooKeeper looks like this:
(screenshot: consumer offsets as stored in ZooKeeper)
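
For reference, those offsets live under ZooKeeper paths of the form /consumers/{group}/offsets/{topic}/{partition}. Below is a minimal sketch, not taken from the article, of how a 0.8.2 high-level consumer is typically configured so that this background auto-commit kicks in; the connection string, group and topic names are placeholders.

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

object HighLevelConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")  // offsets are committed to this ZooKeeper
    props.put("group.id", "test-group")               // placeholder group
    props.put("auto.commit.enable", "true")           // turn on the background auto-commit
    props.put("auto.commit.interval.ms", "60000")     // commit to ZooKeeper once a minute

    val connector = Consumer.create(new ConsumerConfig(props))
    // one stream for the placeholder topic "test"
    val streams = connector.createMessageStreams(Map("test" -> 1))
    val it = streams("test").head.iterator()
    while (it.hasNext) {
      val msg = it.next()
      println(new String(msg.message()))
    }
    connector.shutdown()
  }
}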

For the Kafka 0.10 consumer, see the example linked here:
http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
The consumer information is no longer present in ZooKeeper.

Consumer group and offset information is instead committed to a broker-side topic named __consumer_offsets.

The schema of the __consumer_offsets topic in Kafka 0.10 (the v1 value schema):

val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
   new Field("metadata", STRING, "Associated metadata.", ""),
   new Field("commit_timestamp", INT64),
   new Field("expire_timestamp", INT64))

Concrete data from the topic looks like this (the decoded key on the first line, the decoded value on the second):

(test1,test,0)
[OffsetMetadata[26231,NO_METADATA],CommitTime 1537587480006,ExpirationTime 1537673880006]

The main reason to fetch consumer offsets is to monitor consumer lag and thus keep an eye on how well consumers are keeping up. This article focuses on retrieving the offsets already committed by Kafka 0.10.x consumers; a later article will cover fetching the latest offset of a given topic partition from the broker.
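
As a preview of that second half, here is a minimal sketch, not from the article, of how lag would be derived once both numbers are available. It assumes a 0.10.1+ client, where the Consumer interface exposes endOffsets; the helper name lagFor is purely illustrative.

import java.util.Collections

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.TopicPartition

object LagSketch {
  // lag = log-end offset on the broker minus the offset the group has committed
  def lagFor(consumer: Consumer[Array[Byte], Array[Byte]],
             tp: TopicPartition,
             committedOffset: Long): Long = {
    val endOffsets = consumer.endOffsets(Collections.singleton(tp))
    endOffsets.get(tp).longValue() - committedOffset
  }
}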

The complete code of the main program follows. It also relies on a GroupMetadataManager object in the same package, presumably ported from the Kafka source, which supplies the schemas plus the BaseKey, OffsetKey, GroupMetadataKey and GroupTopicPartition types imported below.

package bigdata.spark.kafka

import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.TimeUnit

import kafka.common.{KafkaException, OffsetAndMetadata}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import bigdata.spark.kafka.GroupMetadataManager._
import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener}

object monitor {
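 // Raw-bytes consumer for reading the internal __consumer_offsets topic; keys and values are decoded by hand below.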
 def createKafkaConsumer(): Consumer[Array[Byte], Array[Byte]] = {
   val props: Properties = new Properties()
   props.put("bootstrap.servers", "mt-mdh.local:9093")
   props.put("group.id", "test2")
   props.put("enable.auto.commit", "false")
   props.put("auto.offset.reset", "earliest")
   props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
   props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
   new KafkaConsumer[Array[Byte], Array[Byte]](props)
 }
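 // Decode a record key: schema versions 0/1 are offset-commit keys (group, topic, partition); version 2 is a group-metadata key.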
 def readMessageKey(buffer: ByteBuffer): BaseKey = {
   val version = buffer.getShort
   val keySchema = schemaForKey(version)
   val key = keySchema.read(buffer)
    if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
     // version 0 and 1 refer to offset
     val group = key.get("group").asInstanceOf[String]
     val topic = key.get("topic").asInstanceOf[String]
     val partition = key.get("partition").asInstanceOf[Int]

     OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))
    } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
      // version 2 refers to group metadata
     val group = key.get("group").asInstanceOf[String]

     GroupMetadataKey(version, group)
   } else {
     throw new IllegalStateException("Unknown version " + version + " for group metadata message")
   }
 }
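 // Decode a record value into OffsetAndMetadata; a null buffer is a tombstone and yields null.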
 def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
   if (buffer == null) { // tombstone
     null
   } else {
     val version = buffer.getShort
     val valueSchema = schemaForOffset(version)
     val value = valueSchema.read(buffer)

     if (version == 0) {
       val offset = value.get("offset").asInstanceOf[Long]
       val metadata = value.get("metadata").asInstanceOf[String]
       val timestamp = value.get("timestamp").asInstanceOf[Long]

       OffsetAndMetadata(offset, metadata, timestamp)
     } else if (version == 1) {
       val offset = value.get("offset").asInstanceOf[Long]
       val metadata = value.get("metadata").asInstanceOf[String]
       val commitTimestamp = value.get("commit_timestamp").asInstanceOf[Long]
       val expireTimestamp = value.get("expire_timestamp").asInstanceOf[Long]

       OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
     } else {
       throw new IllegalStateException("Unknown offset message version")
     }
   }
 }
// The main reference class here is kafkaStateActor
 def main(args: Array[String]): Unit = {
   val groupTopicPartitionOffsetMap:Cache[(String, String, Int), OffsetAndMetadata] = Caffeine
     .newBuilder()
     .maximumSize(1025)
     .expireAfterAccess(10, TimeUnit.DAYS)
     //    .removalListener(new RemovalListener[(String, String, Int), OffsetAndMetadata] {
     //      override def onRemoval(key: (String, String, Int), value: OffsetAndMetadata, cause: RemovalCause): Unit = {
     //        println("remove !")
     //      }
     //    })
     .build[(String, String, Int), OffsetAndMetadata]()
   val consumer = createKafkaConsumer()
   consumer.subscribe(java.util.Arrays.asList("__consumer_offsets"))
   while (true){
     val records: ConsumerRecords[Array[Byte], Array[Byte]] = consumer.poll(100)
     val iterator = records.iterator()
     while (iterator.hasNext) {
       val record = iterator.next()
        readMessageKey(ByteBuffer.wrap(record.key())) match {
         case OffsetKey(version, key) =>
            val original = record.value()
            if (original != null) {
             val value: OffsetAndMetadata = readOffsetMessageValue(ByteBuffer.wrap(record.value()))
             val newKey = (key.group, key.topicPartition.topic, key.topicPartition.partition)
             println(newKey)
             println(value)
             groupTopicPartitionOffsetMap.put(newKey, value)
           }
          case GroupMetadataKey(version, key) =>
            // group metadata records are not needed for offset monitoring, so ignore them
            Nil
       }
     }
   }
 }

}
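Run against a live cluster, this loop keeps printing the (group, topic, partition) keys and OffsetAndMetadata values it decodes, in the same shape as the sample record shown earlier, while the Caffeine cache retains the most recent committed offset per (group, topic, partition) for later lag calculations.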