本次安裝的軟件所有在 /home/javateam
目錄下。java
hosts
,使用 vim /etc/hosts
命令添加如下內容:192.168.30.78 node-78 192.168.30.79 node-79 192.168.30.80 node-80
tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
修改文件夾名稱:node
mv apache-zookeeper-3.6.1-bin.tar.gz zookeeper
/etc/profile
配置文件添加如下內容,並執行source /etc/profile
命令使配置生效:export ZOOKEEPER_HOME=/home/javateam/zookeeper export PATH=$PATH:$ZOOKEEPER_HOME/bin
dataDir
的目錄下建立一個 myid
文件,並寫入一個數值,好比0。myid
文件裏存放的是服務器的編號。$ZOOKEEPER_HOME/conf
目錄,複製一份 zoo_sample.cfg
並將名稱修改成 zoo.cfg
:# zookeeper服務器心跳時間,單位爲ms tickTime=2000 # 投票選舉新leader的初始化時間 initLimit=10 # leader與follower心跳檢測最大容忍時間,響應超過 syncLimit * tickTime,leader認爲follower死掉,從服務器列表刪除follower syncLimit=5 # 數據目錄 dataDir=/home/javateam/zookeeper/data/ # 日誌目錄 dataLogDir=/home/javateam/zookeeper/logs/ # 對外服務的端口 clientPort=2181 # 集羣ip配置 server.78=node-78:2888:3888 server.79=node-79:2888:3888 server.80=node-80:2888:3888
注意: 上面配置文件中的數據目錄和日誌目錄需自行去建立對應的文件夾。這裏server後的數字,與myid文件中的id是一致的。shell
2181:對cline端提供服務 3888:選舉leader使用 2888:集羣內機器通信使用(Leader監聽此端口)
記得使用如下命令開啓防火牆端口,並重啓防火牆:apache
firewall-cmd --zone=public --add-port=2181/tcp --permanent firewall-cmd --zone=public --add-port=3888/tcp --permanent firewall-cmd --zone=public --add-port=2888/tcp --permanent firewall-cmd --reload
zkServer.sh start
分別啓動三臺機器上的zookeeper,啓動後用 zkServer.sh status
查看狀態,以下圖因此有一個leader兩個follower即表明成功:tar -zxvf kafka_2.13-2.4.1.tgz
mv kafka_2.13-2.4.1.tgz kafka
/etc/profile
配置文件添加如下內容,並執行source /etc/profile
命令使配置生效:export KAFKA_HOME=/home/javateam/kafka export PATH=$PATH:$KAFKA_HOME/bin
kafka/bin/kafka-server-start.sh
,添加如下內容:# 調整堆大小,默認1G過小了 export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G" # 選用G1垃圾收集器 export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true" # 指定JMX暴露端口 export JMX_PORT="8999"
添加後,文件內容以下圖所示:vim
vim /etc/security/limits.conf
添加如下內容:* soft nofile 100000 * hard nofile 100000 * soft nproc 65535 * hard nproc 65535
$KAFKA_HOME/conf/server.properties
,以下:############################# Server Basics ############################# # 每個broker在集羣中的惟一標示,要求是正數。在改變IP地址,不改變broker.id的話不會影響consumers broker.id=78 ############################# Socket Server Settings ############################# # 提供給客戶端響應的地址和端口 listeners=PLAINTEXT://node-78:9092 # broker 處理消息的最大線程數 num.network.threads=3 # broker處理磁盤IO的線程數 ,數值應該大於你的硬盤數 num.io.threads=8 # socket的發送緩衝區大小 socket.send.buffer.bytes=102400 # socket的接收緩衝區,socket的調優參數SO_SNDBUFF socket.receive.buffer.bytes=102400 # socket請求的最大數值,防止serverOOM,message.max.bytes必然要小於socket.request.max.bytes,會被topic建立時的指定參數覆蓋 socket.request.max.bytes=104857600 ############################# Log Basics ############################# # kafka數據的存放地址,多個地址的話用逗號分割 log.dirs=/home/javateam/kafka/logs # 每一個topic的分區個數,如果在topic建立時候沒有指定的話會被topic建立時的指定參數覆蓋 num.partitions=3 # 每一個分區的副本數 replication.factor=2 # 咱們知道segment文件默認會被保留7天的時間,超時的話就會被清理,那麼清理這件事情就須要有一些線程來作。這裏就是用來設置恢復和清理data下數據的線程數量 num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # 控制一條消息數據被保存多長時間,默認是7天 log.retention.hours=168 # 指定Broker爲消息保存的總磁盤容量大小,-1表明不限制 log.retention.bytes=-1 # Broker能處理的最大消息大小,默認976KB(1000012),此處改成100MB message.max.bytes=104857600 # 日誌文件中每一個segment的大小,默認爲1G log.segment.bytes=1073741824 #上面的參數設置了每個segment文件的大小是1G,那麼就須要有一個東西去按期檢查segment文件有沒有達到1G,多長時間去檢查一次,就須要設置一個週期性檢查文件大小的時間(單位是毫秒)。 log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # 消費者集羣經過鏈接Zookeeper來找到broker。zookeeper鏈接服務器地址 zookeeper.connect=node-78:2181,node-79:2181,node-80:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 ############################# Broker Settings ############################# # 不讓落後太多的副本競選Leader unclean.leader.election.enable=false # 關閉kafka按期對一些topic分區進行Leader重選舉 auto.leader.rebalance.enable=false
vim startup.sh
內容以下所示:# 進程守護模式啓動kafka kafka-server-start.sh -daemon /home/javateam/kafka/config/server.properties
vim shutdown.sh
內容以下所示:# 中止kafka服務 kafka-server-stop.sh
sh /home/javateam/kafka/startup.sh
注意:後面的路徑換成你本身腳本所在的路徑。服務器
ids
信息:zkCli.sh -server 127.0.0.1:2181 ls /brokers/ids
以下圖所示,表明集羣搭建成功:app
unzip kafka-manager-2.0.0.2.zip
mv kafka-manager-2.0.0.2.zip kafka-manager
kafka-manager/conf/application.conf
,把裏面的 kafka-manager.zkhosts
換成你本身的zookeeper 集羣地址就行了,例如:kafka-manager.zkhosts="node-78:2181,node-79:2181,node-80:2181"
vim startup.sh
內容以下:nohup /home/javateam/kafka-manager/bin/kafka-manager -Dhttp.port=9000 > /home/javateam/kafka-manager/nohup.out 2>&1 &
sh /home/javateam/kafka-manager/startup.sh
啓動 kafka-manager,而後訪問9000端口,以下圖所示表明成功:不知道怎麼使用的話就去 google,這裏再也不贅述。less