閱讀本教程前最好先嚐試閱讀:PHP下kafka的實踐shell
# kafka安裝目錄的bin目錄下 # * 表明咱們會使用的腳本 connect-distributed.sh kafka-log-dirs.sh kafka-streams-application-reset.sh connect-standalone.sh kafka-mirror-maker.sh kafka-topics.sh* kafka-acls.sh kafka-preferred-replica-election.sh kafka-verifiable-consumer.sh kafka-broker-api-versions.sh kafka-producer-perf-test.sh kafka-verifiable-producer.sh kafka-configs.sh kafka-reassign-partitions.sh trogdor.sh kafka-console-consumer.sh* kafka-replay-log-producer.sh windows kafka-console-producer.sh* kafka-replica-verification.sh zookeeper-security-migration.sh kafka-consumer-groups.sh* kafka-run-class.sh zookeeper-server-start.sh kafka-consumer-perf-test.sh kafka-server-start.sh zookeeper-server-stop.sh kafka-delegation-tokens.sh kafka-server-stop.sh zookeeper-shell.sh* kafka-delete-records.sh kafka-simple-consumer-shell.sh
# 建立1個分區1個副本的test話題,這裏是副本其實能夠理解爲broker裏至少擁有數量,must >=1 # --zookeeper localhost:2181 (kafka的默認端口:2181) bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test # 建立2個分區1個副本的test02話題 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test02 # 列出全部話題 bin/kafka-topics.sh --list --zookeeper localhost:2181 __consumer_offsets test test02 # 注意這裏的__consumer_offsets是kafka默認建立的,用於存儲kafka消費記錄的話題,咱們暫時不用理會 # 列出具體話題的信息 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test02 Topic:test02 PartitionCount:2 ReplicationFactor:1 Configs: Topic: test02 Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test02 Partition: 1 Leader: 0 Replicas: 0 Isr: 0 # 從上面的顯示 咱們能夠發現,第一句是顯示整體信息,下面縮進顯示的是分區信息
# 啓動一個消費組消費,這裏咱們須要開啓一個shell終端,由於會等待輸出 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning # 等待輸出
# 這裏咱們須要開啓一個shell終端,由於會等待輸入 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # 待咱們輸入消息 等待出現> > msg01 > msg02 > msg03 #注意觀察上面的消費者終端,自動輸出了咱們的消息 msg01 msg02 msg03
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list Note: This will not show information about old Zookeeper-based consumers. console-consumer-25379 console-consumer-73410 console-consumer-27127 console-consumer-61887 console-consumer-61324 # 這裏咱們再來起一個消費者再次輸出(由於以前的消費者咱們不知道最新的此次消費組id是多少) bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list Note: This will not show information about old Zookeeper-based consumers. console-consumer-25379 console-consumer-73410 console-consumer-27127 console-consumer-39416 # 這個是咱們新起的消費組id,下面咱們根據這個消費組來作實踐 console-consumer-61887 console-consumer-61324 # 查看消費組的具體信息 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-39416 Note: This will not show information about old Zookeeper-based consumers. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 9 9 0 consumer-1-94afec29-5042-4108-8619-ba94812f10a8 /127.0.0.1 consumer-1 # 查看離線的console-consumer-25379消費組 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-25379 Note: This will not show information about old Zookeeper-based consumers. Consumer group 'console-consumer-25379' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 5 9 4 - - - # 查看離線的console-consumer-27127消費組 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-27127 Note: This will not show information about old Zookeeper-based consumers. Consumer group 'console-consumer-27127' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 6 9 3 - - - # 這裏咱們發現咱們每次都會生成一個消費組一個消費者,不方便咱們作一個消費組多個消費者測試
cp config/consumer.properties config/consumer_g1.properties vim config/consumer_g1.properties # 修改消費組名 group.id=test-consumer-group => group.id=test-g1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.properties msg01 msg02 msg03 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --consumer.config config/consumer_g1.properties # 無輸出(由於test話題咱們只有一個分區,一個分區只能被同個消費組下面的一個消費者消費,因此這個就閒置了) # 查看消費組 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list Note: This will not show information about old Zookeeper-based consumers. console-consumer-25379 console-consumer-73410 console-consumer-27127 console-consumer-39416 test-g1 console-consumer-61887 console-consumer-61324 # 查看test-g1消費組 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1 Note: This will not show information about old Zookeeper-based consumers. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test 0 9 9 0 consumer-1-43922b0c-34e0-47fe-b597-984c9e6a2884 /127.0.0.1 consumer-1 # 下面咱們再開啓test02的2個消費組看看狀況 # 咱們再爲test02話題啓動2個test-g1消費組的消費者 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.properties bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer.config config/consumer_g1.properties # 查看test-g1消費組 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-g1 Note: This will not show information about old Zookeeper-based consumers. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID test02 0 0 0 0 consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8 /127.0.0.1 consumer-1 test02 1 0 0 0 consumer-1-c6d79321-8212-4594-8a11-353f684c54fc /127.0.0.1 consumer-1 test 0 9 9 0 consumer-1-7a2706f7-a206-4c29-ae1f-3726ad21af96 /127.0.0.1 consumer-1 # 這裏你能夠在話題test產生一個消息看下,而後再test02再多產生幾條消息看下,你會發現test02的2個消費組幾乎是負載的消費了消息
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose Note: This will not show information about old Zookeeper-based consumers. CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer-1-b8eeb8bc-6aa6-439b-84d8-0fcbed4a2899 /127.0.0.1 consumer-1 1 test02(1) consumer-1-7109f789-a5cf-4862-94d0-976146dbc769 /127.0.0.1 consumer-1 1 test(0) consumer-1-90ccc960-557a-46ab-a799-58c35ee670d8 /127.0.0.1 consumer-1 1 test02(0) bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state Note: This will not show information about old Zookeeper-based consumers. COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS localhost:9092 (0) range Stable 3 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group console-consumer-25379
bin/zookeeper-shell.sh 127.0.0.1:2181 Connecting to 127.0.0.1:2181 Welcome to ZooKeeper! JLine support is disabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null
ls / [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]