kafka_0.10.1.0監控及管理

kafka_0.10.1.0監控及管理

1. kafka監控

kafka自身沒有監控管理頁面,不管是進行一些管理操做仍是狀態的監控都要命令加一大堆記不住的參數,實在是很不方便,不過好在在github上開源了一些工具,在kafka的生態系統中也有說起到:java

  • Kafka Manager: 都是以表格的形式展示數據,比較方便用來管理kafka,例如topic的建立、刪除以及分區的管理等。
  • Kafka Offset Monitor: 監控消費者以及所在分區的offset,幫助分析當前的消費以及生產是否順暢,功能比較單調但界面還能夠。
  • Kafka Web Console:github上已經說明了再也不更新了,且建議使用Kafka manager
  • kafkat: 一個簡化了的命令行工具,用來管理kafka的broker,partition,topic.
  • Capillary: 若是是kafka+storm集成使用,能夠選擇該工具,是一個web應用

除了前面兩個,其它幾個都沒試用過,就算在網上查也是推薦前兩個而已,kafka manager基於jmx功能比較強大,利用它作管理方面;而KafkaOffsetMonitor從它的啓動參數來看應該是定時從zookeeper上獲取消費者的offset,以圖的形式展現,比較直觀(對於一些實現Exactly once的系統,offset並不保存在zookeeper上面,它將不能使用),二者結合使用,相得益彰。linux

2.1. Kafka-manager

kafka manager的源碼:https://github.com/yahoo/kafka-manager 
官方要求的kafka版本:Kafka 0.8.1.1 或者 0.8.2.x 或者 0.9.0.x,不過使用kafka_0.10.1.0時也能正常。 
java版本要求:Java 8+
git

2.1.1 編譯源碼

爲了獲得部署包kafka-manager-xxxx.zip,先根據上面的地址下載源碼再編譯(不想這麼麻煩的話,能夠去一些kafka的qq羣,通常羣共享裏都會有這個包)。kafka-manager工程是利用SBT進行構建的,因此編譯以前還須要安裝SBT,安裝Java 8。最後執行命令編譯:github

sbt clean dist
  • 1

網絡很差的話可能須要重複編譯,成功後在target/universal目錄下能夠看到kafka-manager-1.3.2.1.zipweb

2.1.2 修改配置

把編譯獲得的zip包解壓,在conf目錄中有一個application.conf文件,最小化的配置只須要在該文件中修改kafka-manager.zkhosts參數便可:apache

kafka-manager.zkhosts="master:2181,slave1:2181,slave2:2181"
  • 1

2.1.3 kafka啓動jmx

kafka服務必須要開啓JMX,不然在下一步啓動kafka-manager時會出現: 
java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled! 
啓動kafka服務時指定JMX_PORT值:
json

JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties
  • 1

或者修改kafka-server-start.sh,在前面加上:bootstrap

export JMX_PORT=9999
  • 1

2.1.4 啓動kafka-manager

以運行在linux上爲例,啓動腳本爲bin/kafkak-manager,該腳本會默認佔用9000端口,也能夠經過參數修改端口以及指定java版本運行:api

nohup bin/kafka-manager -java-home /usr/java/jdk1.8.0_101/ -Dhttp.port=8081 &
  • 1

啓動成功後,便可以經過http://ip:8081來訪問網絡

2.2. KafkaOffsetMonitor

源碼地址:https://github.com/quantifind/KafkaOffsetMonitor 
jar包下載:https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.1 
KafkaOffsetMonitor使用比較方便,將會被打成一個jar包,直接運行便可,做者已經把打好的包上傳到github上面,執行如下命令啓動:

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk master,slave1,slave2:2181 --port 8082 --refresh 10.seconds --retain 2.days
  • 1

啓動成功後,便可以經過http://ip:8082來訪問

3. kafka自帶腳本工具

基本全部腳本都是調用kafka-run-class.sh腳本去執行一個命令模式的class.建議使用腳本時參考腳本的使用說明。

3.1. 主題管理:kafka-topics.sh

建立、刪除、查看或者改變一個topic.

3.1.1. 建立topic

bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --partitions 3 --replication-factor 2 --config flush.ms=1000 --config flush.messages=1
  • 1

建立一個名稱爲test的topic,它有3個分區,每一個分區兩個replica,經過–config給topic設置屬性,格式爲key=value,若是有多個配置屬性則如上命令。這種建立方式kafka會自動把各個分區的replica分配到相應的broker,也能夠在建立topic時手動指定哪一個分區的哪一個replica落在指定的broker,示例命令以下:

bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --config flush.ms=1000 --config flush.messages=1 --replica-assignment 0:1,1:2
  • 1

關鍵的配置參數爲–replica-assignment,該參數不能與–partitions和–replication-factor同時出現,參數的使用格式以下:

