Kafka是一個開源的分佈式消息引擎/消息中間件,同時Kafka也是一個流處理平臺。Kakfa支持以發佈/訂閱的方式在應用間傳遞消息,同時並基於消息功能添加了Kafka Connect、Kafka Streams以支持鏈接其餘系統的數據(Elasticsearch、Hadoop等)node
Kafka最核心的最成熟的仍是他的消息引擎,因此Kafka大部分應用場景仍是用來做爲消息隊列削峯平谷。另外,Kafka也是目前性能最好的消息中間件。apache
在Kafka集羣(Cluster)中,一個Kafka節點就是一個Broker,消息由Topic來承載,能夠存儲在1個或多個Partition中。發佈消息的應用爲Producer、消費消息的應用爲Consumer,多個Consumer能夠促成Consumer Group共同消費一個Topic中的消息。bootstrap
概念/對象 | 簡單說明 |
---|---|
Broker | Kafka節點 |
Topic | 主題,用來承載消息 |
Partition | 分區,用於主題分片存儲 |
Producer | 生產者,向主題發佈消息的應用 |
Consumer | 消費者,從主題訂閱消息的應用 |
Consumer Group | 消費者組,由多個消費者組成 |
準備3臺CentOS服務器,並配置好靜態IP、主機名bash
服務器名 | IP | 說明 |
---|---|---|
kafka01 | 192.168.88.51 | Kafka節點1 |
kafka02 | 192.168.88.52 | Kafka節點2 |
kafka03 | 192.168.88.53 | Kafka節點3 |
軟件版本說明服務器
項 | 說明 |
---|---|
Linux Server | CentOS 7 |
Kafka | 2.3.0 |
Kakfa集羣須要依賴ZooKeeper存儲Broker、Topic等信息,這裏咱們部署三臺ZK架構
服務器名 | IP | 說明 |
---|---|---|
zk01 | 192.168.88.21 | ZooKeeper節點 |
zk02 | 192.168.88.22 | ZooKeeper節點 |
zk03 | 192.168.88.23 | ZooKeeper節點 |
部署過程參考:https://ken.io/note/zookeeper...tcp
#建立應用目錄 mkdir /usr/kafka #建立Kafka數據目錄 mkdir /kafka mkdir /kafka/logs chmod 777 -R /kafka
Kafka官方下載地址:https://kafka.apache.org/down...
此次我下載的是2.3.0版本分佈式
#建立並進入下載目錄 mkdir /home/downloads cd /home/downloads #下載安裝包 wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz #解壓到應用目錄 tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
kafka_2.12-2.3.0.tgz 其中2.12是Scala編譯器的版本,2.3.0纔是Kafka的版本
#進入應用目錄 cd /usr/kafka/kafka_2.12-2.3.0/ #修改配置文件 vi config/server.properties
配置日誌目錄、指定ZooKeeper服務器ide
# A comma separated list of directories under which to store log files log.dirs=/kafka/logs # root directory for all kafka znodes. zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181
broker.id=0 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://192.168.88.51:9092
broker.id=1 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://192.168.88.52:9092
broker.id=2 #listeners=PLAINTEXT://:9092 listeners=PLAINTEXT://192.168.88.53:9092
#開放端口 firewall-cmd --add-port=9092/tcp --permanent #從新加載防火牆配置 firewall-cmd --reload
#進入kafka根目錄 cd /usr/kafka/kafka_2.12-2.3.0/ #啓動 /bin/kafka-server-start.sh config/server.properties & #啓動成功輸出示例(最後幾行) [2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser) [2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser) [2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
在kafka01(Broker)上建立測試Tpoic:test-ken-io,這裏咱們指定了3個副本、1個分區oop
bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io
Topic在kafka01上建立後也會同步到集羣中另外兩個Broker:kafka0二、kafka03
咱們能夠經過命令列出指定Broker的
bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092
這裏咱們向Broker(id=0)的Topic=test-ken-io發送消息
bin/kafka-console-producer.sh --broker-list 192.168.88.51:9092 --topic test-ken-io #消息內容 > test by ken.io
在Kafka02上消費Broker03的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning
在Kafka03上消費Broker02的消息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning
而後均能收到消息
test by ken.io
這是由於這兩個消費消息的命令是創建了兩個不一樣的Consumer
若是咱們啓動Consumer指定Consumer Group Id就能夠做爲一個消費組協同工,1個消息同時只會被一個Consumer消費到
bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken
Kafka經常使用Broker配置說明:
配置項 | 默認值/示例值 | 說明 |
---|---|---|
broker.id | 0 | Broker惟一標識 |
listeners | PLAINTEXT://192.168.88.53:9092 | 監聽信息,PLAINTEXT表示明文傳輸 |
log.dirs | kafka/logs | kafka數據存放地址,能夠填寫多個。用","間隔 |
message.max.bytes | message.max.bytes | 單個消息長度限制,單位是字節 |
num.partitions | 1 | 默認分區數 |
log.flush.interval.messages | Long.MaxValue | 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量 |
log.flush.interval.ms | Long.MaxValue | 在數據被寫入到硬盤前的最大時間 |
log.flush.scheduler.interval.ms | Long.MaxValue | 檢查數據是否要寫入到硬盤的時間間隔。 |
log.retention.hours | 24 | 控制一個log保留時間,單位:小時 |
zookeeper.connect | 192.168.88.21:2181 | ZooKeeper服務器地址,多臺用","間隔 |
本文首發於個人獨立博客:https://ken.io/note/kafka-cluster-deploy-guide