啓動kafka.shell
a. 要啓動kafka,必須先啓動zookeeper:bash
# 啓動zookeeper
zkServer.sh start
# 查看zookeeper啓動狀態
zkServer.sh status
複製代碼
b. 啓動kafka:spa
./bin/kafka-server-start.sh config/server.properties
複製代碼
在kafka中建立topic:日誌
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
複製代碼
建立一個分區,且分區裏面分配了一個副本,的名叫 test的topic。code
查看kafka中擁有的全部的topic主題:server
./bin/kafka-topics.sh --list --zookeeper localhost:2181
複製代碼
localhost表示當前的機子節點建立topic,也能夠是kafka集羣中其它節點的ip地址索引
查看topic主題的詳細信息:隊列
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
複製代碼
以下是topic的詳情信息:ip
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
複製代碼
刪除某個topic:get
./bin/kafka-topics.sh --zookeeper 192.168.241.20:2181 --delete --topic test
複製代碼
若是kafka的配置文件config/server.properties中,delete.topic.enable=false,這個配置爲false,那麼刪除topic 的命令,就只是經過zookeeper對topic進行標記爲marked for deletion而已沒有真正的刪除,若是是true則是真正的刪除。
建立一個Producer來對kafka進行消息發送:
./bin/kafka-console-producer.sh --broker-list master:9092 --topic test
複製代碼
在kafka集羣列表的master:9092中爲kafka集羣消息隊列建立一個控制檯類型的消息生產者producer,消息的主題 類型是test
建立一個consumer消息消費者來消費kafka隊列中的消息:
./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning
複製代碼
在zookeeper的master節點中建立一個kafka隊列的控制檯消息消費者,消費的topic主題類型是test,從topic的 partition頭開始消費,即offset爲0
./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --consumer-property
group.id=group_test --from-beginning
複製代碼
在zookeeper的master節點中建立一個kafka隊列的控制檯消息消費者,消費的topic主題類型是test,且有一個 消費組爲group_test。
查看kafka隊列中topic的partition的offset信息:
./bin/kafka-consumer-offset-checker.sh --zookeeper master:2181 --topic test --group group_test
broker-info
複製代碼
經過kafka-consumer-offset-checker.sh腳本命令查看topic主題test的group_test消費分組的kafka broker 節點的offset相關信息
Group Topic Pid Offset logSize Lag Owner
group_hyb_tmp hyb 0 15 15 0 group_hyb_tmp_master-1543144080872-eb93ebf0-0
BROKER INFO
2 -> 192.168.241.22:9092
複製代碼
以上就是查看的offset相關信息:
a. Group: 消費者的消費分組id
b. Topic: kafka消息主題
c. Pid: kafka的消息主題的partition id號
d. Offset: partition中維護的消息偏移量首地址索引號大小
e. logSize: kafka隊列消息日誌的總大小
f. Lag: 消息數據積壓個數,若是Lag長時間存在,說明Producer生產消息到kafka隊列中長時間未被消費,說明消費速度慢,數據存在積壓問題。
g. BROKER INFO: kafka的broker節點信息,2表示節點的myid,後面ip爲當前處理消息日誌的節點
在zookeeper的客戶端中查看offset信息:
# 啓動zookeeper的客戶端
./bin/zkCli.sh
複製代碼
經過ls /
可查看根目錄都有哪些文件夾:
[zk: localhost:2181(CONNECTED) 22] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, config]
複製代碼
offset信息在consumer中:
ls /consumer
[zk: localhost:2181(CONNECTED) 23] ls /consumers
[group_hyb, group_hyb_tmp]
複製代碼
消費者信息中是消費組
[zk: localhost:2181(CONNECTED) 24] ls /consumers/group_hyb_tmp
[ids, owners, offsets]
[zk: localhost:2181(CONNECTED) 25] ls /consumers/group_hyb_tmp/offsets
[hyb]
[zk: localhost:2181(CONNECTED) 26] ls /consumers/group_hyb_tmp/offsets/hyb
[0]
[zk: localhost:2181(CONNECTED) 27] ls /consumers/group_hyb_tmp/offsets/hyb/0
[]
複製代碼
上面能夠看到,offsets消費組裏面,offset裏是topic,topic裏面是partition,partition裏就沒文件夾了。 能夠經過如下命令來查看具體offset信息: get /consumers/group_hyb_tmp/offsets/hyb/0
具體的信息是:
16
cZxid = 0x4000000a4
ctime = Sun Nov 25 19:09:00 CST 2018
mZxid = 0x4000002d5
mtime = Sun Nov 25 19:44:00 CST 2018
pZxid = 0x4000000a4
cversion = 0
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 2
numChildren = 0
複製代碼
第一行的16,就是offset的大小信息。