Kafka集羣擴展以及從新分佈分區

咱們往已經部署好的Kafka集羣裏面添加機器是最正常不過的需求,並且添加起來很是地方便,咱們須要作的事是從已經部署好的Kafka節點中複製相應的配置文件,而後把裏面的broker id修改爲全局惟一的,最後啓動這個節點便可將它加入到現有Kafka集羣中。json

  可是問題來了,新添加的Kafka節點並不會自動地分配數據,因此沒法分擔集羣的負載,除非咱們新建一個topic。可是如今咱們想手動將部分分區移到新添加的Kafka節點上,Kafka內部提供了相關的工具來從新分佈某個topic的分區。在從新分佈topic分區以前,咱們先來看看如今topic的各個分區的分佈位置:ruby

. /bin/kafka-topics .sh --topic iteblog --describe --zookeeper www.iteblog.com:2181
Topic:iteblog PartitionCount:7  ReplicationFactor:2 Configs:
   Topic: iteblog  Partition: 0  Leader: 1 Replicas: 1,2 Isr: 1,2
   Topic: iteblog  Partition: 1  Leader: 2 Replicas: 2,3 Isr: 2,3
   Topic: iteblog  Partition: 2  Leader: 3 Replicas: 3,4 Isr: 3,4
   Topic: iteblog  Partition: 3  Leader: 4 Replicas: 4,1 Isr: 4,1
   Topic: iteblog  Partition: 4  Leader: 1 Replicas: 1,3 Isr: 1,3
   Topic: iteblog  Partition: 5  Leader: 2 Replicas: 2,4 Isr: 2,4
   Topic: iteblog  Partition: 6  Leader: 3 Replicas: 3,1 Isr: 3,1

從上面的輸出能夠看出,iteblog主題一共有7個分區,可是咱們broker的個數只有4個,因此會致使某些broker維護更多的分區。如今咱們在現有集羣的基礎上再添加一個Kafka節點,而後使用Kafka自帶的kafka-reassign-partitions.sh工具來從新分佈分區。該工具備三種使用模式:bash

  一、generate模式,給定須要從新分配的Topic,自動生成reassign plan(並不執行)
  二、execute模式,根據指定的reassign plan從新分配Partition
  三、verify模式,驗證從新分配Partition是否成功工具

如今咱們須要將原先分佈在broker 1-4節點上的分區從新分佈到broker 1-5節點上,藉助kafka-reassign-partitions.sh工具生成reassign plan,不過咱們先得按照要求定義一個文件,裏面說明哪些topic須要從新分區,文件內容以下:this

[iteblog@www.iteblog.com ~]$ cat topics-to-move.json
{ "topics" : [{ "topic" : "iteblog" }],
  "version" :1
}

而後使用kafka-reassign-partitions.sh工具生成reassign planspa

[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4,5" --generate Current partition replica assignment {"version":1,"partitions":[{"topic":"iteblog","partition":3,"replicas":[4,1]},{"topic":"iteblog","partition":5,"replicas":[2,4]},{"topic":"iteblog","partition":4,"replicas":[1,3]},{"topic":"iteblog","partition":0,"replicas":[1,2]},{"topic":"iteblog","partition":6,"replicas":[3,1]},{"topic":"iteblog","partition":1,"replicas":[2,3]},{"topic":"iteblog","partition":2,"replicas":[3,4]}]} Proposed partition reassignment configuration {"version":1,"partitions":[{"topic":"iteblog","partition":3,"replicas":[3,5]},{"topic":"iteblog","partition":5,"replicas":[5,3]},{"topic":"iteblog","partition":4,"replicas":[4,1]},{"topic":"iteblog","partition":0,"replicas":[5,2]},{"topic":"iteblog","partition":6,"replicas":[1,4]},{"topic":"iteblog","partition":1,"replicas":[1,3]},{"topic":"iteblog","partition":2,"replicas":[2,4]}]} 

Proposed partition reassignment configuration下面生成的就是將分區從新分佈到broker 1-5上的結果。咱們將這些內容保存到名爲result.json文件裏面(文件名不重要,文件格式也不必定要以json爲結尾,只要保證內容是json便可),而後執行這些reassign plan:code

[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"iteblog","partition":3,"replicas":[4,1]},{"topic":"iteblog","partition":5,"replicas":[2,4]},{"topic":"iteblog","partition":4,"replicas":[1,3]},{"topic":"iteblog","partition":0,"replicas":[1,2]},{"topic":"iteblog","partition":6,"replicas":[3,1]},{"topic":"iteblog","partition":1,"replicas":[2,3]},{"topic":"iteblog","partition":2,"replicas":[3,4]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"iteblog","partition":1,"replicas":[1,3]},{"topic":"iteblog","partition":5,"replicas":[5,3]},{"topic":"iteblog","partition":4,"replicas":[4,1]},{"topic":"iteblog","partition":6,"replicas":[1,4]},{"topic":"iteblog","partition":2,"replicas":[2,4]},{"topic":"iteblog","partition":0,"replicas":[5,2]},{"topic":"iteblog","partition":3,"replicas":[3,5]}]} 

