Kafka系列之-Kafka Protocol實例分析

  本文基於A Guide To The Kafka Protocol文檔,以及Spark Streaming中實現的org.apache.spark.streaming.kafka.KafkaCluster類。整理出Kafka中有關php

  • Metadata API
  • Produce API
  • Fetch API
  • Offset API(Aka ListOffset)
  • Offset Commit/Fetch API
  • Group Membership API
  • Administrative API
      

零、準備工做

  須要運行如下部分的示例代碼時,須要提早建好須要的topic,寫入一些message,再用consumer消費一下。python

一、新建topic

[hadoop@kafka001 kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --create --topic kafka_protocol_test --replication-factor 3 --partitions 4
Created topic "kafka_protocol_test".
[hadoop@kafka001 kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --describe --topic kafka_protocol_test
Topic:kafka_protocol_test   PartitionCount:4    ReplicationFactor:3 Configs:
    Topic: kafka_protocol_test  Partition: 0    Leader: 1   Replicas: 1,2,3 Isr: 1,2,3
    Topic: kafka_protocol_test  Partition: 1    Leader: 2   Replicas: 2,3,4 Isr: 2,3,4
    Topic: kafka_protocol_test  Partition: 2    Leader: 3   Replicas: 3,4,1 Isr: 3,4,1
    Topic: kafka_protocol_test  Partition: 3    Leader: 4   Replicas: 4,1,2 Isr: 4,1,2

二、produce message

  使用Kafka系列之-自定義Producer中提到的KafkaProducerTool往指定kafka_protocol_test中發送消息,apache

public class ProducerTest2 {
    public static void main(String[] args) throws InterruptedException {
        KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl("D:\\Files\\test\\kafkaconfig.properties");
        int i = 1;
        while(true) {
            kafkaProducerTool.publishMessage("message" + (i++));
        }
    }
}

  運行一段時間後中止寫入。運行一個console-consumerkafka_protocol_test消費messageapi

三、consume message

  運行一個console-consumerkafka_protocol_test消費。注意觀察該topic每一個partition中的messages數。markdown

[hadoop@kafka001 kafka]$ bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning

  這裏寫圖片描述

1、Metadata API

  這個API是經過向Kafka集羣發送一個TopicMetadaaRequest請求,獲得MetadataResponse響應後從MetadataResponse中解析出Metadata相關信息。
  TopicMetadataRequest的結構和示例以下
  app

case class TopicMetadataRequest(val versionId: Short,
                                val correlationId: Int,
                                val clientId: String,
                                val topics: Seq[String])

TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)

  獲得的MetadataResponse包含的信息以下,能夠從PartitionMetadata中獲取到Partition相關信息,從TopicMetadata中獲取到Topic相關信息,Broker中記錄了Brokerip和端口號等。dom

MetadataResponse => [Broker][TopicMetadata]
  Broker => NodeId Host Port  (any number of brokers may be returned)
    NodeId => int32
    Host => string
    Port => int32
  TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
    TopicErrorCode => int16
  PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
    PartitionErrorCode => int16
    PartitionId => int32
    Leader => int32
    Replicas => [int32]
    Isr => [int32]

一、所包含的信息

能夠查詢指定Topic是否存在,
指定topic有多少個partition,
每一個partition當前哪一個broker處於leader狀態,
每一個broker的host和port是什麼socket

  若是設置了auto.create.topics.enable參數,遇到不存在的topic時,就會按默認replicationpartition新建該不存在的topic
  ide

二、示例

  生成一個TopicMetadataRequest對象oop

// 封裝一個TopicMetadataRequest類型的請求對象
val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
// 發送該請求
val resp: TopicMetadataResponse = consumer.send(req)
// 其中consumer對象是SimpleConsumer類型的
new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)

(1)查詢topic是否存在
  因爲在TopicMetadataRequest中能夠發送一組Seq[String]類型的topics,因此獲取到的TopicMetadataResponse.topicsMetadataSet[TopicMetadata]類型的。
  對每一個TopicMetadata對象,若是其errorCode不爲ErrorMapping.NoError即表示該Topic不正常。
  

topicMetadatas.foreach { topic =>
  if (topic.errorCode == ErrorMapping.NoError)
    println(s"topic: ${topic.topic}存在")
  else
    println(s"topic: ${topic.topic}不存在")
}

