1.topic註冊信息node
/brokers/topics/[topic] :算法
存儲某個topic的partitions全部分配信息併發
Schema:
{
Example:
{ 說明:紫紅色爲patitions編號,藍色爲同步副本組brokerId列表ui |
/brokers/topics/[topic]/partitions/[0...N] 其中[0..N]表示partition索引號
spa
/brokers/topics/[topic]/partitions/[partitionId]/state線程
Schema:
{
Example:
{
|
/brokers/ids/[0...N]
每一個broker的配置文件中都須要指定一個數字類型的id(全局不可重複),此節點爲臨時znode(EPHEMERAL)
Schema:
{
Example:
{ "timestamp":"1403061899859" |
/controller_epoch -> int (epoch)
此值爲一個數字,kafka集羣中第一個broker第一次啓動時爲1,之後只要集羣中center controller中央控制器所在broker變動或掛掉,就會從新選舉新的center controller,每次center controller變動controller_epoch值就會 + 1;
/controller -> int (broker id of the controller) 存儲center controller中央控制器所在kafka broker的信息
Schema:
{
Example:
{ "version": 1, "brokerid": 3, "timestamp": "1403061802981" }
|
a.每一個consumer客戶端被建立時,會向zookeeper註冊本身的信息;
b.此做用主要是爲了"負載均衡".
c.同一個Consumer Group中的Consumers,Kafka將相應Topic中的每一個消息只發送給其中一個Consumer。
d.Consumer Group中的每一個Consumer讀取Topic的一個或多個Partitions,而且是惟一的Consumer;
e.一個Consumer group的多個consumer的全部線程依次有序地消費一個topic的全部partitions,若是Consumer group中全部consumer總線程大於partitions數量,則會出現空閒狀況;舉例說明:kafka集羣中建立一個topic爲report-log 4 partitions 索引編號爲0,1,2,3假若有目前有三個消費者node:注意-->一個consumer中一個消費線程能夠消費一個或多個partition.若是每一個consumer建立一個consumer thread線程,各個node消費狀況以下,node1消費索引編號爲0,1分區,node2費索引編號爲2,node3費索引編號爲3總結 :若是每一個consumer建立2個consumer thread線程,各個node消費狀況以下(是從consumer node前後啓動狀態來肯定的),node1消費索引編號爲0,1分區;node2費索引編號爲2,3;node3爲空閒狀態
從以上可知,Consumer Group中各個consumer是根據前後啓動的順序有序消費一個topic的全部partitions的。若是Consumer Group中全部consumer的總線程數大於partitions數量,則可能consumer thread或consumer會出現空閒狀態。
Consumer均衡算法
當一個group中,有consumer加入或者離開時,會觸發partitions均衡.均衡的最終目的,是提高topic的併發消費能力.
1) 假如topic1,具備以下partitions: P0,P1,P2,P3
2) 加入group中,有以下consumer: C0,C1
3) 首先根據partition索引號對partitions排序: P0,P1,P2,P3
4) 根據(consumer.id + '-'+ thread序號)排序: C0,C1
5) 計算倍數: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
6) 而後依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
每一個consumer都有一個惟一的ID(consumerId能夠經過配置文件指定,也能夠由系統生成),此id用來標記消費者信息.
/consumers/[groupId]/ids/[consumerIdString]
是一個臨時的znode,此節點的值爲請看consumerIdString產生規則,即表示此consumer目前所消費的topic + partitions列表.
consumerId產生規則:
StringconsumerUuid = null;
if(config.consumerId!=null && config.consumerId)
consumerUuid = consumerId;
else {
String uuid = UUID.randomUUID()
consumerUuid = "%s-%d-%s".format(
InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
uuid.getMostSignificantBits().toHexString.substring(0,8));
}
String consumerIdString = config.groupId + "_" + consumerUuid;
Schema:
{
Example:
{ "version": 1, "subscription": { "open_platform_opt_push_plus1": 5 }, "pattern": "static", "timestamp": "1411294187842" } |
/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引編號
當consumer啓動時,所觸發的操做:
a) 首先進行"Consumer Id註冊";
b) 而後在"Consumer id 註冊"節點下注冊一個watch用來監聽當前group中其餘consumer的"退出"和"加入";只要此znode path下節點列表變動,都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管partitions).
c) 在"Broker id 註冊"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance.
/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)
用來跟蹤每一個consumer目前所消費的partition中最大的offset
此znode爲持久節點,能夠看出offset跟group_id有關,以代表當消費者組(consumer group)中一個消費者失效,
從新觸發balance,其餘consumer能夠繼續消費.
9. Re-assign partitions
/admin/reassign_partitions
{
"fields" :[
{
"name" : "version" ,
"type" : "int" ,
"doc" : "version id"
},
{
"name" : "partitions" ,
"type" :{
"type" : "array" ,
"items" :{
"fields" :[
{
"name" : "topic" ,
"type" : "string" ,
"doc" : "topic of the partition to be reassigned"
},
{
"name" : "partition" ,
"type" : "int" ,
"doc" : "the partition to be reassigned"
},
{
"name" : "replicas" ,
"type" : "array" ,
"items" : "int" ,
"doc" : "a list of replica ids"
}
],
}
"doc" : "an array of partitions to be reassigned to new replicas"
}
}
]
}
Example:
{
"version" : 1 ,
"partitions" :
[
{
"topic" : "Foo" ,
"partition" : 1 ,
"replicas" : [ 0 , 1 , 3 ]
}
]
}
|
10. Preferred replication election
/admin/preferred_replica_election
{
"fields" :[
{
"name" : "version" ,
"type" : "int" ,
"doc" : "version id"
},
{
"name" : "partitions" ,
"type" :{
"type" : "array" ,
"items" :{
"fields" :[
{
"name" : "topic" ,
"type" : "string" ,
"doc" : "topic of the partition for which preferred replica election should be triggered"
},
{
"name" : "partition" ,
"type" : "int" ,
"doc" : "the partition for which preferred replica election should be triggered"
}
],
}
"doc" : "an array of partitions for which preferred replica election should be triggered"
}
}
]
}
例子:
{
"version" : 1 ,
"partitions" :
[
{
"topic" : "Foo" ,
"partition" : 1
},
{
"topic" : "Bar" ,
"partition" : 0
}
]
}
|
11. 刪除topics
/admin/delete_topics
Schema:
{
"fields" :
[ { "name" : "version" , "type" : "int" , "doc" : "version id" },
{ "name" : "topics" ,
"type" : { "type" : "array" , "items" : "string" , "doc" : "an array of topics to be deleted" }
} ]
}
例子:
{
"version" : 1 ,
"topics" : [ "foo" , "bar" ]
}
|
Topic配置
/config/topics/[topic_name]
例子
{
"version" : 1 ,
"config" : {
"config.a" : "x" ,
"config.b" : "y" ,
...
}
}
|