broker_id_for_part0_replica1: broker_id_for_part0_replica2, 
broker_id_for_part1_replica1: broker_id_for_part1_replica2, 
broker_id_for_part2_replica1: broker_id_for_part2_replica2

–replica-assignment 0:1,1:2表示有兩個分區,分區0的replica1在broker.id=0的kafka服務上,分區0的replica2在broker.id=1的kafka服務上;分區1的replica1在broker.id=1的kafka服務上,分區1的replica2在broker.id=2的kafka服務上。

3.1.2. 刪除topic

當使用delete命令刪除topic,默認只是進行標記,並無真正的刪除

這裏寫圖片描述

Note: This will have no impact if delete.topic.enable is not set to true. 
須要在config/server.properties配置文件中開啓delete.topic.enable=true

3.1.3. 查看topic

bin/kafka-topics.sh --zookeeper master:2181 --describe --topic test
  • 1

describe名稱爲test的topic,將會顯示topic的分區數、replica因子、配置參數以及各分區的分佈狀況(leader,replica,isr),以下圖:

這裏寫圖片描述 
使用–describe時還能夠結合其它一些參數對結果進行過濾:

  • topics-with-overrides:加上該參數時,只顯示config被覆蓋過的topic(例如使用下面的–alter修改config,或者建立topic時指定–config也算)。
  • unavailable-partitions:加上該參數時,只顯示leader不可用的分區。
  • under-replicated-partitions:加上該參數時,只顯示replica不足的分區。

3.1.4. 修改topic

根據–alter參數的說明,能夠修改topic的分區數(目前只能是增長),修改配置config,以及修改replica(這裏貌似不許確,根據官網的文檔說明,若是想要增長replication factor,應該使用kafka-reassign-partitions.sh腳本)。

  • 增長分區:從2個分區增長到3個
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --partitions 3
  • 1

成功後describe一下topic: 
這裏寫圖片描述

添加分區不能改變現有的數據

  • 添加修改配置 
    根據上圖test主題的Configs:flush.messages=1,flush.ms=1000,嘗試把flush.ms修改成2000,命令以下:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --config flush.ms=2000
  • 1

成功後: 
這裏寫圖片描述

  • 刪除配置 
    –alter結合–delete-config一塊兒使用能夠刪除某項配置,嘗試刪除flush.ms配置項目:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --delete-config flush.ms
  • 1

成功後: 
這裏寫圖片描述

PS:對於使用–alter增長、修改和刪除config,從0.9.0.0版本後建議使用kafka-configs.sh腳本。

3.2. 配置管理:kafka-configs.sh

這個腳本專門是用來添加,修改和刪除實體的config的,其中操做的實體對象有:topic, client, user 和 broker。

3.2.1. 更新配置

添加和修改均可以統一說成更新,沒有則添加,存在即修改。結合alter,add-config以及其它一些配置,例如修改broker的某個config:

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --add-config 'leader.replication.throttled.rate=700000000'
  • 1
  • entity-type:實體類型,必須爲(topics/clients/users/brokers)其中之一。
  • entity-name:實體名稱(topic的名稱,client的id,user的principal名稱,broker的id)。
  • add-config:格式爲’ key1:value1,key2:[v21,v22],key3:value3’,多個配置能夠寫在一塊兒,比kafka-topic.sh的修改更人性化。

3.2.2. 查看配置

執行上面命令給id=0的broker添加配置leader.replication.throttled.rate後,接着查看一下該broker的config:

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --describe
  • 1

結果: 
這裏寫圖片描述

3.2.3. 刪除配置

接上,刪除id=0的broker的配置leader.replication.throttled.rate

bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --delete-config 'leader.replication.throttled.rate'
  • 1

結果: 
這裏寫圖片描述

3.3. 消費者組管理:kafka-consumer-groups.sh

能夠列出全部消費者組,查看某個消費者組的詳細狀況以及刪除消費者組的信息(刪除只適用於舊版本基於zookeeper的消費都組)。

3.3.1. 列出消費者組

Kafka默認一直會有一個「KafkaManagerOffsetCache」的消費者組,爲了更加直觀,先用kafka-console-consumer.sh啓動一個消費都,並加入一個叫作「test_group」的組:

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --consumer-property group.id=test_group
  • 1

接着使用如下命令列出全部的消費都組:

bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --list
  • 1

已經能夠看到「test_group」的消費都組了: 
這裏寫圖片描述

3.3.2. 查看消費者組

查看消費者組的具體消費情況,結合來分析目前集羣的穩健程度,執行如下命令:

bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group test_group
  • 1

結果: 
這裏寫圖片描述

3.4. 均衡leader:kafka-preferred-replica-election.sh