(2)獲取Topic的Partition個數
  首先將全部TopicMetadata中正常的Topic過濾出來,而後遍歷每個TopicMetadata對象,獲取其partitionsMetadata信息,其長度即Partition的個數

val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
existsTopicMetadatas.foreach { topic =>
   val numPartitions = topic.partitionsMetadata.length
   println(s"topic: ${topic.topic} 有${numPartitions}個partition")
}

(3)獲取Partition具體狀況
  如下代碼能夠獲取到Topic的每一個Partition中的Leader Partition以及replication節點的信息。

existsTopicMetadatas.foreach { topic =>
    println(s"topic:${topic.topic}的Partition信息:")
    topic.partitionsMetadata.foreach { pm =>
    val leaderPartition = pm.leader
    println(s"\tpartition: ${pm.partitionId}")
    println(s"\tleader節點:$leaderPartition")
    val replicas = pm.replicas
    println(s"\treplicas節點:$replicas")
  }
}

三、運行結果

  傳入上面新建的kafka_protocol_test以及一個不存在的topic kafka_protocol_test1,以上代碼的運行結果以下:

=============Topic相關信息===========
topic: kafka_protocol_test存在
topic: kafka_protocol_test1不存在
topic: kafka_protocol_test 有4個partition
=============Partition相關信息===========
topic:kafka_protocol_test的Partition信息:
    partition: 0
    leader節點:Some(id:1,host:kafka001,port:9092)
    replicas節點:Vector(id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092)
    partition: 1
    leader節點:Some(id:2,host:kafka002,port:9092)
    replicas節點:Vector(id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092)
    partition: 2
    leader節點:Some(id:3,host:kafka003,port:9092)
    replicas節點:Vector(id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092)
    partition: 3
    leader節點:Some(id:4,host:kafka004,port:9092)
    replicas節點:Vector(id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092)

2、Produce API

3、Fetch API

4、Offset API(Aka ListOffset)

  這個API經過向Kafka集羣發送一個OffsetRequest對象,從返回的OffsetResponse對象中獲取Offset相關信息。
  OffsetRequest對象描述以下

OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
  ReplicaId => int32
  TopicName => string
  Partition => int32
  Time => int64
  MaxNumberOfOffsets => int32

  上面Time的做用是,獲取特定時間(單位爲ms)以前的全部messages。若是設置爲-1則獲取最新的offset,即下一條messagesoffset位置;若是設置爲-2則獲取第一條messageoffset位置,即當前partition中的offset起始位置。

  OffsetResponse對象描述以下

OffsetResponse => [TopicName [PartitionOffsets]]
  PartitionOffsets => Partition ErrorCode [Offset]
  Partition => int32
  ErrorCode => int16
  Offset => int64

一、所包含的信息

  經過該API能夠獲取指定topic-partition集合的合法offset的範圍,須要直接鏈接到PartitionLeader節點。

二、示例

  獲取指定topic下全部partitionoffset範圍
  封裝一個getLeaderOffsets方法,在此方法的基礎上分別封裝一個getEarliestLeaderOffsets方法用於獲取最小offsetgetLatestLeaderOffsets用於獲取最大offset
  分別傳入的關鍵參數是前面提到的Time

def getLatestLeaderOffsets(
       topicAndPartitions: Set[TopicAndPartition]
       ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) // -1L
def getEarliestLeaderOffsets(
       topicAndPartitions: Set[TopicAndPartition]
       ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime) // -2L

  在getLeaderOffsets中,查詢到當前partition的leader節點,
  def findLeaders(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
    // 獲取當前topicAndPartitions中的全部topic
    val topics = topicAndPartitions.map(_.topic)
    // 獲取topic對應的MetadataResp對象,以前已過濾不存在的topic,因此這裏無需進一步過濾
    val topicMetadatas = getMetadataResp(topics.toSeq).left.get

    val leaderMap = topicMetadatas.flatMap { topic =>
      topic.partitionsMetadata.flatMap { pm =>
        val tp = TopicAndPartition(topic.topic, pm.partitionId)
        // 獲取對應PartitionMedatada的leader節點信息
        pm.leader.map { l =>
          tp -> (l.host -> l.port)
        }
      }
    }.toMap
    Right(leaderMap)
  }

  而後在這些節點中,封裝一個OffsetRequest對象,向Kafka集羣得到OffsetResponse對象。

val resp = consumer.getOffsetsBefore(req)
val respMap = resp.partitionErrorAndOffsets

  最後從OffsetResponse對象中獲取offset範圍,

