Kafka Maintenance Tools: A User Guide

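I. Preface

  1. kafka-manager is an open-source Kafka management tool from Yahoo that is still receiving commits. It is one of the most convenient open-source tools for monitoring Kafka.
  2. Documentation is sparse and the tool is written in Scala, so I could not fully work out the meaning of some terms; apologies in advance.
  3. This article also covers the Kafka commands commonly used in operations.
  4. This document will be updated based on reader feedback, so check back for changes.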

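II. Introduction to kafka-manager

  1. Manage multiple clusters
  2. Inspect cluster state conveniently
  3. Run preferred replica election
  4. Generate and execute partition assignment plans for multiple topics in batch
  5. Create topics
  6. Delete topics (supported only on 0.8.2 and later, and requires delete.topic.enable to be set to true on the brokers)
  7. Add partitions to existing topics
  8. Update topic configuration
  9. Poll broker-level and topic-level metrics, provided the broker JMX reporter is enabled
  10. Monitor consumer groups and their consumption status
  11. Add and view LogKafka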

III. Topic monitoring and management


topic list



  1. The topic list supports searching. The columns that most need explanation are described below.
  2. partitions: the number of partitions created for the topic.
  3. brokers: the number of brokers the topic uses.
  4. brokers spread: per the source code (app/kafka/manager/model/ActorModel.scala#L400), the percentage of brokers in the cluster that hold partitions of the given topic.
    Example: 3 brokers share a topic that has 2 partitions, so 66% of the brokers hold partitions of this topic.
    In other words, the partitions of this topic are spread over only 66% of the brokers.

  5. brokers skew: per the source code (app/kafka/manager/model/ActorModel.scala#L380), a broker is skewed if its number of partitions is greater than the average number of partitions per broker for the given topic.
    Example: 2 brokers share 4 partitions; if one of them has 3 partitions, it is skewed (3 > 2).

  6. brokers leader skew: the percentage of brokers that lead more partitions than the average.

  7. replicas: the number of replicas (replication factor).
  8. under replicated: the percentage of partitions that are under-replicated, i.e. that have fewer in-sync replicas than assigned replicas.
  9. Producer Message/Sec: the number of messages produced per second.
  10. Summed Recent Offset: the sum of the latest offsets of all partitions.
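
To make the spread and skew columns concrete, here is a minimal Python sketch that recomputes them from a partition assignment. It follows the definitions quoted above rather than kafka-manager's actual Scala code, so the percentages may be rounded differently from what the UI shows. The function name `topic_broker_stats`, its input shape, and the choice to treat the first replica of each partition as its leader are assumptions made for illustration.

```python
from collections import Counter

def topic_broker_stats(assignment, cluster_brokers):
    """Recompute spread/skew figures for one topic.

    assignment: {partition_id: [broker ids holding a replica]}; the first
    broker in each list is treated as the leader (an assumption).
    cluster_brokers: ids of all brokers in the cluster.
    """
    replicas_per_broker = Counter(b for reps in assignment.values() for b in reps)
    leaders_per_broker = Counter(reps[0] for reps in assignment.values())
    used = len(replicas_per_broker)  # brokers holding at least one replica

    # Spread: share of the cluster's brokers that hold partitions of the topic.
    spread_pct = 100 * used // len(cluster_brokers)

    # Skew: a broker is skewed if it holds more replicas than the average.
    avg_replicas = sum(replicas_per_broker.values()) / used
    skewed = sorted(b for b, n in replicas_per_broker.items() if n > avg_replicas)

    # Leader skew: same idea, counting only the partitions a broker leads.
    avg_leaders = len(assignment) / used
    leader_skewed = sorted(b for b in replicas_per_broker
                           if leaders_per_broker[b] > avg_leaders)

    return {
        "spread_pct": spread_pct,
        "skewed_brokers": skewed,
        "skew_pct": 100 * len(skewed) // used,
        "leader_skewed_brokers": leader_skewed,
        "leader_skew_pct": 100 * len(leader_skewed) // used,
    }

# The two examples from the text: 2 partitions on a 3-broker cluster,
# and 4 partitions split 3/1 across 2 brokers.
print(topic_broker_stats({0: [1], 1: [2]}, cluster_brokers=[1, 2, 3]))
print(topic_broker_stats({0: [1], 1: [1], 2: [1], 3: [2]}, cluster_brokers=[1, 2]))
```

The first call reproduces the 66% spread from the example, and the second flags broker 1 as skewed because it holds 3 partitions against an average of 2.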

topic view



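  1. TopicSummary: the same attributes as described for the topic list.
  2. Metrics: Kafka uses Yammer Metrics to report server-side and client-side metrics. Yammer Metrics 3.1.0 offers six kinds of metric collection: Meters, Gauges, Counters, Histograms, Timers, and Health Checks. It also separates collecting metrics from reporting (publishing) them, so the two can be combined freely as needed. The reporters it currently supports are Console Reporter, JMX Reporter, HTTP Reporter, CSV Reporter, SLF4J Reporter, Ganglia Reporter, and Graphite Reporter, so Kafka can output its metrics through any of them.
  3. Producer and consumer metrics are collected and published to JMX, including messages per second and bytes in and out.
  4. partitionInformation: each partition's replicas, leader, and latest offset.
  5. add partition: the partition count can only be increased, never decreased.
  6. delete topic: deletes the topic.
  7. Reassign Partitions, Generate Partition Assignments, and Manual Partition Assignments are covered in depth later.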

consumer group



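  1. logSize: the partition's total message count (its latest offset).
  2. consumer offset: how much of the partition has been consumed so far.
  3. Lag: the number of messages in the partition still to be consumed (logSize minus consumer offset).
  4. Consumer Instance Owner: shows, by IP, which machine and thread each partition is bound to.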
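
The relationship between these columns is simple arithmetic, sketched below in Python. The helper names and the sample numbers are hypothetical and only illustrate the calculation; they are not taken from kafka-manager.

```python
def partition_lag(log_size, consumer_offset):
    """Messages not yet consumed in one partition (never negative)."""
    return max(log_size - consumer_offset, 0)

def group_lag(partitions):
    """partitions: {partition_id: (log_size, consumer_offset)} -> total lag."""
    return sum(partition_lag(size, offset) for size, offset in partitions.values())

# Hypothetical numbers for a three-partition topic.
sample = {0: (1000, 950), 1: (800, 800), 2: (1200, 1100)}
print({p: partition_lag(*v) for p, v in sample.items()})
print(group_lag(sample))
```

A lag that keeps growing on one partition while the others stay near zero usually points at the consumer instance that owns that partition.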

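IV. Preferred replica election

  1. The official documentation describes this feature as balancing leadership, that is, rebalancing partition leaders. When a broker stops or crashes, leadership of the partitions it led moves to other brokers holding replicas. After the broker restarts, it is only a follower for all of its partitions and serves no client reads or writes, which wastes resources.
  2. I ran into this problem on Kafka 0.8. Leadership ends up unbalanced across the replicas. Kafka provides the command below to rebalance it.
  3. From Kafka 0.10.0 on, leadership is rebalanced automatically after a broker restarts. This is controlled by auto.leader.rebalance.enable=true, which defaults to true.
-- Rebalance all topics directly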

 ./bin/kafka-preferred-replica-election.sh --zookeeper 172.16.30.13:2181/kafkanew

-- You can also rebalance a specific topic. First create a JSON file named test_two_leader.


 {"version":1,
 "partitions":[{"topic":"dev_infra_andre_test_two","partition":0}]}

 ./bin/kafka-preferred-replica-election.sh --zookeeper 172.16.30.13:2181/kafkanew --path-to-json-file test_two_leader
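
When many partitions need an election, writing the JSON file by hand gets tedious. The Python sketch below builds the same structure as the test_two_leader file above from a list of (topic, partition) pairs. The function name `election_plan` is hypothetical; redirect the printed output into a file and pass that file to the command.

```python
import json

def election_plan(topic_partitions):
    """Build the election JSON structure from (topic, partition) pairs."""
    return {
        "version": 1,
        "partitions": [
            {"topic": topic, "partition": partition}
            for topic, partition in topic_partitions
        ],
    }

# Same content as the test_two_leader file shown above.
plan = election_plan([("dev_infra_andre_test_two", 0)])
print(json.dumps(plan))
```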
--- Broker log after the restart

 [2017-08-03 14:13:51,240] INFO [ReplicaFetcherThread-0-3], Starting  (kafka.server.ReplicaFetcherThread)
[2017-08-03 14:13:51,241] INFO [ReplicaFetcherManager on broker 5] Added fetcher for partitions List([[dev_infra_andre_test_two,0], initOffset 127425 to broker BrokerEndPoint(3,172.16.30.13,9095)] , [[dev_infra_andre_test_two,2], initOffset 130260 to broker BrokerEndPoint(4,172.16.30.13,9096)] , [[ci_online_pay_log,0], initOffset 0 to broker BrokerEndPoint(3,172.16.30.13,9095)] , [[ci_login,0], initOffset 0 to broker BrokerEndPoint(3,172.16.30.13,9095)] ) (kafka.server.ReplicaFetcherManager)
[2017-08-03 14:14:53,197] INFO [ReplicaFetcherManager on broker 5] Removed fetcher for partitions [dev_infra_andre_test_two,2],[ci_login,0],[ci_online_pay_log,0] (kafka.server.ReplicaFetcherManager)
[2017-08-03 14:14:53,248] INFO [ReplicaFetcherThread-0-4], Shutting down (kafka.server.ReplicaFetcherThread)
[2017-08-03 14:14:53,251] INFO [ReplicaFetcherThread-0-4], Stopped  (kafka.server.ReplicaFetcherThread)
[2017-08-03 14:14:53,252] INFO [ReplicaFetcherThread-0-4], Shutdown completed (kafka.server.ReplicaFetcherThread)
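  1. Clicking Preferred Replica Election in the kafka-manager header and then the Run Preferred Replica Election button triggers the same election.

  2. An example from the test environment: first disable automatic rebalancing on broker 5 by setting auto.leader.rebalance.enable=false.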


1. Partition distribution of the topic before restarting broker 5

2. Partition distribution of the topic after restarting broker 5

3. After running the Preferred Replica Election described above, leadership is rebalanced.


V. kafka-reassign-partitions operations

  1. When new broker nodes are added to a Kafka cluster, they start with no topics and no load. Some topics have to be migrated to them manually, which is what this tool is for.
  2. The reassignment tool offers three modes:
  3. --generate: a convenient way to produce a reassignment plan as JSON for moving topics to specified brokers. It only generates the plan; nothing is actually executed.
  4. --execute: executes a user-supplied reassignment plan, given as a JSON file.
  5. --verify: verifies the status of the reassignment for all partitions listed during the last --execute. Each partition's status is one of completed successfully, failed, or in progress.

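1. Automatically migrating topic data to new brokers

  1. This uses the example from the official site: move topics foo1 and foo2 to the new brokers 5 and 6. After the move, the partitions of foo1 and foo2 exist only on brokers 5 and 6.
  2. The example from the official documentation:
--- Create the JSON file listing the topics to move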

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version":1
}



--- Generate a candidate plan. Save the proposed configuration as expand-cluster-reassignment.json.

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}
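
The tool prints both the current and the proposed assignment, and only the proposed one belongs in expand-cluster-reassignment.json. As a rough sketch, assuming the output is shaped like the sample above, the Python below pulls out the JSON object that follows the "Proposed partition reassignment configuration" heading. The function name `extract_proposed` and the shortened sample text are illustrative only, and the heading text may differ in other Kafka versions.

```python
import json

PROPOSED_MARKER = "Proposed partition reassignment configuration"

def extract_proposed(output):
    """Return the JSON object printed after the 'Proposed ...' heading."""
    start = output.index(PROPOSED_MARKER) + len(PROPOSED_MARKER)
    rest = output[start:].lstrip()
    # raw_decode parses the first JSON value and ignores anything after it.
    plan, _ = json.JSONDecoder().raw_decode(rest)
    return plan

# Shortened copy of the --generate output shown above.
sample_output = """Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[3,4]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}]
}
"""

print(json.dumps(extract_proposed(sample_output)))
```

Writing the returned object to expand-cluster-reassignment.json gives the file used by the steps that follow.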



------- Execute the reassignment

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute



----- Verify the status of the reassignment:

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
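
For large reassignments it helps to check the --verify output programmatically rather than by eye. The Python sketch below parses status lines in the format shown above and lists the partitions that are not yet finished. The function names are hypothetical, and the line format is assumed to match this sample; other Kafka versions may word it differently.

```python
import re

STATUS_LINE = re.compile(r"Reassignment of partition \[(.+),(\d+)\] (.+)")

def parse_verify(output):
    """Map (topic, partition) -> status text from --verify output."""
    statuses = {}
    for line in output.splitlines():
        match = STATUS_LINE.match(line.strip())
        if match:
            key = (match.group(1), int(match.group(2)))
            statuses[key] = match.group(3)
    return statuses

def pending(statuses):
    """Partitions whose status is anything other than successful completion."""
    return sorted(tp for tp, status in statuses.items()
                  if status != "completed successfully")

# Shortened copy of the --verify output shown above.
sample = """Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
"""

print(pending(parse_verify(sample)))
```

Rerunning --verify until this list is empty is a simple way to wait for the migration to finish.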

2. Moving the replicas of specific partitions to specified brokers

1. Example: move partition 0 of topic foo1 to brokers 5 and 6, and partition 1 of topic foo2 to brokers 2 and 3.

> cat custom-reassignment.json
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}



> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

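3. Changing the number of replicas of a topic's partition

1. Example: increase the replicas of partition 0 of topic foo from 1 to 3. The only replica was on broker 5; replicas are now added on brokers 6 and 7.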
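
The replica list for a plan like this can be derived from the current assignment instead of being typed by hand. The Python sketch below appends new brokers to one partition's existing replica list; the function name `add_replicas` and its arguments are assumptions for illustration. Existing replicas are kept first, so the current preferred leader (the first replica in the list) stays the same.

```python
import json

def add_replicas(current, topic, partition, new_brokers):
    """Return a reassignment plan with new_brokers appended to one partition.

    current: a plan dict in the same format as the reassignment JSON files.
    """
    partitions = []
    for entry in current["partitions"]:
        if entry["topic"] == topic and entry["partition"] == partition:
            # Keep existing replicas in order, then add brokers not yet present.
            merged = entry["replicas"] + [
                b for b in new_brokers if b not in entry["replicas"]
            ]
            partitions.append(
                {"topic": topic, "partition": partition, "replicas": merged}
            )
    return {"version": 1, "partitions": partitions}

# Partition 0 of topic foo currently lives only on broker 5.
current = {"version": 1,
           "partitions": [{"topic": "foo", "partition": 0, "replicas": [5]}]}
print(json.dumps(add_replicas(current, "foo", 0, [6, 7])))
```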

-- Build the JSON file

> cat increase-replication-factor.json
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}


--- Execute

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}


-- Check the result

> bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
Topic:foo   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: foo  Partition: 0    Leader: 5   Replicas: 5,6,7 Isr: 5,6,7