CentOS 7 Zookeeper 和 Kafka 集羣搭建

環境

  • CentOS 7.4
  • Zookeeper-3.6.1
  • Kafka_2.13-2.4.1
  • Kafka-manager-2.0.0.2

本次安裝的軟件所有在 /home/javateam 目錄下。java

Zookeeper 集羣搭建

  1. 添加三臺機器的 hosts,使用 vim /etc/hosts 命令添加如下內容:
192.168.30.78 node-78
192.168.30.79 node-79
192.168.30.80 node-80
  1. 首先解壓縮:
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz

修改文件夾名稱:node

mv apache-zookeeper-3.6.1-bin.tar.gz zookeeper
  1. /etc/profile 配置文件添加如下內容,並執行source /etc/profile命令使配置生效:
export ZOOKEEPER_HOME=/home/javateam/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
  1. 在上面配置文件中 dataDir 的目錄下建立一個 myid 文件,並寫入一個數值,好比0。myid 文件裏存放的是服務器的編號。
  2. 修改zookeeper配置文件。首先進入 $ZOOKEEPER_HOME/conf 目錄,複製一份 zoo_sample.cfg 並將名稱修改成 zoo.cfg:
# zookeeper服務器心跳時間,單位爲ms
tickTime=2000
# 投票選舉新leader的初始化時間
initLimit=10
# leader與follower心跳檢測最大容忍時間,響應超過 syncLimit * tickTime,leader認爲follower死掉,從服務器列表刪除follower
syncLimit=5
# 數據目錄
dataDir=/home/javateam/zookeeper/data/
# 日誌目錄
dataLogDir=/home/javateam/zookeeper/logs/
# 對外服務的端口
clientPort=2181
# 集羣ip配置
server.78=node-78:2888:3888
server.79=node-79:2888:3888
server.80=node-80:2888:3888

注意: 上面配置文件中的數據目錄和日誌目錄需自行去建立對應的文件夾。這裏server後的數字,與myid文件中的id是一致的。shell

  1. zookeeper啓動會佔用三個端口,分別的做用是:
2181:對cline端提供服務
3888:選舉leader使用
2888:集羣內機器通信使用(Leader監聽此端口)

記得使用如下命令開啓防火牆端口,並重啓防火牆:apache

firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --zone=public --add-port=3888/tcp --permanent
firewall-cmd --zone=public --add-port=2888/tcp --permanent
firewall-cmd --reload
  1. 而後用 zkServer.sh start 分別啓動三臺機器上的zookeeper,啓動後用 zkServer.sh status 查看狀態,以下圖因此有一個leader兩個follower即表明成功:

WeChatfc81462212a55ba049b1afa06c9fabef.png

WeChat4d789c67638c5fc635fb2cc149d218c2.png

WeChatbfe38654fcccfcb45aba87c0fd2a4d58.png

Kafka 集羣搭建

  1. 首先解壓縮:
tar -zxvf kafka_2.13-2.4.1.tgz
  1. 改文件夾名稱:
mv kafka_2.13-2.4.1.tgz kafka
  1. /etc/profile 配置文件添加如下內容,並執行source /etc/profile命令使配置生效:
export KAFKA_HOME=/home/javateam/kafka
export PATH=$PATH:$KAFKA_HOME/bin
  1. JVM級別參數調優,修改 kafka/bin/kafka-server-start.sh,添加如下內容:
# 調整堆大小,默認1G過小了
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
# 選用G1垃圾收集器
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
# 指定JMX暴露端口
export JMX_PORT="8999"

添加後,文件內容以下圖所示:vim

WeChat9d9de8b4d35d8483d8920db2bd98f524.png

  1. 操做系統級別參數調優,增長文件描述符的限制,使用 vim /etc/security/limits.conf 添加如下內容:
*  soft  nofile  100000
*  hard  nofile  100000
*  soft  nproc   65535
*  hard  nproc   65535
  1. 修改kafka的配置文件 $KAFKA_HOME/conf/server.properties,以下:
############################# Server Basics #############################

# 每個broker在集羣中的惟一標示,要求是正數。在改變IP地址,不改變broker.id的話不會影響consumers
broker.id=78

