使用kafka-topic.sh工具能夠執行大部分操做 建立/修改/刪除/查看集羣裏的主題。要使用所有功能,須要經過--zookeeper參數提供zookeerper鏈接字符串json
建立主題:bootstrap
建立主題須要3個參數: 主題名字 複製係數 分區數量ide
格式: kafka-topic.sh --zookeeper <zookeeper connect> --create --topic <string> --replication-factor <integer> --partitions <integer>工具
若是不須要基於機架信息的分配策略,使用參數--disable-rack-aware日誌
增長主題分區的數量至16:server
kafka-topic.sh --zookeeper <zookeeper connect> --alter --topic my-topic --partition 16blog
減小主題分區數量: 會致使消息亂序,只能刪除分區數量,從新建立索引
刪除主題:字符串
配置參數 delete.topic.enable=true kafka
kafka-topic.sh --zookeeper <zookeeper connect> --delete --topic my-topic
列出集羣全部主題
kafka-topic.sh --zookeeper <zookeeper connect> --list
列出主題詳細信息
列出集羣全部主題詳細信息
kafka-topic.sh --zookeeper <zookeeper connect> -describe
找出全部包含覆蓋配置的主題 --topic-with-overrides
列出全部包含不一樣步副本的分區 --under-replicated-partitions
kafka-topic.sh --zookeeper <zookeeper connect> --describe --under-replicated-partitions
列出全部沒有首領的分區 --unavailable-partitions
列出新版本的消費者羣組
Kafka-consumer-groups.sh --new-consumer --bootstrap-server <kafka集羣主機:port/kafka-cluster> --list
獲取舊版本消費者羣組testgroup詳細信息
kafka-consumer-group.sh --zookeeper <zookeeper connect> --describe --group testgroup
刪除消費者羣組
kafka-consumer-groups.sh --zookeeper <zookeeper connect> --delete --group testgroup
刪除消費者羣組testgroup中my-topic 主題的偏移量
kafka-consumer-groups.sh --zookeeper <zookeeper connect> --delete --group testgroup --topic my-topic
導出羣組testgroup的偏移量到offsets文件
kafka-run-class.sh kafka.tools.ExportZkOffsets --zkconnect <zookeeper connect> --group testgroup --output-file offsets
導入偏移量:
先關閉消費者
kafka-run-class.sh kafka.tools.ImportZkOffsets --zkconnect <zookeeper connect> --inpiut-file offsets
更改主題配置的命令格式:
kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type topics --entity-name <topic name > -add-config <key>=<value>[,<key>=<value>...]
將主題my-topic 消息保留時間設置爲1小時
kafka-confihs.sh --zookeeper <zookeeper connect> --alter --entity-type topic --entity-name my-topic -add-config retention.ms=3600000
更改客戶端配置命令格式:
kafka-configs.sh --zookeeper <zookeeper connect> --alter --entity-type clients --entity-name <client ID> -add-config <key>=<value>....
列出主題my-topic 全部被覆蓋的配置:
kafka-configs.sh --zookeeper <zookeeper connect> --describe --entity-type topics --entity-name my-topic
刪除主題my-topic的retention.ms覆蓋配置
kafka-config.sh --zookeeper <zookeeper connect> --alter --entity-type topics --entity-name my-topic --delete-config retention.ms
在一個包含1主題和8個分區集羣裏啓動首選的副本選舉
kafka-preferred-replica-election.sh --zookeeper <zookeeper connect>
經過partitions.json 文件裏指定分區清單來啓動副本的選舉
kafka-prefered-replica-election.sh --zookeeper <zookeeper connect> --path-to-json-file partitions.json
修改分區副本:
爲topic.json文件裏的主題生成遷移步驟,以便將這些主題遷移至broker0 和 broker1上
kafka-reassign-partitions.sh --zookeeper <zookeeper connect> --generate --topics-to-move-json-file topics.json --broker-list 0,1
使用reassign.json 來執行建議的分區分配方案:
kafka-reassign-partitions.sh --zookeeper <zookeeper connect> --execute --reassignment-json-file reassign.json
驗證reassign.json文件裏指定的分區重分配狀況:
kafka-reassign-partitions.sh --zookeeper <zookeeper connect> --verify --reassignment-json-file reassign.json
解碼日誌片斷000052368601.log ,顯示消息的概要信息
kafka-run-class.sh kafka.tools.DumpLogSegments --files 000052368601.log
解碼日誌片斷000001.log,顯示消息內容
kafka-run-class.sh kafka.tools.DumpLogSegments --files 000001.log --print-data-log
驗證日誌片斷00001.log索引文件的正確性
kafka-run-class.sh kafka.tools.DumpLogSegments --files 00001.index,000001.log --index-sanity-check
// --verify-index-only 將會檢查索引的匹配度
對broker1和broker2上以my-開頭的主題副本進行驗證
kafka-replica-verification.sh --broker-list kafka1.com:9092,kafka2.com:9092 --topic-white-list 'my-*'
使用舊版消費者讀取單個主題
kafka-console-consumer.sh --zookeeper <zookeeper connect> --topic my-topic
向主題my-topic 生成2個消息
kafka-console-producer.sh --broker-list kafka1.com:9092,kafka2.com:9092 --topic my-topic