kafka自身沒有監控管理頁面,不管是進行一些管理操做仍是狀態的監控都要命令加一大堆記不住的參數,實在是很不方便,不過好在在github上開源了一些工具,在kafka的生態系統中也有說起到:java
除了前面兩個,其它幾個都沒試用過,就算在網上查也是推薦前兩個而已,kafka manager基於jmx功能比較強大,利用它作管理方面;而KafkaOffsetMonitor從它的啓動參數來看應該是定時從zookeeper上獲取消費者的offset,以圖的形式展現,比較直觀(對於一些實現Exactly once的系統,offset並不保存在zookeeper上面,它將不能使用),二者結合使用,相得益彰。linux
kafka manager的源碼:https://github.com/yahoo/kafka-manager
官方要求的kafka版本:Kafka 0.8.1.1 或者 0.8.2.x 或者 0.9.0.x,不過使用kafka_0.10.1.0時也能正常。
java版本要求:Java 8+git
爲了獲得部署包kafka-manager-xxxx.zip,先根據上面的地址下載源碼再編譯(不想這麼麻煩的話,能夠去一些kafka的qq羣,通常羣共享裏都會有這個包)。kafka-manager工程是利用SBT進行構建的,因此編譯以前還須要安裝SBT,安裝Java 8。最後執行命令編譯:github
sbt clean dist
網絡很差的話可能須要重複編譯,成功後在target/universal目錄下能夠看到kafka-manager-1.3.2.1.zipweb
把編譯獲得的zip包解壓,在conf目錄中有一個application.conf文件,最小化的配置只須要在該文件中修改kafka-manager.zkhosts
參數便可:apache
kafka-manager.zkhosts="master:2181,slave1:2181,slave2:2181"
kafka服務必須要開啓JMX,不然在下一步啓動kafka-manager時會出現:
java.lang.IllegalArgumentException: requirement failed: No jmx port but jmx polling enabled!
啓動kafka服務時指定JMX_PORT值:json
JMX_PORT=9999 bin/kafka-server-start.sh -daemon config/server.properties
或者修改kafka-server-start.sh,在前面加上:bootstrap
export JMX_PORT=9999
以運行在linux上爲例,啓動腳本爲bin/kafkak-manager,該腳本會默認佔用9000端口,也能夠經過參數修改端口以及指定java版本運行:api
nohup bin/kafka-manager -java-home /usr/java/jdk1.8.0_101/ -Dhttp.port=8081 &
啓動成功後,便可以經過http://ip:8081來訪問網絡
源碼地址:https://github.com/quantifind/KafkaOffsetMonitor
jar包下載:https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.1
KafkaOffsetMonitor使用比較方便,將會被打成一個jar包,直接運行便可,做者已經把打好的包上傳到github上面,執行如下命令啓動:
java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk master,slave1,slave2:2181 --port 8082 --refresh 10.seconds --retain 2.days
啓動成功後,便可以經過http://ip:8082來訪問
基本全部腳本都是調用kafka-run-class.sh腳本去執行一個命令模式的class.建議使用腳本時參考腳本的使用說明。
建立、刪除、查看或者改變一個topic.
bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --partitions 3 --replication-factor 2 --config flush.ms=1000 --config flush.messages=1
建立一個名稱爲test的topic,它有3個分區,每一個分區兩個replica,經過–config給topic設置屬性,格式爲key=value,若是有多個配置屬性則如上命令。這種建立方式kafka會自動把各個分區的replica分配到相應的broker,也能夠在建立topic時手動指定哪一個分區的哪一個replica落在指定的broker,示例命令以下:
bin/kafka-topics.sh --zookeeper master:2181 --create --topic test --config flush.ms=1000 --config flush.messages=1 --replica-assignment 0:1,1:2
關鍵的配置參數爲–replica-assignment,該參數不能與–partitions和–replication-factor同時出現,參數的使用格式以下:
broker_id_for_part0_replica1: broker_id_for_part0_replica2,
broker_id_for_part1_replica1: broker_id_for_part1_replica2,
broker_id_for_part2_replica1: broker_id_for_part2_replica2
–replica-assignment 0:1,1:2表示有兩個分區,分區0的replica1在broker.id=0的kafka服務上,分區0的replica2在broker.id=1的kafka服務上;分區1的replica1在broker.id=1的kafka服務上,分區1的replica2在broker.id=2的kafka服務上。
當使用delete命令刪除topic,默認只是進行標記,並無真正的刪除
Note: This will have no impact if delete.topic.enable is not set to true.
須要在config/server.properties配置文件中開啓delete.topic.enable=true
bin/kafka-topics.sh --zookeeper master:2181 --describe --topic test
describe名稱爲test的topic,將會顯示topic的分區數、replica因子、配置參數以及各分區的分佈狀況(leader,replica,isr),以下圖:
使用–describe時還能夠結合其它一些參數對結果進行過濾:
根據–alter參數的說明,能夠修改topic的分區數(目前只能是增長),修改配置config,以及修改replica(這裏貌似不許確,根據官網的文檔說明,若是想要增長replication factor,應該使用kafka-reassign-partitions.sh腳本)。
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --partitions 3
成功後describe一下topic:
添加分區不能改變現有的數據
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --config flush.ms=2000
成功後:
bin/kafka-topics.sh --zookeeper master:2181 --alter --topic test --delete-config flush.ms
成功後:
PS:對於使用–alter增長、修改和刪除config,從0.9.0.0版本後建議使用kafka-configs.sh腳本。
這個腳本專門是用來添加,修改和刪除實體的config的,其中操做的實體對象有:topic, client, user 和 broker。
添加和修改均可以統一說成更新,沒有則添加,存在即修改。結合alter,add-config以及其它一些配置,例如修改broker的某個config:
bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --add-config 'leader.replication.throttled.rate=700000000'
執行上面命令給id=0的broker添加配置leader.replication.throttled.rate後,接着查看一下該broker的config:
bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --describe
結果:
接上,刪除id=0的broker的配置leader.replication.throttled.rate
bin/kafka-configs.sh --zookeeper master:2181 --entity-type brokers --entity-name 0 --alter --delete-config 'leader.replication.throttled.rate'
結果:
能夠列出全部消費者組,查看某個消費者組的詳細狀況以及刪除消費者組的信息(刪除只適用於舊版本基於zookeeper的消費都組)。
Kafka默認一直會有一個「KafkaManagerOffsetCache」的消費者組,爲了更加直觀,先用kafka-console-consumer.sh啓動一個消費都,並加入一個叫作「test_group」的組:
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test --consumer-property group.id=test_group
接着使用如下命令列出全部的消費都組:
bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --list
已經能夠看到「test_group」的消費都組了:
查看消費者組的具體消費情況,結合來分析目前集羣的穩健程度,執行如下命令:
bin/kafka-consumer-groups.sh --bootstrap-server master:9092 --describe --group test_group
結果:
每一個分區的全部replicas叫作」assigned replicas」,」assigned replicas」中的第一個replicas叫」preferred replica」,剛建立的topic通常」preferred replica」(優先副本)是leader。
各分區的讀取寫請求都是由leader來接收處理的,那麼固然但願各分區的leader能夠均衡地分佈在各個broker之上,作到均衡負載,提升集羣穩定性以及充分利用資源。通常在建立topic時,kafka都會默認把leader平均分配,但當某個broker宕掉後,會致使該broker上的leader轉移到其它的broker上去,致使機羣的負載不均衡,就算宕掉的broker恢復正常,它上面已經沒有leader。可使用kafka-preferred-replica-election.sh工具令到恢復後的broker上的優先副本從新選舉成爲leader,這樣又恢復到了宕掉以前的狀態。
下面來模擬一下整個過程,首先建立一個topic,3個分區,每一個分區的leader分別在3個broker上面:
分區0的leader已經從broker0轉移到了broker1了,在Isr中也看不到本來broker0的兩個replica。最後從新啓動broker0並執行如下命令:
bin/kafka-preferred-replica-election.sh --zookeeper master:2181
再觀察test的分區狀況:
能夠看到test已經恢復到最初的leader分佈狀況了。默認是對全部分區進行優先副本選舉(preferred replica election),若是想指定操做某些分區,則須要配合–path-to-json-file參數,例如test有0,1,2三個分區,只想操做1,2分區,首先編譯test_election.json文件,內容以下:
{「partitions」:[{「topic」: 「test」, 「partition」: 1}, {「topic」: 「test」,
「partition」: 2}]}
而後執行如下命令:
bin/kafka-preferred-replica-election.sh --zookeeper master:2181 --path-to-json-file test_election.json
PS:其實能夠配置kafka自動平衡leader的,在server.properties文件中設置:auto.leader.rebalance.enable=true便可,而該配置默認是已經打開的,想驗證的話能夠重啓一個broker,隔一段時間後會發現leader會自動恢復。
當有新的broker節點加入到已經在使用的集羣,kafka是不會自動均衡本來的數據到新節點的,須要手動對分區進行遷移,使得新節點能夠對外提供服務。(對於後來建立和topic則不須要)。
首先確定要知道須要對哪些topic進行遷移,且明確哪一個分區須要遷移到哪一個broker節點。現有一個分區test,具體以下圖:
手動編輯一個json文件(例如命名爲topics-to-move.json),表示哪些topic是須要遷移的,內容以下(能夠指定多個topics):
{「topics」: [{「topic」: 「test」}], 「version」:1 }
想要把test的全部分區遷移到broker1,2,執行如下命令生成遷移計劃:
bin/kafka-reassign-partitions.sh --zookeeper master:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate
顯示了當前的分配規則(能夠用做回滾)以及新生成的分配規則,把內容保存到文件(expand-cluster-reassignment.json),固然,也能夠手動修改裏面的內容,只要符合格式便可:
{「version」:1,」partitions」:[{「topic」:」test」,」partition」:1,」replicas」:[2,1]},{「topic」:」test」,」partition」:2,」replicas」:[1,2]},{「topic」:」test」,」partition」:0,」replicas」:[1,2]}]}
根據上一步生成的分配規則expand-cluster-reassignment.json啓動遷移,執行如下命令:
bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
而後describe一下,查看新的分區分配狀況:
能夠看到如今全部分區的replica都已經所有遷移到了broker1,2上面。
仍是根據分配規則expand-cluster-reassignment.json驗證分區是否分配成功,執行如下命令:
bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
執行結果以下:
至此,分區的遷移已經完成。其實已經對分區規則熟悉的話,能夠跳過生成遷移計劃這步,直接編寫expand-cluster-reassignment.json,而後執行驗證。
爲分區增長副本,仍是使用kafka-reassign-partitions.sh命令,而後編輯副本規則json文件便可。現有如下topic:
分區0有兩個replica,分別在broker1,2上,如今準備在broker0上添加一個replica,先建立副本分配json文件(increase-replication-factor.json),內容以下:
{「version」:1,
「partitions」:[{「topic」:」test」,」partition」:0,」replicas」:[0,1,2]}]}
而後指定increase-replication-factor.json執行下面的命令:
bin/kafka-reassign-partitions.sh --zookeeper master:2181 --reassignment-json-file increase-replication-factor.json --execute
接着,一樣使用increase-replication-factor.json來驗證是否成功:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
執行結果以下:
或者describe一下topic: