PHP下kafka的經常使用腳本實踐

閱讀本教程前最好先嚐試閱讀:PHP下kafka的實踐shell

自帶命令實踐

嘗試實踐的kafka知識:

  1. 建立話題
  2. 生產消息
  3. 消費消息
  4. 話題信息
  5. 獲取消費組
  6. 獲取消費組的offset

自帶的命令

# kafka安裝目錄的bin目錄下
# * 表明咱們會使用的腳本
connect-distributed.sh        kafka-log-dirs.sh                    kafka-streams-application-reset.sh
connect-standalone.sh         kafka-mirror-maker.sh                kafka-topics.sh*
kafka-acls.sh                 kafka-preferred-replica-election.sh  kafka-verifiable-consumer.sh
kafka-broker-api-versions.sh  kafka-producer-perf-test.sh          kafka-verifiable-producer.sh
kafka-configs.sh              kafka-reassign-partitions.sh         trogdor.sh
kafka-console-consumer.sh*    kafka-replay-log-producer.sh         windows
kafka-console-producer.sh*    kafka-replica-verification.sh        zookeeper-security-migration.sh
kafka-consumer-groups.sh*     kafka-run-class.sh                   zookeeper-server-start.sh
kafka-consumer-perf-test.sh   kafka-server-start.sh                zookeeper-server-stop.sh
kafka-delegation-tokens.sh    kafka-server-stop.sh                 zookeeper-shell.sh*
kafka-delete-records.sh       kafka-simple-consumer-shell.sh

建立話題(kafka-topics.sh)

# 建立1個分區1個副本的test話題,這裏是副本其實能夠理解爲broker裏至少擁有數量,must >=1
# --zookeeper localhost:2181 (kafka的默認端口:2181)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

# 建立2個分區1個副本的test02話題
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test02

# 列出全部話題
bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test
test02
# 注意這裏的__consumer_offsets是kafka默認建立的,用於存儲kafka消費記錄的話題,咱們暫時不用理會


# 列出具體話題的信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0


bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test02
Topic:test02    PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: test02   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: test02   Partition: 1    Leader: 0       Replicas: 0     Isr: 0

# 從上面的顯示 咱們能夠發現,第一句是顯示整體信息,下面縮進顯示的是分區信息

消費者(kafka-console-consumer.sh)

# 啓動一個消費組消費,這裏咱們須要開啓一個shell終端,由於會等待輸出
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# 等待輸出

生產者(kafka-console-consumer.sh)

# 這裏咱們須要開啓一個shell終端,由於會等待輸入
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# 待咱們輸入消息 等待出現>
> msg01
> msg02
> msg03

#注意觀察上面的消費者終端,自動輸出了咱們的消息
msg01
msg02
msg03

查看消費組

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-25379
console-consumer-73410
console-consumer-27127
console-consumer-61887
console-consumer-61324

# 這裏咱們再來起一個消費者再次輸出(由於以前的消費者咱們不知道最新的此次消費組id是多少)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-25379
console-consumer-73410
console-consumer-27127
console-consumer-39416 # 這個是咱們新起的消費組id,下面咱們根據這個消費組來作實踐
console-consumer-61887
console-consumer-61324

# 查看消費組的具體信息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-39416
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test            0          9               9               0               consumer-1-94afec29-5042-4108-8619-ba94812f10a8 /127.0.0.1      consumer-1

# 查看離線的console-consumer-25379消費組
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-25379
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'console-consumer-25379' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            0          5               9               4               -               -               -

# 查看離線的console-consumer-27127消費組
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-27127
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'console-consumer-27127' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
test            0          6               9               3               -               -               -

# 這裏咱們發現咱們每次都會生成一個消費組一個消費者,不方便咱們作一個消費組多個消費者測試

啓動一個消費組多個消費者

cp config/consumer.properties config/consumer_g1.properties
vim config/consumer_g1.properties
# 修改消費組名
group.id=test-consumer-group => group.id=test-g1

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.properties
msg01
msg02
msg03

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.properties
# 無輸出(由於test話題咱們只有一個分區,一個分區只能被同個消費組下面的一個消費者消費,因此這個就閒置了)

# 查看消費組
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Note: This will not show information about old Zookeeper-based consumers.
console-consumer-25379
console-consumer-73410
console-consumer-27127
console-consumer-39416
test-g1
console-consumer-61887
console-consumer-61324

# 查看test-g1消費組
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test            0          9               9               0               consumer-1-43922b0c-34e0-47fe-b597-984c9e6a2884 /127.0.0.1      consumer-1

# 下面咱們再開啓test02的2個消費組看看狀況

# 咱們再爲test02話題啓動2個test-g1消費組的消費者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.properties

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.properties

# 查看test-g1消費組
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test02          0          0               0               0               consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8 /127.0.0.1      consumer-1
test02          1          0               0               0               consumer-1-c6d79321-8212-4594-8a11-353f684c54fc /127.0.0.1      consumer-1
test            0          9               9               0               consumer-1-7a2706f7-a206-4c29-ae1f-3726ad21af96 /127.0.0.1      consumer-1

# 這裏你能夠在話題test產生一個消息看下,而後再test02再多產生幾條消息看下,你會發現test02的2個消費組幾乎是負載的消費了消息

消費組的一些信息命令

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
Note: This will not show information about old Zookeeper-based consumers.

CONSUMER-ID                                     HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
consumer-1-b8eeb8bc-6aa6-439b-84d8-0fcbed4a2899 /127.0.0.1      consumer-1      1               test02(1)
consumer-1-7109f789-a5cf-4862-94d0-976146dbc769 /127.0.0.1      consumer-1      1               test(0)
consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8 /127.0.0.1      consumer-1      1               test02(0)


bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
Note: This will not show information about old Zookeeper-based consumers.

COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
localhost:9092 (0)        range                     Stable               3


bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group console-consumer-25379

zookeeper(zookeeper-shell.sh)

鏈接

bin/zookeeper-shell.sh 127.0.0.1:2181
Connecting to 127.0.0.1:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

經常使用命令

ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
相關文章
相關標籤/搜索