Download Kafka; it ships with ZooKeeper.

A ZooKeeper ensemble uses the ZAB protocol for leader election and needs a majority quorum, so at least three nodes are required (in production they should be deployed on three separate server instances; for this demo we run them all on one machine).
```shell
# Copy three node configs
cp config/zookeeper.properties config/zookeeper.2181.properties
cp config/zookeeper.properties config/zookeeper.2182.properties
cp config/zookeeper.properties config/zookeeper.2183.properties
```
Edit config/zookeeper.2181.properties:

```properties
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper/2181
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
server.1=localhost:12888:13888
server.2=localhost:22888:23888
server.3=localhost:32888:33888
```
In config/zookeeper.2182.properties, change:

```properties
clientPort=2182
dataDir=/tmp/zookeeper/2182
```

In config/zookeeper.2183.properties, change:

```properties
clientPort=2183
dataDir=/tmp/zookeeper/2183
```

Everything else stays the same.
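Editing three near-identical files by hand is error-prone; the per-node differences can also be stamped in with sed. A self-contained sketch (the template written below is a trimmed stand-in for the real config/zookeeper.properties):

```shell
# Trimmed stand-in for config/zookeeper.properties, created here only so
# the sketch runs on its own
cat > zk.template <<'EOF'
dataDir=/tmp/zookeeper/0
clientPort=0
maxClientCnxns=0
tickTime=2000
EOF

# Stamp each node's port into dataDir and clientPort
for port in 2181 2182 2183; do
  sed -e "s|^dataDir=.*|dataDir=/tmp/zookeeper/$port|" \
      -e "s/^clientPort=.*/clientPort=$port/" \
      zk.template > "zookeeper.$port.properties"
done

grep '^clientPort=' zookeeper.2182.properties   # clientPort=2182
```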
The main changes are the client port (clientPort) and the data directory (dataDir). The remaining parameters mean:

- tickTime=2000: ZooKeeper's basic time unit, in milliseconds
- initLimit=10: leader-follower initial connection time limit (tickTime * 10)
- syncLimit=5: leader-follower sync time limit (tickTime * 5)
- server.&lt;cluster id&gt;=&lt;host&gt;:&lt;data port&gt;:&lt;election port&gt;
Give each instance its cluster ID. The myid file lives inside that instance's dataDir, so create the directories first; write with > rather than >>, since appending on a rerun would leave multiple lines in a file that must contain only the id:

```shell
mkdir -p /tmp/zookeeper/2181 /tmp/zookeeper/2182 /tmp/zookeeper/2183
echo 1 > /tmp/zookeeper/2181/myid
echo 2 > /tmp/zookeeper/2182/myid
echo 3 > /tmp/zookeeper/2183/myid
```
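The same setup can be written as one loop; each myid value must match the corresponding server.N entry in the configs:

```shell
# Create each dataDir and write its server id into myid
for id in 1 2 3; do
  dir=/tmp/zookeeper/$((2180 + id))
  mkdir -p "$dir"
  echo "$id" > "$dir/myid"
done

cat /tmp/zookeeper/2183/myid   # 3
```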
Start the cluster:

```shell
# Run each in its own terminal, or pass -daemon to run in the background
bin/zookeeper-server-start.sh config/zookeeper.2181.properties
bin/zookeeper-server-start.sh config/zookeeper.2182.properties
bin/zookeeper-server-start.sh config/zookeeper.2183.properties
```
Kafka

With two or more broker nodes, the cluster can serve clients with high availability.

```shell
cp config/server.properties config/server.9092.properties
cp config/server.properties config/server.9093.properties
```
Change the broker ID, listener port, log directory, and ZooKeeper node list. vi config/server.9092.properties:

```properties
broker.id=1
...
listeners=PLAINTEXT://:9092
...
log.dirs=/tmp/kafka-logs/1
...
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
```
vi config/server.9093.properties:

```properties
broker.id=2
...
listeners=PLAINTEXT://:9093
...
log.dirs=/tmp/kafka-logs/2
...
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
```
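As with the ZooKeeper configs, the two broker files differ only in broker.id, listeners, and log.dirs, so they too can be generated. A self-contained sketch (the template written below is a trimmed stand-in for config/server.properties):

```shell
# Trimmed stand-in for config/server.properties, created here only so
# the sketch runs on its own
cat > server.template <<'EOF'
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs/0
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
EOF

for id in 1 2; do
  port=$((9091 + id))
  sed -e "s/^broker.id=.*/broker.id=$id/" \
      -e "s|^listeners=.*|listeners=PLAINTEXT://:$port|" \
      -e "s|^log.dirs=.*|log.dirs=/tmp/kafka-logs/$id|" \
      server.template > "server.$port.properties"
done

grep '^broker.id=' server.9093.properties   # broker.id=2
```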
Start the brokers:

```shell
bin/kafka-server-start.sh config/server.9092.properties
bin/kafka-server-start.sh config/server.9093.properties
```
Create a topic:

```shell
bin/kafka-topics.sh --create \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 \
  --replication-factor 2 \
  --partitions 4 \
  --topic topic_1
```

- --replication-factor 2: number of replicas. It cannot exceed the number of brokers, and more replicas per broker would be pointless anyway: two replicas on the same node die together when that node fails.
- --partitions 4: number of partitions.
List topics:

```shell
bin/kafka-topics.sh \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 --list
# topic_1
# topic_2
```
Describing a topic shows its partition count, replication factor, each partition's leader, and its ISR (in-sync replicas):

```shell
bin/kafka-topics.sh \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 \
  --describe --topic topic_1
# Topic:topic_1  PartitionCount:4  ReplicationFactor:2  Configs:
#   Topic: topic_1  Partition: 0  Leader: 2  Replicas: 2,1  Isr: 2,1
#   Topic: topic_1  Partition: 1  Leader: 1  Replicas: 1,2  Isr: 1,2
#   Topic: topic_1  Partition: 2  Leader: 2  Replicas: 2,1  Isr: 2,1
#   Topic: topic_1  Partition: 3  Leader: 1  Replicas: 1,2  Isr: 1,2
```
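The describe output reflects round-robin replica placement: leaders alternate between the two brokers, and the follower replica sits on the other broker. A simplified sketch of the pattern (Kafka actually starts the round-robin at a random broker, so which broker leads partition 0 can differ from run to run):

```shell
BROKERS=2
for p in 0 1 2 3; do
  leader=$(( p % BROKERS + 1 ))        # leaders alternate between brokers 1 and 2
  follower=$(( leader % BROKERS + 1 )) # the other broker holds the second replica
  echo "partition $p  leader $leader  replicas $leader,$follower"
done
# partition 0  leader 1  replicas 1,2
# partition 1  leader 2  replicas 2,1
# ...
```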
Note that deleting a topic only removes its metadata in ZooKeeper; the log data still has to be removed by hand.

```shell
bin/kafka-topics.sh \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 \
  --delete --topic topic_2
# Topic topic_2 is marked for deletion.
# Note: This will have no impact if delete.topic.enable is not set to true.

# List topics again
bin/kafka-topics.sh \
  --zookeeper localhost:2181,localhost:2182,localhost:2183 --list
# topic_1
# topic_2 - marked for deletion
```
Produce messages:

```shell
bin/kafka-console-producer.sh \
  --broker-list localhost:9092,localhost:9093 \
  --topic topic_1
# A CLI prompt opens; type a message and press Enter to send:
# hello kafka [enter]
# send message [enter]
```
New consumer mode: offsets are stored in the broker.

```
--new-consumer                        Use new consumer. This is the default.
--bootstrap-server <server to connect to>
                                      REQUIRED (unless old consumer is used):
                                      The server to connect to.
```
Old consumer mode: offsets are stored in ZooKeeper.

```
--zookeeper <urls>                    REQUIRED (only when using old consumer):
                                      The connection string for the zookeeper
                                      connection in the form host:port. Multiple
                                      URLS can be given to allow fail-over.
```
```shell
bin/kafka-console-consumer.sh \
  --new-consumer \
  --bootstrap-server localhost:9092,localhost:9093 \
  --from-beginning \
  --topic topic_1
```
You can try starting several consumers in different consumer groups (each consumer started by this script gets its own group) and subscribing them to the same topic to get publish-subscribe behavior.
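The group semantics can be sketched with plain arithmetic: within one group, each partition is consumed by exactly one member (a round-robin toy here; the console consumer's real default assignor is range), while a second group independently receives every partition:

```shell
PARTITIONS=4
MEMBERS=2
for group in g1 g2; do
  for p in $(seq 0 $((PARTITIONS - 1))); do
    # every group sees all partitions; within a group they are split
    echo "group $group: partition $p -> member $(( p % MEMBERS ))"
  done
done
```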
```shell
bin/kafka-consumer-groups.sh \
  --new-consumer \
  --bootstrap-server localhost:9092,localhost:9093 \
  --list
# Consumers from two groups are present here:
# console-consumer-47566
# console-consumer-50875
```
This shows each consumer's subscribed topic, assigned partitions, current offset, and message backlog (LAG):

```shell
bin/kafka-consumer-groups.sh \
  --new-consumer \
  --bootstrap-server localhost:9092,localhost:9093 \
  --group console-consumer-47566 \
  --describe
# GROUP                   TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
# console-consumer-47566  topic_1  0          2               2               0    consumer-1_/127.0.0.1
# console-consumer-47566  topic_1  1          3               3               0    consumer-1_/127.0.0.1
# console-consumer-47566  topic_1  2          2               3               1    consumer-1_/127.0.0.1
# console-consumer-47566  topic_1  3          0               3               3    consumer-1_/127.0.0.1
```