############################# Socket Server Settings #############################

# 提供給客戶端響應的地址和端口
listeners=PLAINTEXT://node-78:9092

# broker 處理消息的最大線程數
num.network.threads=3

# broker處理磁盤IO的線程數 ,數值應該大於你的硬盤數
num.io.threads=8

# socket的發送緩衝區大小
socket.send.buffer.bytes=102400

# socket的接收緩衝區,socket的調優參數SO_SNDBUFF
socket.receive.buffer.bytes=102400

# socket請求的最大數值,防止serverOOM,message.max.bytes必然要小於socket.request.max.bytes,會被topic建立時的指定參數覆蓋
socket.request.max.bytes=104857600


############################# Log Basics #############################

# kafka數據的存放地址,多個地址的話用逗號分割
log.dirs=/home/javateam/kafka/logs

# 每一個topic的分區個數,如果在topic建立時候沒有指定的話會被topic建立時的指定參數覆蓋
num.partitions=3

# 每一個分區的副本數
replication.factor=2

# 咱們知道segment文件默認會被保留7天的時間,超時的話就會被清理,那麼清理這件事情就須要有一些線程來作。這裏就是用來設置恢復和清理data下數據的線程數量
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# 控制一條消息數據被保存多長時間,默認是7天
log.retention.hours=168

# 指定Broker爲消息保存的總磁盤容量大小,-1表明不限制
log.retention.bytes=-1

# Broker能處理的最大消息大小,默認976KB(1000012),此處改成100MB
message.max.bytes=104857600

# 日誌文件中每一個segment的大小,默認爲1G
log.segment.bytes=1073741824

#上面的參數設置了每個segment文件的大小是1G,那麼就須要有一個東西去按期檢查segment文件有沒有達到1G,多長時間去檢查一次,就須要設置一個週期性檢查文件大小的時間(單位是毫秒)。
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# 消費者集羣經過鏈接Zookeeper來找到broker。zookeeper鏈接服務器地址
zookeeper.connect=node-78:2181,node-79:2181,node-80:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0


############################# Broker Settings #############################

# 不讓落後太多的副本競選Leader
unclean.leader.election.enable=false

# 關閉kafka按期對一些topic分區進行Leader重選舉
auto.leader.rebalance.enable=false
  1. 編寫kafka啓動腳本,vim startup.sh 內容以下所示:
# 進程守護模式啓動kafka
kafka-server-start.sh -daemon /home/javateam/kafka/config/server.properties
  1. 編寫kafka中止腳本,vim shutdown.sh 內容以下所示:
# 中止kafka服務
kafka-server-stop.sh
  1. 用以下命令,分別啓動kafka服務:
sh /home/javateam/kafka/startup.sh

注意:後面的路徑換成你本身腳本所在的路徑。服務器

  1. 啓動成功後,鏈接zookeeper查看節點 ids 信息:
zkCli.sh -server 127.0.0.1:2181
ls /brokers/ids

以下圖所示,表明集羣搭建成功:app

WeChatc5943d73fa0fd92c1a66962147af7afd.png

Kafka-manager 搭建

  1. 首先解壓縮:
unzip kafka-manager-2.0.0.2.zip
  1. 改文件夾名稱
mv kafka-manager-2.0.0.2.zip kafka-manager
  1. 修改配置文件 kafka-manager/conf/application.conf,把裏面的 kafka-manager.zkhosts 換成你本身的zookeeper 集羣地址就行了,例如:kafka-manager.zkhosts="node-78:2181,node-79:2181,node-80:2181"
  2. 編寫 kafka-manager 啓動腳本,vim startup.sh 內容以下:
nohup /home/javateam/kafka-manager/bin/kafka-manager -Dhttp.port=9000 > /home/javateam/kafka-manager/nohup.out 2>&1 &
  1. 使用 sh /home/javateam/kafka-manager/startup.sh 啓動 kafka-manager,而後訪問9000端口,以下圖所示表明成功:

WeChatcf5d42bbb563a7b23864ed6755ce9c0d.png

不知道怎麼使用的話就去 google,這裏再也不贅述。less

相關文章
相關標籤/搜索