這樣Kafka就在執行reassign plan,咱們能夠校驗reassign plan是否執行完成:blog

[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --verify Status of partition reassignment: Reassignment of partition [iteblog,1] completed successfully Reassignment of partition [iteblog,5] is still in progress Reassignment of partition [iteblog,4] completed successfully Reassignment of partition [iteblog,6] completed successfully Reassignment of partition [iteblog,2] completed successfully Reassignment of partition [iteblog,0] is still in progress Reassignment of partition [iteblog,3] completed successfully [iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --verify Status of partition reassignment: Reassignment of partition [iteblog,1] completed successfully Reassignment of partition [iteblog,5] completed successfully Reassignment of partition [iteblog,4] completed successfully Reassignment of partition [iteblog,6] completed successfully Reassignment of partition [iteblog,2] completed successfully Reassignment of partition [iteblog,0] completed successfully Reassignment of partition [iteblog,3] completed successfully 

能夠看出,分區正在Reassignment的狀態是still in progress;若是分區Reassignment完成則completed successfully,而後咱們就能夠看到分區已經按照生成的reassign plan進行,咱們能夠看下topic各個分區如今的分佈狀況:coffeescript

[iteblog@www.iteblog.com ~]$ ./bin/kafka-topics.sh --topic iteblog --describe --zookeeper www.iteblog.com:2181 Topic:iteblog PartitionCount:7 ReplicationFactor:2 Configs: Topic: iteblog Partition: 0 Leader: 5 Replicas: 5,2 Isr: 2,5 Topic: iteblog Partition: 1 Leader: 1 Replicas: 1,3 Isr: 3,1 Topic: iteblog Partition: 2 Leader: 2 Replicas: 2,4 Isr: 4,2 Topic: iteblog Partition: 3 Leader: 3 Replicas: 3,5 Isr: 3,5 Topic: iteblog Partition: 4 Leader: 1 Replicas: 4,1 Isr: 1,4 Topic: iteblog Partition: 5 Leader: 5 Replicas: 5,3 Isr: 3,5 Topic: iteblog Partition: 6 Leader: 1 Replicas: 1,4 Isr: 1,4 

分區的分佈的確和操做以前不同了,broker 5上已經有分區分佈上去了。可是仔細的同窗應該能夠發現,broker 4上竟然沒有分區的Leader,這確定不是咱們想要的!因此使用kafka-reassign-partitions.sh工具生成的reassign plan只是一個建議,方便你們而已。其實咱們本身徹底能夠編輯一個reassign plan,而後執行它,以下:ip

{
     "version" : 1,
     "partitions" : [
         {
             "topic" : "iteblog" ,
             "partition" : 0,
             "replicas" : [
                 1,
                 2
             ]
         },
         {
             "topic" : "iteblog" ,
             "partition" : 1,
             "replicas" : [
                 2,
                 3
             ]
         },
         {
             "topic" : "iteblog" ,
             "partition" : 2,
             "replicas" : [
                 3,
                 4
             ]
         },
         {
             "topic" : "iteblog" ,
             "partition" : 3,
             "replicas" : [
                 4,
                 5
             ]
         },
         {
             "topic" : "iteblog" ,
             "partition" : 4,
             "replicas" : [
                 5,
                 1
             ]
         },
         {
             "topic" : "iteblog" ,
             "partition" : 5,
             "replicas" : [
                 1,
                 3
             ]
         },
         {
             "topic" : "iteblog" ,
             "partition" : 6,
             "replicas" : [
                 2,
                 4
             ]
         }
     ]
}

將上面的json數據文件保存到result.json文件中,而後也是執行它:

[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --execute 

等這個reassign plan執行完,咱們再來看看分區的分佈:

[iteblog@www.iteblog.com ~]$ ./bin/kafka-topics.sh --topic iteblog --describe --zookeeper www.iteblog.com:2181 Topic:iteblog PartitionCount:7 ReplicationFactor:2 Configs: Topic: iteblog Partition: 0 Leader: 1 Replicas: 1,2 Isr: 2,1 Topic: iteblog Partition: 1 Leader: 2 Replicas: 2,3 Isr: 3,2 Topic: iteblog Partition: 2 Leader: 3 Replicas: 3,4 Isr: 4,3 Topic: iteblog Partition: 3 Leader: 4 Replicas: 4,5 Isr: 5,4 Topic: iteblog Partition: 4 Leader: 5 Replicas: 5,1 Isr: 1,5 Topic: iteblog Partition: 5 Leader: 1 Replicas: 1,3 Isr: 3,1 Topic: iteblog Partition: 6 Leader: 2 Replicas: 2,4 Isr: 4,2 

果真已經按照咱們需求分佈了。。

相關文章
相關標籤/搜索