Kafka簡單使用

  1. 啓動kafka.shell

    a. 要啓動kafka,必須先啓動zookeeper:bash

    # 啓動zookeeper
    zkServer.sh start
    # 查看zookeeper啓動狀態
    zkServer.sh status
    複製代碼

    b. 啓動kafka:spa

    ./bin/kafka-server-start.sh config/server.properties
    複製代碼
  2. 在kafka中建立topic:日誌

    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    複製代碼

    建立一個分區,且分區裏面分配了一個副本,的名叫 test的topic。code

  3. 查看kafka中擁有的全部的topic主題:server

    ./bin/kafka-topics.sh --list --zookeeper localhost:2181
    複製代碼

    localhost表示當前的機子節點建立topic,也能夠是kafka集羣中其它節點的ip地址索引

  4. 查看topic主題的詳細信息:隊列

    ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    複製代碼

    以下是topic的詳情信息:ip

    Topic:test		PartitionCount:1	ReplicationFactor:1		Configs:
    Topic: test		Partition: 0		Leader: 0			Replicas: 0		Isr: 0
    複製代碼
  5. 刪除某個topic:get

    ./bin/kafka-topics.sh --zookeeper 192.168.241.20:2181 --delete --topic test
    複製代碼

    若是kafka的配置文件config/server.properties中,delete.topic.enable=false,這個配置爲false,那麼刪除topic 的命令,就只是經過zookeeper對topic進行標記爲marked for deletion而已沒有真正的刪除,若是是true則是真正的刪除。

  6. 建立一個Producer來對kafka進行消息發送:

    ./bin/kafka-console-producer.sh --broker-list master:9092 --topic test
    複製代碼

    在kafka集羣列表的master:9092中爲kafka集羣消息隊列建立一個控制檯類型的消息生產者producer,消息的主題 類型是test

  7. 建立一個consumer消息消費者來消費kafka隊列中的消息:

    ./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning
    複製代碼

    在zookeeper的master節點中建立一個kafka隊列的控制檯消息消費者,消費的topic主題類型是test,從topic的 partition頭開始消費,即offset爲0

    ./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --consumer-property 
    group.id=group_test --from-beginning
    複製代碼

    在zookeeper的master節點中建立一個kafka隊列的控制檯消息消費者,消費的topic主題類型是test,且有一個 消費組爲group_test。

  8. 查看kafka隊列中topic的partition的offset信息:

    ./bin/kafka-consumer-offset-checker.sh --zookeeper master:2181 --topic test --group group_test 
    broker-info
    複製代碼

    經過kafka-consumer-offset-checker.sh腳本命令查看topic主題test的group_test消費分組的kafka broker 節點的offset相關信息

    Group           Topic		Pid 	Offset		logSize		Lag		Owner
    group_hyb_tmp   hyb		0   	15		15		0		group_hyb_tmp_master-1543144080872-eb93ebf0-0
    
    BROKER INFO
    2 -> 192.168.241.22:9092
    複製代碼

    以上就是查看的offset相關信息:

    a. Group: 消費者的消費分組id

    b. Topic: kafka消息主題

    c. Pid: kafka的消息主題的partition id號

    d. Offset: partition中維護的消息偏移量首地址索引號大小

    e. logSize: kafka隊列消息日誌的總大小

    f. Lag: 消息數據積壓個數,若是Lag長時間存在,說明Producer生產消息到kafka隊列中長時間未被消費,說明消費速度慢,數據存在積壓問題。

    g. BROKER INFO: kafka的broker節點信息,2表示節點的myid,後面ip爲當前處理消息日誌的節點

  9. 在zookeeper的客戶端中查看offset信息:

    # 啓動zookeeper的客戶端
    ./bin/zkCli.sh
    複製代碼

    經過ls /可查看根目錄都有哪些文件夾:

    [zk: localhost:2181(CONNECTED) 22] ls /
    [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, config]
    複製代碼

    offset信息在consumer中:

    ls /consumer
    [zk: localhost:2181(CONNECTED) 23] ls /consumers
    [group_hyb, group_hyb_tmp]
    複製代碼

    消費者信息中是消費組

    [zk: localhost:2181(CONNECTED) 24] ls /consumers/group_hyb_tmp
    [ids, owners, offsets]
    [zk: localhost:2181(CONNECTED) 25] ls /consumers/group_hyb_tmp/offsets
    [hyb]
    [zk: localhost:2181(CONNECTED) 26] ls /consumers/group_hyb_tmp/offsets/hyb
    [0]
    [zk: localhost:2181(CONNECTED) 27] ls /consumers/group_hyb_tmp/offsets/hyb/0
    []
    複製代碼

    上面能夠看到,offsets消費組裏面,offset裏是topic,topic裏面是partition,partition裏就沒文件夾了。 能夠經過如下命令來查看具體offset信息: get /consumers/group_hyb_tmp/offsets/hyb/0 具體的信息是:

    16
    cZxid = 0x4000000a4
    ctime = Sun Nov 25 19:09:00 CST 2018
    mZxid = 0x4000002d5
    mtime = Sun Nov 25 19:44:00 CST 2018
    pZxid = 0x4000000a4
    cversion = 0
    dataVersion = 2
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    複製代碼

    第一行的16,就是offset的大小信息。

相關文章
相關標籤/搜索