kafka在zookeeper中的存儲結構

參考site:http://kafka.apache.org/documentation.html#impl_zookeeperhtml

一、zookeeper客戶端相關命令java

在確保zookeeper服務啓動狀態下,經過 bin/zkCli.sh -server 127.0.0.1:2181 該命令來鏈接客戶端node

 

簡單操做以下:apache

  1. 顯示根目錄下、文件: ls /  使用 ls 命令來查看當前 ZooKeeper 中所包含的內容負載均衡

  2. 顯示根目錄下、文件: ls2 / 查看當前節點數據並能看到更新次數等數據dom

  3. 建立文件,並設置初始內容: create /zk "test" 建立一個新的 znode節點「 zk 」以及與它關聯的字符串ui

  4. 獲取文件內容: get /zk 確認 znode 是否包含咱們所建立的字符串spa

  5. 修改文件內容: set /zk "zkbak" 對 zk 所關聯的字符串進行設置線程

  6. 刪除文件: delete /zk 將剛纔建立的 znode 刪除3d

  7. 退出客戶端: quit

  8. 幫助命令: help 

 

二、topic註冊信息

/brokers/topics/[topic] :

 

存儲某個topic的partitions全部分配信息

Schema:
{
    "version": "版本編號目前固定爲數字1",
    "partitions": {
        "partitionId編號": [
            同步副本組brokerId列表
        ],
        "partitionId編號": [
            同步副本組brokerId列表
        ],
        .......
    }
}
Example:
{
    "version": 1,
    "partitions": {
	"0":[0,1,2]
     }
}

以下圖:

 

 

3.partition狀態信息

 

/brokers/topics/[topic]/partitions/[0...N]  其中[0..N]表示partition索引號

 

/brokers/topics/[topic]/partitions/[partitionId]/state

 

Schema:
{
    "controller_epoch": 表示kafka集羣中的中央控制器選舉次數,
    "leader": 表示該partition選舉leader的brokerId,
    "version": 版本編號默認爲1,
    "leader_epoch": 該partition leader選舉次數,
    "isr": [同步副本組brokerId列表]
}
 
Example:
{
    "controller_epoch":20,
    "leader":0,
    "version":1,
    "leader_epoch":0,
    "isr":[0,1,2]
}

 

如圖:

 

 

4. broker註冊信息

 

/brokers/ids/[0...N]                

 

每一個broker的配置文件中都須要指定一個數字類型的id(全局不可重複),此節點爲臨時znode(EPHEMERAL)

 

Schema:
{
    "jmx_port": jmx端口號,
    "timestamp": kafka broker初始啓動時的時間戳,
    "host": 主機名或ip地址,
    "version": 版本編號默認爲1,
    "port": kafka broker的服務端端口號,由server.properties中參數port肯定
}
 
Example:
{
    "jmx_port":1,
    "timestamp":"1452068227537",
    "host":"h1",
    "version":1,
    "port":9092
}

 

如圖:

 

 

 

5. controller epoch

 

/controller_epoch -> int (epoch)   

 

此值爲一個數字,kafka集羣中第一個broker第一次啓動時爲1,之後只要集羣中center controller中央控制器所在broker變動或掛掉,就會從新選舉新的center controller,每次center controller變動controller_epoch值就會 + 1; 

如圖:

 

 

6. controller註冊信息

/controller -> int (broker id of the controller)  存儲center controller中央控制器所在kafka broker的信息

Schema:
{
    "version": 版本編號默認爲1,
    "brokerid": kafka集羣中broker惟一編號,
    "timestamp": kafka broker中央控制器變動時的時間戳
}
 
Example:
{
    "version":1,
    "brokerid":0,
    "timestamp":"1452068227409"
}

如圖:

 

7. consumer註冊信息

 

每一個consumer都有一個惟一的ID(consumerId能夠經過配置文件指定,也能夠由系統生成),此id用來標記消費者信息.

 

/consumers/[groupId]/ids/[consumerIdString]

 

是一個臨時的znode,此節點的值爲請看consumerIdString產生規則,即表示此consumer目前所消費的topic + partitions列表. 

 

consumerId產生規則: 

   StringconsumerUuid = null; 
       if(config.consumerId!=null && config.consumerId){ 
           consumerUuid = consumerId; 
       }else { 
           String uuid = UUID.randomUUID() 
           consumerUuid = "%s-%d-%s".format( 
                InetAddress.getLocalHost.getHostName, System.currentTimeMillis, 
                uuid.getMostSignificantBits().toHexString.substring(0,8)); 

       } 
  String consumerIdString = config.groupId + "_" + consumerUuid;  

Schema:
{
    "version": 版本編號默認爲1,
    "subscription": { //訂閱topic列表
        "topic名稱": consumer中topic消費者線程數
    },
    "pattern": "static",
    "timestamp": "consumer啓動時的時間戳"
}
 
Example:
{
    "version":1,
    "subscription":{
        "replicatedtopic":1
    },
    "pattern":"white_list",
    "timestamp":"1452134230082"
}

 

如圖:

 

 

 

8. consumer owner

/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引編號

當consumer啓動時,所觸發的操做:

a) 首先進行"Consumer Id註冊";

b) 而後在"Consumer id 註冊"節點下注冊一個watch用來監聽當前group中其餘consumer的"退出"和"加入";只要此znode path下節點列表變動,

    都會觸發此group下consumer的負載均衡.(好比一個consumer失效,那麼其餘consumer接管partitions).

c) 在"Broker id 註冊"節點下,註冊一個watch用來監聽broker的存活狀況;若是broker列表變動,將會觸發全部的groups下的consumer從新balance.

 

 

9. consumer offset

 

/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)

 

用來跟蹤每一個consumer目前所消費的partition中最大的offset

 

此znode爲持久節點,能夠看出offset跟group_id有關,以代表當消費者組(consumer group)中一個消費者失效,

 

從新觸發balance,其餘consumer能夠繼續消費. 

 

 

10. topic 配置

 

/config/topics/[topic_name]

相關文章
相關標籤/搜索