Apache Kafka(三)- Kakfa CLI 使用

1. Topics CLIapache

1.1  首先啓動 zookeeper 與 kafkabootstrap

> zookeeper-server-start.sh config/zookeeper.propertiessession

app

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)socket

INFO Expiring session 0x100ab41939d0000, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)工具

INFO Processed session termination for sessionid: 0x100ab41939d0000 (org.apache.zookeeper.server.PrepRequestProcessor)fetch

INFO Creating new log file: log.1d (org.apache.zookeeper.server.persistence.FileTxnLog)this

 

> kafka-server-start.sh config/server.properties.net

命令行

Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)

INFO Cluster ID = D69veaGlS5Ce3aHTsxCHkQ (kafka.server.KafkaServer)

INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)

INFO Creating /brokers/ids/0 (is it secure? false) (kafka.zk.KafkaZkClient)

INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArrayBuffer(EndPoint(ip-10-0-2-70.cn-north-1.compute.internal,9092,ListenerName(PLAINTEXT),PLAINTEXT)), czxid (broker epoch): 44 (kafka.zk.KafkaZkClient)

 

這裏咱們能夠簡單的瞭解到,啓動了一個Kafka broker,id爲 0,監聽的端口爲9092。

 

1.2. 建立一個 topic

這裏須要注意的是 --replication-factor參數,例如:

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic first_topic --create --partitions 3 --replication-factor 2

此命令會返回一個報錯:

ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.

 (kafka.admin.TopicCommand$)

此錯誤表示的是:指定的replication-factor的數量超過了broker的數量。

因此咱們使用如下命令建立一個kafka topic:

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic first_topic --create --partitions 3 --replication-factor 1

 

而後列出已建立的kafka topics:

>  kafka-topics.sh --zookeeper 10.0.2.70:2181 --list

first_topic

 

若是咱們須要更多有關一個topic的信息,如partitions,replication-factors 等,使用--descriebe:

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic first_topic --describe

Topic:first_topic       PartitionCount:3        ReplicationFactor:1     Configs:

        Topic: first_topic      Partition: 0    Leader: 0       Replicas: 0     Isr: 0

        Topic: first_topic      Partition: 1    Leader: 0       Replicas: 0     Isr: 0

        Topic: first_topic      Partition: 2    Leader: 0       Replicas: 0     Isr: 0

 

能夠看到此topic有3個partition,id分別爲0,1,2。每一個partition的leader都是broker 0,replicas也是broker 0,Isr也是broker 0(由於replication-replica 爲1)

 

如今咱們建立第二個topic:

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic second_topic --create --partitions 6 --replication-factor 1

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --list

first_topic

second_topic

 

1.3. 刪除一個topic

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --topic second_topic --delete

Topic second_topic is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

 

能夠看到,second_topic 被標註爲deletion。若是delete.topic.enable沒有被設置爲true,則此topic不會被刪除。

> kafka-topics.sh --zookeeper 10.0.2.70:2181 --list

first_topic

 

根據list的結果,咱們能夠看到second_topic 被刪除,說明delete.topic.enable 默認是true。

 

2. Produer CLI

根據kafka-console-produer.sh 的使用描述,在使用此腳本時,必須提供的參數是--broker-list與 –topic,如今咱們指定這兩個參數後執行:

> kafka-console-producer.sh --broker-list 10.0.2.70:9092 --topic first_topic

 

而後輸入messages:

>hello world

>are you ok?

>learning kafka

>another message :)

Ctrl + C 退出

 

在啓動一個producer時,也能夠指定它的屬性,例如:

> kafka-console-producer.sh --broker-list 10.0.2.70:9092 --topic first_topic --producer-property acks=all

>yep is acked

>hello  ack

>are you ok? acked!

>^C

 

如果咱們指定一個不存在的topic的話會怎麼樣?

> kafka-console-producer.sh --broker-list 10.0.2.70:9092 --topic new_topic

>new topic messages

[2019-08-08 03:37:47,160] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {new_topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

>what about now

>it is ok

>^C

 

能夠看到,在指定一個不存在的topic後,在輸入消息時,第一次返回了一個WARN,這是因爲此topic 沒有一個leader。正如以前提到過的,producer有自動recover的機制,因此會嘗試找到一個leader去發送消息。咱們使用list看一下結果:

> kafka-topics.sh --zookeeper 10.0.2.70 --list

first_topic

new_topic

 

> kafka-topics.sh --zookeeper 10.0.2.70 --topic new_topic --describe

Topic:new_topic PartitionCount:1        ReplicationFactor:1     Configs:

        Topic: new_topic        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

 

能夠看到自動新建立的new_topic,以及建立後的默認配置:partition數目爲1,replication-factor數目也爲1。此默認設置在 server.properties 裏配置,例如:

# The default number of log partitions per topic. More partitions allow greater

# parallelism for consumption, but this will also result in more files across

# the brokers.

num.partitions=1

 

建議永遠都要先建立topic,不要使用默認建立topic

 

3. Consumer CLI

經過查看kafka-console-consumer.sh腳本,能夠看到必須的參數爲:--bootstrap-server 與 --topic。按照規則啓動一個consumer:

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic

 

可是能夠看到的是,此consumer並未讀取任何以前producer發送的數據。緣由在於:consumer僅會讀取在它啓動以後的數據。

因此如果咱們此時使用producer向first_topic 發送數據,則會在consumer控制檯輸出接收到的數據。

那如何獲取producer以前發送的全部數據?使用 --from-beginning

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --from-beginning

learning kafka

are you ok? acked!

hello world

another message :)

