Kafka - Zookeeper/Kafka集羣的部署

下載kafka,自帶 zookeeper。apache

搭建Zookeeper集羣

zookeeper 集羣使用 Raft 選舉模式,故至少要三個節點(生產中應部署在三個不一樣的服務器實例上,這裏用於演示就不那麼作了)。bootstrap

# 複製三分節點配置
cp config/zookeeper.properties config/zookeeper.2181.properties
cp config/zookeeper.properties config/zookeeper.2182.properties
cp config/zookeeper.properties config/zookeeper.2183.properties

修改配置
config/zookeeper.2181.properties服務器

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper/2181
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
server.1=localhost:12888:13888
server.2=localhost:22888:23888
server.3=localhost:32888:33888

config/zookeeper.2182.properties 修改clientPort=2182 dataDir=/tmp/zookeeper/2182 其餘一致
config/zookeeper.2183.properties 修改clientPort=2183 dataDir=/tmp/zookeeper/2183 其餘一致less

主要是修改服務端口clientPort和數據目錄dataDir,其餘參數表徵以下:
tickTime=2000爲zk的基本時間單元,毫秒
initLimit=10Leader-Follower初始通訊時限(tickTime*10)
syncLimit=5Leader-Follower同步通訊時限(tickTime*5)
server.實例集羣標識=實例地址:數據通訊端口:選舉通訊端口this

爲實例添加集羣標識url

echo 1 >> /tmp/zookeeper/2181/myid
echo 2 >> /tmp/zookeeper/2182/myid
echo 3 >> /tmp/zookeeper/2183/myid

啓動集羣服務日誌

bin/zookeeper-server-start.sh config/zookeeper.2181.properties
bin/zookeeper-server-start.sh config/zookeeper.2182.properties
bin/zookeeper-server-start.sh config/zookeeper.2183.properties

搭建Kafka集羣

Kafka集羣節點>=2時即可對外提供高可用服務code

cp config/server.properties config/server.9092.properties
cp config/server.properties config/server.9093.properties

修改節點標識、服務端口、數據目錄和zk集羣節點列表
vi config/server.9092.propertiesorm

broker.id=1
...
listeners=PLAINTEXT://:9092
...
log.dirs=/tmp/kafka-logs/1
...
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

vi config/server.9093.propertiesserver

broker.id=2
...
listeners=PLAINTEXT://:9093
...
log.dirs=/tmp/kafka-logs/2
...
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

啓動集羣

bin/kafka-server-start.sh config/server.9092.properties
bin/kafka-server-start.sh config/server.9093.properties

Topic管理

建立topic

bin/kafka-topics.sh --create \
--zookeeper localhost:2181,localhost:2182,localhost:2183 \
--replication-factor 2 \
--partition 4 \
--topic topic_1

--replication-factor 2:副本集數量,不能大於 broker 節點數量,多了也沒用,1個節點放>=2個副本掛了都完蛋。
--partition 4:分區數

查看topic列表

bin/kafka-topics.sh \
--zookeeper localhost:2181,localhost:2182,localhost:2183 --list
topic_1
topic_2

查看Topic詳情

能夠描述Topic分區數/副本數/副本Leader/副本ISR等信息:

bin/kafka-topics.sh \
--zookeeper localhost:2181,localhost:2182,localhost:2183 \
--describe --topic topic_1
Topic:topic_1    PartitionCount:4    ReplicationFactor:2    Configs:
    Topic: topic_1    Partition: 0    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: topic_1    Partition: 1    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: topic_1    Partition: 2    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: topic_1    Partition: 3    Leader: 1    Replicas: 1,2    Isr: 1,2

刪除Topic

注意,只是刪除Topiczk的元數據,日誌數據仍需手動刪除。

bin/kafka-topics.sh \
--zookeeper localhost:2181,localhost:2182,localhost:2183 \
--delete --topic topic_2

#Topic topic_2 is marked for deletion.
#Note: This will have no impact if delete.topic.enable is not set to true.

#再查看topic列表
bin/kafka-topics.sh \
--zookeeper localhost:2181,localhost:2182,localhost:2183 --list

#topic_1
#topic_2 - marked for deletion

生產者

bin/kafka-console-producer.sh \
--broker-list localhost:9092,localhost:9093 \
--topic topic_1
# 進入 cli 輸入消息回車發送
# hello kafka [enter]
# send message [enter]

消費者

新模式,offset存儲在borker
--new-consumer Use new consumer. This is the default.
--bootstrap-server <server to connectto> REQUIRED (unless old consumer is used): The server to connect to.
老消費模式,offset存儲在zk
--zookeeper <urls> REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.

建立消費者

bin/kafka-console-consumer.sh \
--new-consumer \
--bootstrap-server localhost:9092,localhost:9093 \
--from-beginning \
--topic topic_1

能夠嘗試建立多個不一樣消費組的消費者(這裏的sh腳本建立的都是不一樣消費組的),訂閱同一個topic來實現發佈訂閱模式。

查看消費組/消費者

bin/kafka-consumer-groups.sh \
--new-consumer \
--bootstrap-server localhost:9092,localhost:9093 \
--list
#這裏有兩個消費組的消費者
console-consumer-47566
console-consumer-50875

查看消費詳情

能夠查看到消費的訂閱的 topic,負責的 partition,消費進度 offset, 積壓的消息LAG

bin/kafka-consumer-groups.sh \
--new-consumer \
--bootstrap-server localhost:9092,localhost:9093 \
--group console-consumer-47566 \
--describe

GROUP                          TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
console-consumer-47566         topic_1                        0          2               2               0               consumer-1_/127.0.0.1
console-consumer-47566         topic_1                        1          3               3               0               consumer-1_/127.0.0.1
console-consumer-47566         topic_1                        2          2               3               1               consumer-1_/127.0.0.1
console-consumer-47566         topic_1                        3          0               3               3               consumer-1_/127.0.0.1
相關文章
相關標籤/搜索