本文基於A Guide To The Kafka Protocol文檔,以及Spark Streaming中實現的org.apache.spark.streaming.kafka.KafkaCluster
類。整理出Kafka中有關php
須要運行如下部分的示例代碼時,須要提早建好須要的topic
,寫入一些message
,再用consumer
消費一下。python
[hadoop@kafka001 kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --create --topic kafka_protocol_test --replication-factor 3 --partitions 4
Created topic "kafka_protocol_test".
[hadoop@kafka001 kafka]$ bin/kafka-topics.sh --zookeeper kafka001:2181 --describe --topic kafka_protocol_test
Topic:kafka_protocol_test PartitionCount:4 ReplicationFactor:3 Configs:
Topic: kafka_protocol_test Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: kafka_protocol_test Partition: 1 Leader: 2 Replicas: 2,3,4 Isr: 2,3,4
Topic: kafka_protocol_test Partition: 2 Leader: 3 Replicas: 3,4,1 Isr: 3,4,1
Topic: kafka_protocol_test Partition: 3 Leader: 4 Replicas: 4,1,2 Isr: 4,1,2
使用Kafka系列之-自定義Producer中提到的KafkaProducerTool往指定kafka_protocol_test中發送消息,apache
public class ProducerTest2 {
public static void main(String[] args) throws InterruptedException {
KafkaProducerTool kafkaProducerTool = new KafkaProducerToolImpl("D:\\Files\\test\\kafkaconfig.properties");
int i = 1;
while(true) {
kafkaProducerTool.publishMessage("message" + (i++));
}
}
}
運行一段時間後中止寫入。運行一個console-consumer
從kafka_protocol_test
消費message
。api
運行一個console-consumer
從kafka_protocol_test
消費。注意觀察該topic
每一個partition
中的messages
數。markdown
[hadoop@kafka001 kafka]$ bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning
這個API是經過向Kafka集羣發送一個TopicMetadaaRequest
請求,獲得MetadataResponse
響應後從MetadataResponse
中解析出Metadata相關信息。
TopicMetadataRequest
的結構和示例以下
app
case class TopicMetadataRequest(val versionId: Short,
val correlationId: Int,
val clientId: String,
val topics: Seq[String])
TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
獲得的MetadataResponse
包含的信息以下,能夠從PartitionMetadata
中獲取到Partition
相關信息,從TopicMetadata
中獲取到Topic
相關信息,Broker
中記錄了Broker
的ip
和端口號等。dom
MetadataResponse => [Broker][TopicMetadata]
Broker => NodeId Host Port (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
Replicas => [int32]
Isr => [int32]
能夠查詢指定Topic是否存在,
指定topic有多少個partition,
每一個partition當前哪一個broker處於leader狀態,
每一個broker的host和port是什麼socket
若是設置了auto.create.topics.enable
參數,遇到不存在的topic
時,就會按默認replication
和partition
新建該不存在的topic
。
ide
生成一個TopicMetadataRequest
對象oop
// 封裝一個TopicMetadataRequest類型的請求對象
val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics)
// 發送該請求
val resp: TopicMetadataResponse = consumer.send(req)
// 其中consumer對象是SimpleConsumer類型的
new SimpleConsumer(host, port, config.socketTimeoutMs,
config.socketReceiveBufferBytes, config.clientId)
(1)查詢topic是否存在
因爲在TopicMetadataRequest
中能夠發送一組Seq[String]
類型的topics
,因此獲取到的TopicMetadataResponse.topicsMetadata
是Set[TopicMetadata]
類型的。
對每一個TopicMetadata
對象,若是其errorCode
不爲ErrorMapping.NoError
即表示該Topic
不正常。
topicMetadatas.foreach { topic =>
if (topic.errorCode == ErrorMapping.NoError)
println(s"topic: ${topic.topic}存在")
else
println(s"topic: ${topic.topic}不存在")
}
(2)獲取Topic的Partition個數
首先將全部TopicMetadata
中正常的Topic
過濾出來,而後遍歷每個TopicMetadata
對象,獲取其partitionsMetadata
信息,其長度即Partition
的個數
val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
existsTopicMetadatas.foreach { topic =>
val numPartitions = topic.partitionsMetadata.length
println(s"topic: ${topic.topic} 有${numPartitions}個partition")
}
(3)獲取Partition具體狀況
如下代碼能夠獲取到Topic
的每一個Partition
中的Leader Partition
以及replication
節點的信息。
existsTopicMetadatas.foreach { topic =>
println(s"topic:${topic.topic}的Partition信息:")
topic.partitionsMetadata.foreach { pm =>
val leaderPartition = pm.leader
println(s"\tpartition: ${pm.partitionId}")
println(s"\tleader節點:$leaderPartition")
val replicas = pm.replicas
println(s"\treplicas節點:$replicas")
}
}
傳入上面新建的kafka_protocol_test
以及一個不存在的topic kafka_protocol_test1
,以上代碼的運行結果以下:
=============Topic相關信息===========
topic: kafka_protocol_test存在
topic: kafka_protocol_test1不存在
topic: kafka_protocol_test 有4個partition
=============Partition相關信息===========
topic:kafka_protocol_test的Partition信息:
partition: 0
leader節點:Some(id:1,host:kafka001,port:9092)
replicas節點:Vector(id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092)
partition: 1
leader節點:Some(id:2,host:kafka002,port:9092)
replicas節點:Vector(id:2,host:kafka002,port:9092, id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092)
partition: 2
leader節點:Some(id:3,host:kafka003,port:9092)
replicas節點:Vector(id:3,host:kafka003,port:9092, id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092)
partition: 3
leader節點:Some(id:4,host:kafka004,port:9092)
replicas節點:Vector(id:4,host:kafka004,port:9092, id:1,host:kafka001,port:9092, id:2,host:kafka002,port:9092)
這個API經過向Kafka集羣發送一個OffsetRequest
對象,從返回的OffsetResponse
對象中獲取Offset
相關信息。
OffsetRequest
對象描述以下
OffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]] ReplicaId => int32 TopicName => string Partition => int32 Time => int64 MaxNumberOfOffsets => int32
上面Time
的做用是,獲取特定時間(單位爲ms)以前的全部messages
。若是設置爲-1
則獲取最新的offset
,即下一條messages
的offset
位置;若是設置爲-2
則獲取第一條message
的offset
位置,即當前partition
中的offset
起始位置。
OffsetResponse
對象描述以下
OffsetResponse => [TopicName [PartitionOffsets]] PartitionOffsets => Partition ErrorCode [Offset] Partition => int32 ErrorCode => int16 Offset => int64
經過該API能夠獲取指定topic-partition
集合的合法offset
的範圍,須要直接鏈接到Partition
的Leader
節點。
獲取指定topic
下全部partition
的offset
範圍
封裝一個getLeaderOffsets
方法,在此方法的基礎上分別封裝一個getEarliestLeaderOffsets
方法用於獲取最小offset
,getLatestLeaderOffsets
用於獲取最大offset
。
分別傳入的關鍵參數是前面提到的Time
,
def getLatestLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) // -1L
def getEarliestLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime) // -2L
在getLeaderOffsets中,查詢到當前partition的leader節點,
def findLeaders(topicAndPartitions: Set[TopicAndPartition]): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
// 獲取當前topicAndPartitions中的全部topic
val topics = topicAndPartitions.map(_.topic)
// 獲取topic對應的MetadataResp對象,以前已過濾不存在的topic,因此這裏無需進一步過濾
val topicMetadatas = getMetadataResp(topics.toSeq).left.get
val leaderMap = topicMetadatas.flatMap { topic =>
topic.partitionsMetadata.flatMap { pm =>
val tp = TopicAndPartition(topic.topic, pm.partitionId)
// 獲取對應PartitionMedatada的leader節點信息
pm.leader.map { l =>
tp -> (l.host -> l.port)
}
}
}.toMap
Right(leaderMap)
}
而後在這些節點中,封裝一個OffsetRequest
對象,向Kafka集羣得到OffsetResponse
對象。
val resp = consumer.getOffsetsBefore(req)
val respMap = resp.partitionErrorAndOffsets
最後從OffsetResponse
對象中獲取offset
範圍,
val resp = getMetadataResp(topics.toSeq)
// 若是獲取的resp是left,則處理返回的Set[TopicMetadata]
val topicAndPartitions = processRespInfo(resp) { resp =>
val topicMetadatas = resp.left.get.asInstanceOf[Set[TopicMetadata]]
val existsTopicMetadatas = topicMetadatas.filter(tm => tm.errorCode == ErrorMapping.NoError)
getPartitions(existsTopicMetadatas)
}.asInstanceOf[Set[TopicAndPartition]]
// 獲取指定topic-partition最先的offset
val offsetBegin = getEarliestLeaderOffsets(topicAndPartitions).right.get
// 獲取指定topic-partition最晚的offset
val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get
print("=============Offset範圍信息===========")
topicAndPartitions.foreach { tp =>
println(s"topic: ${tp.topic}, Partition: ${tp.partition} 的Offset範圍:")
println(s"\t${offsetBegin(tp).offset} ~ ${offsetEnd(tp).offset}")
}
鏈接到kafka_protocol_test
,運行結果以下
topic: kafka_protocol_test, Partition: 0 的Offset範圍:
0 ~ 9000
topic: kafka_protocol_test, Partition: 1 的Offset範圍:
0 ~ 598134
topic: kafka_protocol_test, Partition: 2 的Offset範圍:
0 ~ 0
topic: kafka_protocol_test, Partition: 3 的Offset範圍:
0 ~ 91000
和第零節中圖片顯示結果一致。
首先參考Offset Management文檔中的描述,分析一下Kafka中有關Offset管理的文檔。
在這篇文檔中主要提供了OffsetFetch
和OffsetCommit
兩個API,其中
這個API能夠獲取一個Consumer
讀取message
的offset
信息。發送的請求是OffsetFetchRequest
類型的對象,接收到的是OffsetFetchResponse
類型的響應。具體offset
信息能夠從OffsetFetchResponse
對象中解析。
發送的Request
請求爲,須要指定consumer
所屬的group
,以及須要獲取offset
的全部TopicAndPartitions
。
val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, 0)
或獲得的響應爲OffsetFetchResponse類型的對象。
val resp = consumer.fetchOffsets(req)
其中consumer對象是SimpleConsumer類型的
new SimpleConsumer(host, port, config.socketTimeoutMs,
config.socketReceiveBufferBytes, config.clientId)
具體獲取offset的邏輯以下,
withBrokers(Random.shuffle(config.seedBrokers)) { consumer =>
// 鏈接consumer,發送該OffsetFetchRequest請求
val resp = consumer.fetchOffsets(req)
val respMap = resp.requestInfo
// 從傳入的topicAndPartitions中取出不包含在result中的topicAndPartition
val needed = topicAndPartitions.diff(result.keySet)
// 遍歷每個須要獲取offset的topic-partition
needed.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
// 若是沒有錯誤
if (ome.error == ErrorMapping.NoError) {
result += tp -> ome
} else {
errs.append(ErrorMapping.exceptionFor(ome.error))
}
}
}
if (result.keys.size == topicAndPartitions.size) {
return Right(result)
}
}
當最終調用commit()
方法,或者若是啓用了autocommit
參數時,這個API可使consumer
保存其消費的offset
信息。
發送的Request
請求爲OffsetCommitRequest
類型。
OffsetCommitRequest
須要傳入的參數以下,
val offsetEnd = getLatestLeaderOffsets(topicAndPartitions).right.get
val resetOffsets = offsetsFetch.right.get.map { offsetInfo =>
val plus10Offset = offsetInfo._2.offset + 10
offsetInfo._1 -> OffsetAndMetadata(if (offsetEnd(offsetInfo._1).offset >= plus10Offset) plus10Offset else offsetEnd(offsetInfo._1).offset)
} // resetOffsets類型爲Map[TopicAndPartition, OffsetAndMetadata]
val req = OffsetCommitRequest(groupId, resetOffsets, 0) // 發送該請求的方式以下
val resp = consumer.commitOffsets(req)
須要注意的是這個API在Kafka-0.9之後的版本中才提供。指定Consumer Group
的offsets
數據保存在某個特定的Broker
中。
向Kafka
集羣發送一個GroupCoordinatorRequest
類型的請求參數,該request
對象中只須要指定一個groupId
便可。以下所示,
val req = new GroupCoordinatorRequest(groupId)
val resp = consumer.send(req)
獲取到的Response
對象是GroupCoordinatorResponse
類型的,在resp.coordinatorOpt
中返回一個BrokerEndpoint
對象,能夠獲取該Broker
對應的Id, Ip, Port
等信息。
(1) 運行OffsetFetch API
(a) 獲取kafka_protocol_test的consumer group消費狀態
啓動一個console-consumer
從kafka_protocol_test topic
消費messages
。須要指定一個特定的group.id
參數,以下所示,使用默認的consumer.properties
配置文件便可。
bin/kafka-console-consumer.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --from-beginning --consumer.config ./config/consumer.properties
運行後,將其中止,查看當前console-consumer
的消費狀態
[hadoop@kafka001 kafka]$ bin/kafka-consumer-offset-checker.sh --zookeeper kafka001:2181 --topic kafka_protocol_test --group test-consumer-group
Group Topic Pid Offset logSize Lag Owner
test-consumer-group kafka_protocol_test 0 9000 9000 0 none
test-consumer-group kafka_protocol_test 1 26886 598134 571248 none
test-consumer-group kafka_protocol_test 2 0 0 0 none
test-consumer-group kafka_protocol_test 3 18296 91000 72704 none
(b) 運行OffsetFetch代碼,查看運行結果
運行時仍然傳入test-consumer-group
,運行結果以下
Topic: kafka_protocol_test, Partition: 0
Offset: 9000
Topic: kafka_protocol_test, Partition: 1
Offset: 26886
Topic: kafka_protocol_test, Partition: 2
Offset: 0
Topic: kafka_protocol_test, Partition: 3
Offset: 18296
對比後發現,兩個offset
信息保持一致。
(2)運行OffsetCommit API
在這裏,將OffsetFetch
獲取到的每一個TopicAndPartition
對應的Offset
加10
,若是加10
後超過其最大Offset
,則取最大Offset
。
在Commit
先後,兩次調用OffsetFetch API的代碼,先後運行結果以下,
更新前的offset
:
Topic: kafka_protocol_test, Partition: 0
Offset: 9000
Topic: kafka_protocol_test, Partition: 1
Offset: 26886
Topic: kafka_protocol_test, Partition: 2
Offset: 0
Topic: kafka_protocol_test, Partition: 3
Offset: 18296
更新後的offset:(partition 0和partition 2沒有變化是因爲加10後超過了該partition的offset範圍最大值)
Topic: kafka_protocol_test, Partition: 0
Offset: 9000
Topic: kafka_protocol_test, Partition: 1
Offset: 26896
Topic: kafka_protocol_test, Partition: 2
Offset: 0
Topic: kafka_protocol_test, Partition: 3
Offset: 18306
(3)運行Group Coordinator API
傳入一個consumer group
後,查看其運行結果
Comsuner Group : test-consumer-group, coordinator broker is:
id: 1, host: kafka001, port: 9092
這個API從Kafka-0.9.0.0版本開始出現。
在0.9之前的client api中,consumer是要依賴Zookeeper的。由於同一個consumer group中的全部consumer須要進行協同,進行下面所講的rebalance。可是由於zookeeper的「herd」與「split brain」,致使一個group裏面,不一樣的consumer擁有了同一個partition,進而會引發消息的消費錯亂。爲此,在0.9中,再也不用zookeeper,而是Kafka集羣自己來進行consumer之間的同步。下面引自kafka設計的原文:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Failuredetectionprotocol
相關知識點能夠參考Kafka源碼深度解析-序列7 -Consumer -coordinator協議與heartbeat實現原理。
注意,這個API也是從Kafka-0.9以後的client版本中才提供。經過這個API能夠對Kafka集羣進行一些管理方面的操做,好比獲取全部的Consumer Groups
信息。想要獲取集羣中全部Consumer Groups
信息,須要發送一個ListGroupRequest
請求到全部的Brokers
節點。
還能夠經過發送一個DescribeGroupsRequest
類型的請求對象,獲取對特定Consumer Group
的描述。
在Kafka-0.9以後的client
中,提供了一個kafka.admin.AdminClient
類,調用createSimplePlaintext
方法,傳入一個broker list字val client = AdminClient.createSimplePlaintext(「kafka001:9092,kafka002:9092,kafka003:9092,kafka004:9092」)AdminClient`提供了不少方法,好比
def findCoordinator(groupId: String): Node
def findAllBrokers(): List[Node]
def listAllGroups(): Map[Node, List[GroupOverview]]
def listAllConsumerGroups(): Map[Node, List[GroupOverview]]
等等。