yep is acked

hi

are you ok?

hello  ack

 

能夠看到,以上消息輸出的順序並不爲咱們輸入的順序。這是由於僅在同一個partition中的消息是有序的,而first_topic 中有3個partitions。如果一個topic中僅有一個partition,則此topic中的所有消息都是有序的。

 

3. Consumers in Group

3.1. 使用consumer group

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --group my-first-app

使用此方法,能夠讀取到producer寫入的每條消息。

 

可是若是咱們再次啓動一個 consumer,使用一樣的 --group my-first-app:

 

最左邊的爲producer,能夠看到的是,第一個consumer先獲取一條message,而後第二個consumer獲取兩條message,而後依次類推。

 

這是因爲:consumer group裏當前有兩個consumer,而topic有3個partition,因此此時consumer group中的一個consumer會負責2個partition的讀,而另外一個consumer會負責剩餘1個partition的讀。

若此時再爲同一個consumer group啓動一個consumer,則每一個partition對應於一個consumer,此時發送3條message,會由3個consumer依次讀取。

 

3.2. 使用--from-beginning

對第二個 consumer group使用 --from-beginning:

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --group my-second-app --from-beginning

learning kafka

are you ok? acked!

 

能夠看到此consumer 列出了全部以前的消息。如果咱們再次執行此命令,則會發現不會打印任何消息。

這是由於每一個group的offsets都會由Kafka記錄下來。因此再次使用此group讀數據時,會使用記錄的offsets繼續讀取數據。

 

4. Consumer Group CLI

查看 kafka-consumer-groups的用途:

This tool helps to list all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.

 

必須的參數是 --bootstrap-server

 

首先列出全部groups:

> kafka-consumer-groups.sh --bootstrap-server 10.0.2.70:9092 --list

my-first-app

my-first-application

my-second-app

 

查看一個group的詳細信息:

> kafka-consumer-groups.sh --bootstrap-server 10.0.2.70:9092 --describe --group my-first-app

 

這裏首先打出的是:consumer group ‘my-first-app’ has no active members。這是由於咱們已經中止了這個consumer group 下的全部 consumers,因此此consumer group 下面沒有一個active members。

接下打出的信息顯示了每一個partition,當前的offset;log裏最終的 offset;以及 LAG,它表示的是最終還未被消費的message數量(也就是cur-offset與log-end-offset的差)。

咱們再往 my-first-app 寫入幾條數據,而後對consumer group 作describe:

 

能夠看到 LAG 增長。

而後使用consumer-group 讀此topic:

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --group my-first-app

help

yep

 

再 describe:

 

能夠看到LAG爲0,且列出了當前consumers 的 id

 

5. Reset Offset

咱們看到 consumer groups 的offset 能夠被kafka記錄,那如何重置一個consumer group 的offset?使用:

> kafka-consumer-groups.sh --bootstrap-server 10.0.2.70:9092 --reset-offsets --group my-first-app --topic first_topic --to-earliest --execute

 

GROUP                          TOPIC                          PARTITION  NEW-OFFSET

my-first-app                   first_topic                    0          0

my-first-app                   first_topic                    2          0

my-first-app                   first_topic                    1          0

 

使用consumer 檢查:

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --group my-first-app

learning kafka

are you ok? acked!

 

也能夠使用--shift-by將offsets作移動,而不是重置:

 

這裏咱們用正數作--shift-by 的參數,能夠發現 offset是向後移動。因此如果須要向前移動,則須要使用負數,例如:
> kafka-consumer-groups.sh --bootstrap-server 10.0.2.70:9092 --reset-offsets --group my-first-app --topic first_topic --shift-by -2 --execute

 

GROUP                          TOPIC                          PARTITION  NEW-OFFSET

my-first-app                   first_topic                    0          12

my-first-app                   first_topic                    2          13

my-first-app                   first_topic                    1          13

 

而後使用 consumer 驗證:

> kafka-console-consumer.sh --bootstrap-server 10.0.2.70:9092 --topic first_topic --group my-first-app

help

yep

 

6. Kafka UI

以上命令均基於命令行,也能夠使用圖形化界面配置並訪問kafka,如Kafka Tool:

 

此工具官網地址以下:

http://www.kafkatool.com/

相關文章
相關標籤/搜索