Kafka集羣部署指南

1、前言

一、Kafka簡介

Kafka是一個開源的分佈式消息引擎/消息中間件,同時Kafka也是一個流處理平臺。Kakfa支持以發佈/訂閱的方式在應用間傳遞消息,同時並基於消息功能添加了Kafka Connect、Kafka Streams以支持鏈接其餘系統的數據(ElasticsearchHadoop等)node

Kafka最核心的最成熟的仍是他的消息引擎,因此Kafka大部分應用場景仍是用來做爲消息隊列削峯平谷。另外,Kafka也是目前性能最好的消息中間件。apache

二、Kafka架構

Kafka架構圖

在Kafka集羣(Cluster)中,一個Kafka節點就是一個Broker,消息由Topic來承載,能夠存儲在1個或多個Partition中。發佈消息的應用爲Producer、消費消息的應用爲Consumer,多個Consumer能夠促成Consumer Group共同消費一個Topic中的消息。bootstrap

概念/對象 簡單說明
Broker Kafka節點
Topic 主題,用來承載消息
Partition 分區,用於主題分片存儲
Producer 生產者,向主題發佈消息的應用
Consumer 消費者,從主題訂閱消息的應用
Consumer Group 消費者組,由多個消費者組成

三、準備工做

一、Kafka服務器

準備3臺CentOS服務器,並配置好靜態IP、主機名bash

服務器名 IP 說明
kafka01 192.168.88.51 Kafka節點1
kafka02 192.168.88.52 Kafka節點2
kafka03 192.168.88.53 Kafka節點3

軟件版本說明服務器

說明
Linux Server CentOS 7
Kafka 2.3.0

二、ZooKeeper集羣

Kakfa集羣須要依賴ZooKeeper存儲Broker、Topic等信息,這裏咱們部署三臺ZK架構

服務器名 IP 說明
zk01 192.168.88.21 ZooKeeper節點
zk02 192.168.88.22 ZooKeeper節點
zk03 192.168.88.23 ZooKeeper節點

部署過程參考:https://ken.io/note/zookeeper...tcp

2、部署過程

一、應用&數據目錄

#建立應用目錄
mkdir /usr/kafka

#建立Kafka數據目錄
mkdir /kafka
mkdir /kafka/logs
chmod 777 -R /kafka

二、下載&解壓

Kafka官方下載地址:https://kafka.apache.org/down...
此次我下載的是2.3.0版本分佈式

#建立並進入下載目錄
mkdir /home/downloads
cd /home/downloads

#下載安裝包
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz 

#解壓到應用目錄
tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
kafka_2.12-2.3.0.tgz 其中2.12是Scala編譯器的版本,2.3.0纔是Kafka的版本

三、Kafka節點配置

#進入應用目錄
cd /usr/kafka/kafka_2.12-2.3.0/

#修改配置文件
vi config/server.properties

通用配置

配置日誌目錄、指定ZooKeeper服務器ide

# A comma separated list of directories under which to store log files
log.dirs=/kafka/logs

# root directory for all kafka znodes.
zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181

分節點配置

  • Kafka01
broker.id=0

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.51:9092
  • Kafka02
broker.id=1

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.52:9092
  • Kafka03
broker.id=2

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.88.53:9092

四、防火牆配置

#開放端口
firewall-cmd --add-port=9092/tcp --permanent

#從新加載防火牆配置
firewall-cmd --reload

五、啓動Kafka

#進入kafka根目錄
cd /usr/kafka/kafka_2.12-2.3.0/
#啓動
/bin/kafka-server-start.sh config/server.properties &

#啓動成功輸出示例(最後幾行)
[2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
[2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

3、Kafka測試

一、建立Topic

在kafka01(Broker)上建立測試Tpoic:test-ken-io,這裏咱們指定了3個副本、1個分區oop

bin/kafka-topics.sh --create --bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io

Topic在kafka01上建立後也會同步到集羣中另外兩個Broker:kafka0二、kafka03

二、查看Topic

咱們能夠經過命令列出指定Broker的

bin/kafka-topics.sh --list --bootstrap-server 192.168.88.52:9092

三、發送消息

這裏咱們向Broker(id=0)的Topic=test-ken-io發送消息

bin/kafka-console-producer.sh --broker-list  192.168.88.51:9092  --topic test-ken-io

#消息內容
> test by ken.io

四、消費消息

在Kafka02上消費Broker03的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning

在Kafka03上消費Broker02的消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning

而後均能收到消息

test by ken.io

這是由於這兩個消費消息的命令是創建了兩個不一樣的Consumer
若是咱們啓動Consumer指定Consumer Group Id就能夠做爲一個消費組協同工,1個消息同時只會被一個Consumer消費到

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken

bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken

4、備註

一、Kafka經常使用配置項說明

Kafka經常使用Broker配置說明:

配置項 默認值/示例值 說明
broker.id 0 Broker惟一標識
listeners PLAINTEXT://192.168.88.53:9092 監聽信息,PLAINTEXT表示明文傳輸
log.dirs kafka/logs kafka數據存放地址,能夠填寫多個。用","間隔
message.max.bytes message.max.bytes 單個消息長度限制,單位是字節
num.partitions 1 默認分區數
log.flush.interval.messages Long.MaxValue 在數據被寫入到硬盤和消費者可用前最大累積的消息的數量
log.flush.interval.ms Long.MaxValue 在數據被寫入到硬盤前的最大時間
log.flush.scheduler.interval.ms Long.MaxValue 檢查數據是否要寫入到硬盤的時間間隔。
log.retention.hours 24 控制一個log保留時間,單位:小時
zookeeper.connect 192.168.88.21:2181 ZooKeeper服務器地址,多臺用","間隔

二、附錄


本文首發於個人獨立博客:https://ken.io/note/kafka-cluster-deploy-guide

相關文章
相關標籤/搜索