轉載請註明出處:http://www.cnblogs.com/xiaodf/java
bin/kafka-topics.sh --zookeeper node01:2181 --create --topic t_cdr --partitions 30 --replication-factor 2
注: partitions指定topic分區數,replication-factor指定topic每一個分區的副本數node
bin/kafka-topics.sh --zookeeper node01:2181 --list
bin/kafka-topics.sh --zookeeper node01:2181 --describe --topic t_cdr
bin/kafka-console-producer.sh --broker-list node86:9092 --topic t_cdr
bin/kafka-console-consumer.sh --zookeeper node01:2181 --topic t_cdr --from-beginning
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable --time -1 --broker-list node86:9092 --partitions 0
注: time爲-1時表示最大值,time爲-2時表示最小值shell
爲topic t_cdr 增長10個分區服務器
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic t_cdr --partitions 10
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper node01:2181 --topic t_cdr
這個會顯示出consumer group的offset狀況, 必須參數爲--group, 不指定--topic,默認爲全部topic微信
Displays the: Consumer Group, Topic, Partitions, Offset, logSize, Lag, Owner for the specified set of Topics and Consumer Group工具
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker required argument: [group] Option Description ------ ----------- --broker-info Print broker info --group Consumer group. --help Print this message. --topic Comma-separated list of consumer topics (all topics if absent). --zkconnect ZooKeeper connect string. (default: localhost:2181) Example, bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv Group Topic Pid Offset logSize Lag Owner pv page_visits 0 21 21 0 none pv page_visits 1 19 19 0 none pv page_visits 2 20 20 0 none
以上圖中參數含義解釋以下:
topic:建立時topic名稱
pid:分區編號
offset:表示該parition已經消費了多少條message
logSize:表示該partition已經寫了多少條message
Lag:表示有多少條message沒有被消費。
Owner:表示消費者大數據
細看kafka-run-class.sh腳本,它是調用 了ConsumerOffsetChecker的main方法,因此,咱們也能夠經過java代碼來訪問scala的ConsumerOffsetChecker類,代碼以下:ui
import kafka.tools.ConsumerOffsetChecker; /** * kafka自帶不少工具類,其中ConsumerOffsetChecker能查看到消費者消費的狀況, * ConsumerOffsetChecker只是將信息打印到標準的輸出流中 * */ public class RunClass { public static void main(String[] args) { //group-1是消費者的group名稱,能夠在zk中 String[] arr = new String[]{"--zookeeper=192.168.199.129:2181,192.168.199.130:2181,192.168.199.131:2181/kafka","--group=group-1"}; ConsumerOffsetChecker.main(arr); } }
更多大數據技術乾貨,歡迎關注「大數據技術進階」微信公衆號
this