每一個分區的全部replicas叫作」assigned replicas」,」assigned replicas」中的第一個replicas叫」preferred replica」,剛建立的topic通常」preferred replica」(優先副本)是leader。 
各分區的讀取寫請求都是由leader來接收處理的,那麼固然但願各分區的leader能夠均衡地分佈在各個broker之上,作到均衡負載,提升集羣穩定性以及充分利用資源。通常在建立topic時,kafka都會默認把leader平均分配,但當某個broker宕掉後,會致使該broker上的leader轉移到其它的broker上去,致使機羣的負載不均衡,就算宕掉的broker恢復正常,它上面已經沒有leader。可使用kafka-preferred-replica-election.sh工具令到恢復後的broker上的優先副本從新選舉成爲leader,這樣又恢復到了宕掉以前的狀態。

下面來模擬一下整個過程,首先建立一個topic,3個分區,每一個分區的leader分別在3個broker上面: 
這裏寫圖片描述

分區0的leader已經從broker0轉移到了broker1了,在Isr中也看不到本來broker0的兩個replica。最後從新啓動broker0並執行如下命令:

bin/kafka-preferred-replica-election.sh --zookeeper master:2181
  • 1

再觀察test的分區狀況: 
這裏寫圖片描述

能夠看到test已經恢復到最初的leader分佈狀況了。默認是對全部分區進行優先副本選舉(preferred replica election),若是想指定操做某些分區,則須要配合–path-to-json-file參數,例如test有0,1,2三個分區,只想操做1,2分區,首先編譯test_election.json文件,內容以下:

{「partitions」:[{「topic」: 「test」, 「partition」: 1}, {「topic」: 「test」, 
「partition」: 2}]}

而後執行如下命令:

bin/kafka-preferred-replica-election.sh --zookeeper master:2181 --path-to-json-file test_election.json
  • 1

PS:其實能夠配置kafka自動平衡leader的,在server.properties文件中設置:auto.leader.rebalance.enable=true便可,而該配置默認是已經打開的,想驗證的話能夠重啓一個broker,隔一段時間後會發現leader會自動恢復。

3.5. 擴容重分區:kafka-reassign-partitions.sh

當有新的broker節點加入到已經在使用的集羣,kafka是不會自動均衡本來的數據到新節點的,須要手動對分區進行遷移,使得新節點能夠對外提供服務。(對於後來建立和topic則不須要)。

3.5.1. 生成遷移計劃

首先確定要知道須要對哪些topic進行遷移,且明確哪一個分區須要遷移到哪一個broker節點。現有一個分區test,具體以下圖: 
這裏寫圖片描述

手動編輯一個json文件(例如命名爲topics-to-move.json),表示哪些topic是須要遷移的,內容以下(能夠指定多個topics):

{「topics」: [{「topic」: 「test」}], 「version」:1 }

想要把test的全部分區遷移到broker1,2,執行如下命令生成遷移計劃:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
  • 1
  • topics-to-move-json-file:上面建立的topics-to-move.js文件
  • broker-list:分區須要遷移到的broker,格式如命令
  • generate:表示生成分配規則,內容是json串 
    執行結果以下: 
    這裏寫圖片描述

顯示了當前的分配規則(能夠用做回滾)以及新生成的分配規則,把內容保存到文件(expand-cluster-reassignment.json),固然,也能夠手動修改裏面的內容,只要符合格式便可:

{「version」:1,」partitions」:[{「topic」:」test」,」partition」:1,」replicas」:[2,1]},{「topic」:」test」,」partition」:2,」replicas」:[1,2]},{「topic」:」test」,」partition」:0,」replicas」:[1,2]}]}

3.5.2. 執行遷移計劃

根據上一步生成的分配規則expand-cluster-reassignment.json啓動遷移,執行如下命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
  • 1

而後describe一下,查看新的分區分配狀況: 
這裏寫圖片描述

能夠看到如今全部分區的replica都已經所有遷移到了broker1,2上面。

3.5.3. 驗證遷移計劃

仍是根據分配規則expand-cluster-reassignment.json驗證分區是否分配成功,執行如下命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
  • 1

執行結果以下: 
這裏寫圖片描述

至此,分區的遷移已經完成。其實已經對分區規則熟悉的話,能夠跳過生成遷移計劃這步,直接編寫expand-cluster-reassignment.json,而後執行驗證。

3.6. 增長副本:kafka-reassign-partitions.sh

爲分區增長副本,仍是使用kafka-reassign-partitions.sh命令,而後編輯副本規則json文件便可。現有如下topic: 
這裏寫圖片描述

分區0有兩個replica,分別在broker1,2上,如今準備在broker0上添加一個replica,先建立副本分配json文件(increase-replication-factor.json),內容以下:

{「version」:1, 
「partitions」:[{「topic」:」test」,」partition」:0,」replicas」:[0,1,2]}]}

而後指定increase-replication-factor.json執行下面的命令:

bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file increase-replication-factor.json --execute
  • 1

接着,一樣使用increase-replication-factor.json來驗證是否成功:

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
  • 1

執行結果以下: 
這裏寫圖片描述

或者describe一下topic: 
這裏寫圖片描述

相關文章
相關標籤/搜索