一、建立Topic
./kafka-topics.sh --zookeeper cloud17:2181,cloud18:2181,cloud19:2181 --topic PM_TOPIC_KPI2DB_02 --replication-factor 1 --partitions 3 --createnode
二、Topic列表
./kafka-topics.sh --zookeeper kafka1.sdy:2181,kafka2.sdy:2181,kafka3.sdy:2181 --listshell
三、建立生產者
./kafka-console-producer.sh --broker-list kafka1.sdy:2181,kafka2.sdy:2181,kafka3.sdy:2181 --topic test_wu_2017vim
四、消費者
./kafka-console-consumer.sh --zookeeper kafka1.sdy:2181,kafka2.sdy:2181,kafka3.sdy:2181 --topic PM_TOPIC_07
--from-beginning 每次都是從開始位置消費,在生產環境下不建議這樣使用。工具
五、查看指定topic
./kafka-topics.sh --describe --zookeeper c13-138:2181 --topic mytopic日誌
六、刪除topic
./kafka-topics.sh --delete --zookeeper cloud17:2181,cloud18:2181,cloud19:2181 --topic PM_TOPICserver
七、顯示出Consumer的Group、Topic、分區ID、分區對應已經消費的Offset、logSize大小,Lag以及Owner等信息。
使用腳本:kafka-consumer-offset-checker.sh
./kafka-consumer-offset-checker.sh --zookeeper c13-138:2181,c13-139:2181,c13-141:2181 --topic mytopic --group xb_id --broker-info索引
八、有時候咱們須要驗證日誌索引是否正確,或者僅僅想從log文件中直接打印消息。
使用腳本:kafka-run-class.sh
./kafka-run-class.sh kafka.tools.DumpLogSegments
./kafka-run-class.sh kafka.tools.DumpLogSegments /nodedata/kafka/kafka-logs/xb_topic-0/00000000000000000033.log
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /nodedata/kafka/kafka-logs/xb_topic-0/00000000000000000033.log --print-data-log進程
九、導出Zookeeper中Group相關的偏移量。有時候咱們須要導出某個Consumer group各個分區的偏移量。
使用腳本:kafka-run-class.sh
./kafka-run-class.sh kafka.tools.ExportZkOffsets
./kafka-run-class.sh kafka.tools.ExportZkOffsets --group xb_id --zkconnect c13-138:2181,c13-139:2181,c13-141:2181 --output-file ~/offset
vim ~/offsetkafka
十、這個工具主要做用是從一個Kafka集羣裏面讀取指定Topic的消息,並將這些消息發送到其餘集羣的指定topic中
使用腳本:./kafka-replay-log-producer.sh it
十一、kafka-simple-consumer-shell.sh工具主要是使用Simple Consumer API從指定Topic的分區讀取數據並打印在終端
使用腳本:./kafka-simple-consumer-shell.sh --broker-list c13-141:9092 --topic mytopic --partition 0
十二、kafka.tools.UpdateOffsetsInZK工具能夠更新Zookeeper中指定Topic全部分區的偏移量,能夠指定成 earliest或者latest:
使用工具:./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK
最後再說哈kafka的啓動和中止。 啓動kafka: ./kafka-server-start.sh /kafka/config/server.properties >/dev/null 2>&1 & 中止kafka: 直接kill掉進程就行。