val resp = getMetadataResp(topics.toSeq)
    // 若是獲取的resp是left,則處理返回的Set[TopicMetadata]
val topicAndPartitions = processRespInfo(resp) { resp =>
val topicMetadatas = resp.left.get.asInstanceOf[Set[TopicMetadata]]
val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
   getPartitions(existsTopicMetadatas)
}.asInstanceOf[Set[TopicAndPartition]]

// 獲取指定topic-partition最先的offset
val offsetBegin = getEarliestLeaderOffsets(topicAndPartitions).right.get
// 獲取指定topic-partition最晚的offset
val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get

print("=============Offset範圍信息===========")
topicAndPartitions.foreach { tp =>
   println(s"topic: ${tp.topic}, Partition: ${tp.partition} 的Offset範圍:")
   println(s"\t${offsetBegin(tp).offset} ~ ${offsetEnd(tp).offset}")
}

三、運行結果

  鏈接到kafka_protocol_test,運行結果以下

topic: kafka_protocol_test, Partition: 0Offset範圍:
    0 ~ 9000
topic: kafka_protocol_test, Partition: 1Offset範圍:
    0 ~ 598134
topic: kafka_protocol_test, Partition: 2Offset範圍:
    0 ~ 0
topic: kafka_protocol_test, Partition: 3Offset範圍:
    0 ~ 91000

  和第零節中圖片顯示結果一致。

5、Offset Commit/Fetch API

  首先參考Offset Management文檔中的描述,分析一下Kafka中有關Offset管理的文檔。
  在這篇文檔中主要提供了OffsetFetchOffsetCommit兩個API,其中
  

一、OffsetFetch API

  這個API能夠獲取一個Consumer讀取messageoffset信息。發送的請求是OffsetFetchRequest類型的對象,接收到的是OffsetFetchResponse類型的響應。具體offset信息能夠從OffsetFetchResponse對象中解析。
  發送的Request請求爲,須要指定consumer所屬的group,以及須要獲取offset的全部TopicAndPartitions

val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, 0)

  或獲得的響應爲OffsetFetchResponse類型的對象。
val resp = consumer.fetchOffsets(req)
  其中consumer對象是SimpleConsumer類型的
new SimpleConsumer(host, port, config.socketTimeoutMs,
      config.socketReceiveBufferBytes, config.clientId)

  具體獲取offset的邏輯以下,
withBrokers(Random.shuffle(config.seedBrokers)) { consumer =>
  // 鏈接consumer,發送該OffsetFetchRequest請求
  val resp = consumer.fetchOffsets(req)
  val respMap = resp.requestInfo
  // 從傳入的topicAndPartitions中取出不包含在result中的topicAndPartition
  val needed = topicAndPartitions.diff(result.keySet)
  // 遍歷每個須要獲取offset的topic-partition
  needed.foreach { tp: TopicAndPartition =>
    respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
      // 若是沒有錯誤
      if (ome.error == ErrorMapping.NoError) {
        result += tp -> ome
      } else {
        errs.append(ErrorMapping.exceptionFor(ome.error))
      }
    }
  }
  if (result.keys.size == topicAndPartitions.size) {
    return Right(result)
  }
}

二、OffsetCommit API

  當最終調用commit()方法,或者若是啓用了autocommit參數時,這個API可使consumer保存其消費的offset信息。
  發送的Request請求爲OffsetCommitRequest類型。

  OffsetCommitRequest須要傳入的參數以下,

val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get
val resetOffsets = offsetsFetch.right.get.map { offsetInfo =>
val plus10Offset = offsetInfo._2.offset + 10
   offsetInfo._1 -> OffsetAndMetadata(if (offsetEnd(offsetInfo._1).offset >= plus10Offset) plus10Offset else offsetEnd(offsetInfo._1).offset)
    } // resetOffsets類型爲Map[TopicAndPartition, OffsetAndMetadata]
val req = OffsetCommitRequest(groupId, resetOffsets, 0) // 發送該請求的方式以下
val resp = consumer.commitOffsets(req)

三、GroupCoordinator API

  須要注意的是這個API在Kafka-0.9之後的版本中才提供。指定Consumer Groupoffsets數據保存在某個特定的Broker中。
  向Kafka集羣發送一個GroupCoordinatorRequest類型的請求參數,該request對象中只須要指定一個groupId便可。以下所示,

val req = new GroupCoordinatorRequest(groupId)
val resp = consumer.send(req)

  獲取到的Response對象是GroupCoordinatorResponse類型的,在resp.coordinatorOpt中返回一個BrokerEndpoint對象,能夠獲取該Broker對應的Id, Ip, Port等信息。

四、示例

(1) 運行OffsetFetch API
(a) 獲取kafka_protocol_test的consumer group消費狀態
  啓動一個console-consumerkafka_protocol_test topic消費messages。須要指定一個特定的group.id參數,以下所示,使用默認的consumer.properties配置文件便可。

bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning --consumer.config ./config/consumer.properties

  運行後,將其中止,查看當前console-consumer的消費狀態

[hadoop@kafka001 kafka]$  bin/kafka-consumer-offset-checker.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --group test-consumer-group
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group kafka_protocol_test            0   9000            9000            0               none
test-consumer-group kafka_protocol_test            1   26886           598134          571248          none
test-consumer-group kafka_protocol_test            2   0               0               0               none
test-consumer-group kafka_protocol_test            3   18296           91000           72704           none

(b) 運行OffsetFetch代碼,查看運行結果

  運行時仍然傳入test-consumer-group,運行結果以下

Topic: kafka_protocol_test, Partition: 0
    Offset: 9000
Topic: kafka_protocol_test, Partition: 1
    Offset: 26886
Topic: kafka_protocol_test, Partition: 2
    Offset: 0
Topic: kafka_protocol_test, Partition: 3
    Offset: 18296

  對比後發現,兩個offset信息保持一致。

(2)運行OffsetCommit API
  在這裏,將OffsetFetch獲取到的每一個TopicAndPartition對應的Offset10,若是加10後超過其最大Offset,則取最大Offset
  在Commit先後,兩次調用OffsetFetch API的代碼,先後運行結果以下,
更新前的offset

Topic: kafka_protocol_test, Partition: 0
    Offset: 9000
Topic: kafka_protocol_test, Partition: 1
    Offset: 26886
Topic: kafka_protocol_test, Partition: 2
    Offset: 0
Topic: kafka_protocol_test, Partition: 3
    Offset: 18296
更新後的offset:(partition 0和partition 2沒有變化是因爲加10後超過了該partition的offset範圍最大值)
Topic: kafka_protocol_test, Partition: 0
    Offset: 9000
Topic: kafka_protocol_test, Partition: 1
    Offset: 26896
Topic: kafka_protocol_test, Partition: 2
    Offset: 0
Topic: kafka_protocol_test, Partition: 3
    Offset: 18306

(3)運行Group Coordinator API
  傳入一個consumer group後,查看其運行結果

Comsuner Group : test-consumer-group, coordinator broker is:
    id: 1, host: kafka001, port: 9092

6、Group Membership API

  這個API從Kafka-0.9.0.0版本開始出現。
  在0.9之前的client api中,consumer是要依賴Zookeeper的。由於同一個consumer group中的全部consumer須要進行協同,進行下面所講的rebalance。可是由於zookeeper的「herd」與「split brain」,致使一個group裏面,不一樣的consumer擁有了同一個partition,進而會引發消息的消費錯亂。爲此,在0.9中,再也不用zookeeper,而是Kafka集羣自己來進行consumer之間的同步。下面引自kafka設計的原文:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Failuredetectionprotocol

  相關知識點能夠參考Kafka源碼深度解析-序列7 -Consumer -coordinator協議與heartbeat實現原理

7、Administrative API

  注意,這個API也是從Kafka-0.9以後的client版本中才提供。經過這個API能夠對Kafka集羣進行一些管理方面的操做,好比獲取全部的Consumer Groups信息。想要獲取集羣中全部Consumer Groups信息,須要發送一個ListGroupRequest請求到全部的Brokers節點。
  還能夠經過發送一個DescribeGroupsRequest類型的請求對象,獲取對特定Consumer Group的描述。

  在Kafka-0.9以後的client中,提供了一個kafka.admin.AdminClient類,調用createSimplePlaintext方法,傳入一個broker list字val client = AdminClient.createSimplePlaintext(「kafka001:9092,kafka002:9092,kafka003:9092,kafka004:9092」)AdminClient`提供了不少方法,好比

def findCoordinator(groupId: String): Node
def findAllBrokers(): List[Node]
def listAllGroups(): Map[Node, List[GroupOverview]]
def listAllConsumerGroups(): Map[Node, List[GroupOverview]]

  等等。

相關文章
相關標籤/搜索