Zookeeper+Kafka徹底分佈式實戰部署

                     Zookeeper+Kafka徹底分佈式實戰部署html

                                           做者:尹正傑java

版權聲明:原創做品,謝絕轉載!不然將追究法律責任。shell

 

 

 

  其實我以前部署過kafak和zookeeper的徹底分佈式,集羣是能夠正常使用沒錯,可是在調優方案我作的不多,本次部署模擬我實際生成環境中的kafka版本zookeeper的一些調優措施,以及一些腳本管理等。部署集羣須要你自行安裝jdk,本篇博客就直接上乾貨了。express

  關於本篇博客的測試版本視頻:連接:https://pan.baidu.com/s/1S3UqwTH05RKQOuQ9bwOFMg 提取碼:jsv3 apache

  關於kafka操做系統的優化,可參考:https://www.cnblogs.com/yinzhengjie/p/9993719.html緩存

 

  

 

一.集羣的調優方向安全

  1>.調大zookeeper的heap內存,默認是1G,能夠根據服務器大小配置其堆內存爲2G或者4G足矣(kafka實時傳輸的數據若是達到PB級別的話,得觀察一下YGC和FGC的值能夠適當再次調大);bash

  2>.修改kafka的副本數,默認的副本數是1,建議修改成2,若是副本數爲2,那麼容災能力就是1,若是副本數3,則容災能力就是2,固然副本數越多,可能會致使集羣的性能降低,可是可靠性更強,各有利弊,我這裏推薦副本數爲2;服務器

  3>.kafka推薦分區數,默認的分區數是1,理論上來講,parition的數量小於core的數量的話,值越大,kafka的吞吐量就越高,可是你必須得考慮你的磁盤IO的瓶頸,所以我不推薦你將分區數這隻過大,我建議這個值大於broker的數量,好比個人集羣broker的只有5臺,個人集羣的partition數量是20;網絡

  4>.kafka的heap內存,默認也是1G,生成環境中建議將它調大,不知道你們有沒有發現,你broker的heap內存無論有多的,它都能給你吃滿!我在生成環境中給kafka的heap內存是6G(kafka主要使用堆外內存,即大量使用操做系統的頁緩存,所以其並不須要分配太多的堆內存空間),zookeeper給的是2G,剩下的所有給操做系統預留着,不然你的機器會很是的卡頓;

  5>.各類kafka配置文件調優,我這裏就不一一贅述了,我在本文中已經有詳細的介紹。

 

二.部署zookeeper集羣

1>.下載截止20181110日最新穩定版本版本的zookeeper-3.4.13.tar.gz(2018-07-16 )版本

[root@yinzhengjie zookeeper]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
--2018-11-10 00:45:07--  https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
Resolving mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)... 101.6.8.193, 2402:f000:1:408:8100::1
Connecting to mirrors.tuna.tsinghua.edu.cn (mirrors.tuna.tsinghua.edu.cn)|101.6.8.193|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 37191810 (35M) [application/x-gzip]
Saving to: ‘zookeeper-3.4.13.tar.gz’

100%[====================================================================================================================================================================================================================================>] 37,191,810  81.5MB/s   in 0.4s   

2018-11-10 00:45:08 (81.5 MB/s) - ‘zookeeper-3.4.13.tar.gz’ saved [37191810/37191810]

[root@yinzhengjie zookeeper]# 
[root@yinzhengjie zookeeper]# ll
total 36324
-rw-r--r-- 1 root root 37191810 Jul 16 11:40 zookeeper-3.4.13.tar.gz
[root@yinzhengjie zookeeper]# 
[root@yinzhengjie zookeeper]# wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz

2>.解壓zookeeper

[root@yinzhengjie zookeeper]# tar -zxf zookeeper-3.4.13.tar.gz -C /soft/

3>.建立軟鏈接

[root@yinzhengjie ~]# ln -s /soft/zookeeper-3.4.13/ /soft/zk

4>.建立配置zookeeper的堆內存配置文件

[root@yinzhengjie ~]# cat /soft/zk/conf/java.env 
#!/bin/bash
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie
#EMAIL:y1053419035@qq.com

#指定JDK的安裝路徑
export JAVA_HOME=/soft/jdk

#指定zookeeper的heap內存大小
export JVMFLAGS="-Xms2048m -Xmx2048m $JVMFLAGS"
[root@yinzhengjie ~]# 

5>.修改zookeeper的配置文件zoo.cfg(須要手動建立,或者從「 /soft/zk/conf/zoo_sample.cfg 」賦值一個模板便可)

[root@yinzhengjie ~]# cat /soft/zk/conf/zoo.cfg 
# 滴答,計時的基本單位,默認是2000毫秒,即2秒。它是zookeeper最小的時間單位,用於丈量心跳時間和超時時間等,一般設置成默認2秒便可。
tickTime=2000

# 初始化限制是10滴答,默認是10個滴答,即默認是20秒。指定follower節點初始化是連接leader節點的最大tick次數。
initLimit=5

# 數據同步的時間限制,默認是5個滴答,即默認時間是10秒。設定了follower節點與leader節點進行同步的最大時間。與initLimit相似,它也是以tickTime爲單位進行指定的。
syncLimit=2

# 指定zookeeper的工做目錄,這是一個很是重要的參數,zookeeper會在內存中在內存只能中保存系統快照,並按期寫入該路徑指定的文件夾中。生產環境中須要注意該文件夾的磁盤佔用狀況。
dataDir=/home/yinzhengjie/zookeeper

# 監聽zookeeper的默認端口。zookeeper監聽客戶端連接的端口,通常設置成默認2181便可。
clientPort=2181

# 這個操做將限制鏈接到 ZooKeeper 的客戶端的數量,限制併發鏈接的數量,它經過 IP 來區分不一樣的客戶端。此配置選項能夠用來阻止某些類別的 Dos 攻擊。將它設置爲 0 或者忽略而不進行設置將會取消對併發鏈接的限制。
#maxClientCnxns=60
 
# 在上文中已經提到,3.4.0及以後版本,ZK提供了自動清理事務日誌和快照文件的功能,這個參數指定了清理頻率,單位是小時,須要配置一個1或更大的整數,默認是0,表示不開啓自動清理功能。
#autopurge.purgeInterval=1

# 這個參數和上面的參數搭配使用,這個參數指定了須要保留的文件數目。默認是保留3個。
#autopurge.snapRetainCount=3

#server.x=[hostname]:nnnnn[:nnnnn],這裏的x是一個數字,與myid文件中的id是一致的。右邊能夠配置兩個端口,第一個端口用於F和L之間的數據同步和其它通訊,第二個端口用於Leader選舉過程當中投票通訊。  
server.117=10.1.3.117:2888:3888
server.118=10.1.3.118:2888:3888
server.119=10.1.3.119:2888:3888
[root@yinzhengjie ~]# 

6>.編寫zookeeper的啓動腳本

[root@yinzhengjie ~]# cat /usr/local/bin/xzk.sh 
#!/bin/bash
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie
#EMAIL:y1053419035@qq.com

#判斷用戶是否傳參
if [ $# -ne 1 ];then
    echo "無效參數,用法爲: $0  {start|stop|restart|status}"
    exit
fi

#獲取用戶輸入的命令
cmd=$1

#定義函數功能
function zookeeperManger(){
    case $cmd in
    start)
        echo "啓動服務"        
        remoteExecution start
        ;;
    stop)
        echo "中止服務"
        remoteExecution stop
        ;;
    restart)
        echo "重啓服務"
        remoteExecution restart
        ;;
    status)
        echo "查看狀態"
        remoteExecution status
        ;;
    *)
        echo "無效參數,用法爲: $0  {start|stop|restart|status}"
        ;;
    esac
}


#定義執行的命令
function remoteExecution(){
    for (( i=117 ; i<=119 ; i++ )) ; do
            tput setaf 2
            echo ========== kafka${i}.aggrx zkServer.sh  $1 ================
            tput setaf 9
            ssh kafka${i}.aggrx  "source /etc/profile ; zkServer.sh $1"
    done
}

#調用函數
zookeeperManger
[root@yinzhengjie ~]#
[root@yinzhengjie ~]# chmod +x /usr/local/bin/xzk.sh 
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# ll /usr/local/bin/xzk.sh 
-rwxr-xr-x 1 root root 1101 Nov  7 10:53 /usr/local/bin/xzk.sh
[root@yinzhengjie ~]# 

8>.同步系統配置文件

[root@yinzhengjie ~]# cat /usr/local/bin/xrsync.sh 
#!/bin/bash
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie
#EMAIL:y1053419035@qq.com

#判斷用戶是否傳參
if [ $# -lt 1 ];then
    echo "請輸入參數";
    exit
fi


#獲取文件路徑
file=$@

#獲取子路徑
filename=`basename $file`

#獲取父路徑
dirpath=`dirname $file`

#獲取完整路徑
cd $dirpath
fullpath=`pwd -P`


#同步文件到Kafka集羣
for (( i=116;i<=120;i++ ))
do
    #使終端變綠色
    tput setaf 2
    echo =========== kafka${i}.aggrx : $file ===========
    #使終端變回原來的顏色,即白灰色
    tput setaf 7
    #遠程執行命令
    rsync -lr $filename `whoami`@kafka${i}.aggrx:$fullpath
    #判斷命令是否執行成功
    if [ $? == 0 ];then
        echo "命令執行成功"
    fi
done
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# ll /usr/local/bin/xrsync.sh 
-rwxr-xr-x 1 root root 771 Oct 13 20:12 /usr/local/bin/xrsync.sh
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# cat /usr/local/bin/xrsync.sh                          #用於分發配置文件,須要你手動配置SSH無祕鑰登陸
[root@yinzhengjie ~]# tail -3 /etc/profile
#ADD Zookeeper PATH BY yinzhengjie
ZOOKEEPER=/soft/zk
PATH=$PATH:$ZOOKEEPER/bin
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# xrsync.sh /etc/profile
=========== yinzhengjie.aggrx : /etc/profile ===========
命令執行成功
=========== kafka117.aggrx : /etc/profile ===========
命令執行成功
=========== kafka118.aggrx : /etc/profile ===========
命令執行成功
=========== kafka119.aggrx : /etc/profile ===========
命令執行成功
=========== kafka120.aggrx : /etc/profile ===========
命令執行成功
[root@yinzhengjie ~]# 

9>.將上述解壓的配置文件使用xrsync.sh同步到其它節點

  注意,接下來須要在/home/yinzhengjie/zookeeper/」目錄中建立一個myid,並寫入配置文件。也可使用一個shell循環搞定,僅供參考。

[root@yinzhengjie ~]#  for (( i=116;i<=119;i++ )) do ssh kafka${i}.aggrx "echo -n $i > /home/yinzhengjie/zookeeper/myid" ;done

10>.啓動zookeeper並查看狀態

[root@yinzhengjie ~]# xzk.sh status        #這是查看zookeeper的狀態,若是是啓動,或中止zookeeper,直接調用start或者stop方法便可
查看狀態
========== kafka117.aggrx zkServer.sh status ================
ZooKeeper JMX enabled by default
Using config: /soft/zk/bin/../conf/zoo.cfg
Mode: follower
========== kafka118.aggrx zkServer.sh status ================
ZooKeeper JMX enabled by default
Using config: /soft/zk/bin/../conf/zoo.cfg
Mode: leader        #很顯然,該節點爲zookeeper節點。
========== kafka119.aggrx zkServer.sh status ================
ZooKeeper JMX enabled by default
Using config: /soft/zk/bin/../conf/zoo.cfg
Mode: follower
[root@yinzhengjie ~]# 

 

三.部署kafka集羣

1>.官網下載kafka(kafka_2.11-0.10.2.1.tgz)

[root@yinzhengjie ~]# wget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
--2018-11-10 01:21:08--  https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
Resolving archive.apache.org (archive.apache.org)... 163.172.17.199
Connecting to archive.apache.org (archive.apache.org)|163.172.17.199|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 37664956 (36M) [application/x-gzip]
Saving to: ‘kafka_2.11-0.10.2.1.tgz’

100%[====================================================================================================================================================================================================================================>] 37,664,956   228KB/s   in 2m 58s 

2018-11-10 01:24:07 (207 KB/s) - ‘kafka_2.11-0.10.2.1.tgz’ saved [37664956/37664956]

[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# ll
total 36784
-rw-r--r-- 1 root root 37664956 Apr 27  2017 kafka_2.11-0.10.2.1.tgz
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# wget https://archive.apache.org/dist/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz

2>.解壓kafka

[root@yinzhengjie ~]# tar -zxf kafka_2.11-0.10.2.1.tgz -C /soft/

3>.建立軟鏈接

[root@yinzhengjie ~]# ln -s /soft/kafka_2.11-0.10.2.1/ /soft/kafka

4>.修改kafka的配置文件(server.properties)

[root@yinzhengjie ~]# cat /soft/kafka/config/server.properties | grep -v ^# | grep -v ^$
broker.id=116
delete.topic.enable=true
auto.create.topics.enable=false
port=9092
host.name=10.1.3.116
num.network.threads=30
num.io.threads=30
socket.send.buffer.bytes=5242880
socket.receive.buffer.bytes=5242880
socket.request.max.bytes=104857600
queued.max.requests=1000
log.dirs=/home/yinzhengjie/kafka/logs,/home/yinzhengjie/kafka/logs2,/home/yinzhengjie/kafka/log3
num.partitions=20
num.recovery.threads.per.data.dir=1
default.replication.factor=2
message.max.bytes=104857600
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=600000
zookeeper.connect=10.1.3.117:2181,10.1.3.118:2181,10.1.3.119:2181
zookeeper.session.timeout.ms=180000
zookeeper.connection.timeout.ms=6000
max.request.size=104857600
fetch.message.max.bytes=104857600
replica.fetch.max.bytes=104857600
replica.fetch.wait.max.ms=2000
unclean.leader.election=false
num.replica.fetchers=5
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# cat /soft/kafka/config/server.properties 
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

#每個broker在集羣中的惟一表示,要求是正數。當該服務器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的消息狀況
broker.id=116

#這就是說,這條命令其實並不執行刪除動做,僅僅是在zookeeper上標記該topic要被刪除而已,同時也提醒用戶必定要提早打開delete.topic.enable開關,不然刪除動做是不會執行的。
delete.topic.enable=true

#是否容許自動建立topic,如果false,就須要經過命令建立topic
auto.create.topics.enable=false

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092

#Socket服務器偵聽的地址。若是沒有配置,它將得到從Java.NET.InAddio.GETCANONICALITHAMEMENE()返回的值
#listeners=PLAINTEXT://10.1.3.116:9092

#broker server服務端口
port=9092

#broker的主機地址,如果設置了,那麼會綁定到這個地址上,如果沒有,會綁定到全部的接口上,並將其中之一發送到ZK,通常不設置
host.name=10.1.3.116
# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

#kafka 0.9.x之後的版本新增了advertised.listeners配置,kafka 0.9.x之後的版本不要使用 advertised.host.name 和 advertised.host.port 已經deprecated.若是配置的話,它使用 "listeners" 的值。不然,它將使用從java.net.InetAddress.getCanonicalHostName()返回的值。
#advertised.listeners=PLAINTEXT://your.host.name:9092


#將偵聽器(listener)名稱映射到安全協議,默認狀況下它們是相同的。有關詳細信息,請參閱配置文檔。
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL


#處理網絡請求的最大線程數
num.network.threads=30

#處理磁盤I/O的線程數
num.io.threads=30


#套接字服務器使用的發送緩衝區(SOYSNDBUF)
socket.send.buffer.bytes=5242880

#套接字服務器使用的接收緩衝區(SOYRCVBUF)
socket.receive.buffer.bytes=5242880

#套接字服務器將接受的請求的最大大小(對OOM的保護)
socket.request.max.bytes=104857600

#I/O線程等待隊列中的最大的請求數,超過這個數量,network線程就不會再接收一個新的請求。應該是一種自我保護機制。
queued.max.requests=1000

############################# Log Basics #############################

#日誌存放目錄,多個目錄使用逗號分割,若是你有多塊磁盤,建議配置成多個目錄,從而達到I/O的效率的提高。
log.dirs=/home/yinzhengjie/kafka/logs,/home/yinzhengjie/kafka/logs2,/home/yinzhengjie/kafka/logs3

#每一個topic的分區個數,如果在topic建立時候沒有指定的話會被topic建立時的指定參數覆蓋
num.partitions=20

#在啓動時恢復日誌和關閉時刷盤日誌時每一個數據目錄的線程的數量,默認1
num.recovery.threads.per.data.dir=1


# 默認副本數
default.replication.factor=2

#服務器接受單個消息的最大大小,即消息體的最大大小,單位是字節
message.max.bytes=104857600

# 自動負載均衡,若是設爲true,複製控制器會週期性的自動嘗試,爲全部的broker的每一個partition平衡leadership,爲更優先(preferred)的replica分配leadership。
# auto.leader.rebalance.enable=false


############################# Log Flush Policy #############################

#在強制fsync一個partition的log文件以前暫存的消息數量。調低這個值會更頻繁的sync數據到磁盤,影響性能。一般建議人家使用replication來確保持久性,而不是依靠單機上的fsync,可是這能夠帶來更多的可靠性,默認10000。
#log.flush.interval.messages=10000

#2次fsync調用之間最大的時間間隔,單位爲ms。即便log.flush.interval.messages沒有達到,只要這個時間到了也須要調用fsync。默認3000ms.
#log.flush.interval.ms=10000

############################# Log Retention Policy #############################


# 日誌保存時間 (hours|minutes),默認爲7天(168小時)。超過這個時間會根據policy處理數據。bytes和minutes不管哪一個先達到都會觸發。
log.retention.hours=168

#日誌數據存儲的最大字節數。超過這個時間會根據policy處理數據。
#log.retention.bytes=1073741824

#控制日誌segment文件的大小,超出該大小則追加到一個新的日誌segment文件中(-1表示沒有限制)
log.segment.bytes=536870912

# 當達到下面時間,會強制新建一個segment
#log.roll.hours = 24*7

# 日誌片斷文件的檢查週期,查看它們是否達到了刪除策略的設置(log.retention.hours或log.retention.bytes)
log.retention.check.interval.ms=600000

#是否開啓壓縮
#log.cleaner.enable=false

#日誌清理策略選擇有:delete和compact主要針對過時數據的處理,或是日誌文件達到限制的額度,會被 topic建立時的指定參數覆蓋
#log.cleanup.policy=delete

# 日誌壓縮運行的線程數
#log.cleaner.threads=2


# 壓縮的日誌保留的最長時間
#log.cleaner.delete.retention.ms=3600000


############################# Zookeeper #############################

#zookeeper集羣的地址,能夠是多個,多個之間用逗號分割.
zookeeper.connect=10.1.3.117:2181,10.1.3.118:2181,10.1.3.119:2181

#ZooKeeper的最大超時時間,就是心跳的間隔,如果沒有反映,那麼認爲已經死了,不易過大
zookeeper.session.timeout.ms=180000

#指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次得到的消息。一旦在更新zookeeper發生異常並重啓,將可能拿到已拿到過的消息,鏈接zk的超時時間
zookeeper.connection.timeout.ms=6000

#請求的最大大小爲字節,請求的最大字節數。這也是對最大記錄尺寸的有效覆蓋。注意:server具備本身對消息記錄尺寸的覆蓋,這些尺寸和這個設置不一樣。此項設置將會限制producer每次批量發送請求的數目,以防發出巨量的請求。
max.request.size=104857600

#每次fetch請求中,針對每次fetch消息的最大字節數。這些字節將會督導用於每一個partition的內存中,所以,此設置將會控制consumer所使用的memory大小。這個fetch請求尺寸必須至少和server容許的最大消息尺寸相等,不然,producer可能發送的消息尺寸大於consumer所能消耗的尺寸。
fetch.message.max.bytes=104857600

#ZooKeeper集羣中leader和follower之間的同步時間,換句話說:一個ZK follower能落後leader多久。
#zookeeper.sync.time.ms=2000


############################# Replica Basics #############################

# leader接收follower的"fetch請求"的超時時間,默認是10秒。
# replica.lag.time.max.ms=30000

# 若是relicas落後太多,將會認爲此partition relicas已經失效。而通常狀況下,由於網絡延遲等緣由,總會致使replicas中消息同步滯後。若是消息嚴重滯後,leader將認爲此relicas網絡延遲較大或者消息吞吐能力有限。在broker數量較少,或者網絡不足的環境中,建議提升此值.follower落後於leader的最大message數,這個參數是broker全局的。設置太大 了,影響真正「落後」follower的移除;設置的過小了,致使follower的頻繁進出。沒法給定一個合適的replica.lag.max.messages的值,所以不推薦使用,聽說新版本的Kafka移除了這個參數。
#replica.lag.max.messages=4000

# follower與leader之間的socket超時時間
#replica.socket.timeout.ms=30000

# follower每次fetch數據的最大尺寸
replica.fetch.max.bytes=104857600

# follower的fetch請求超時重發時間
replica.fetch.wait.max.ms=2000

# fetch的最小數據尺寸
#replica.fetch.min.bytes=1

# 是否容許控制器關閉broker ,默認值爲true,它會關閉全部在這個broker上的leader,並轉移到其餘broker,建議啓用,增長集羣穩定性。
# controlled.shutdown.enable = false

#0.11.0.0版本開始unclean.leader.election.enable參數的默認值由原來的true改成false,能夠關閉unclean leader election,也就是不在ISR(IN-Sync Replica)列表中的replica,不會被提高爲新的leader partition。kafka集羣的持久化力大於可用性,若是ISR中沒有其它的replica,會致使這個partition不能讀寫。
unclean.leader.election=false

# follower中開啓的fetcher線程數, 同步速度與系統負載均衡
num.replica.fetchers=5

# partition leader與replicas之間通信時,socket的超時時間
#controller.socket.timeout.ms=30000

# partition leader與replicas數據同步時,消息的隊列尺寸.
#controller.message.queue.size=10

#指定將使用哪一個版本的 inter-broker 協議。 在全部經紀人升級到新版本以後,這一般會受到衝擊。升級時要設置
#inter.broker.protocol.version=0.10.1

#指定broker將用於將消息添加到日誌文件的消息格式版本。 該值應該是有效的ApiVersion。 一些例子是:0.8.20.9.0.00.10.0。 經過設置特定的消息格式版本,用戶保證磁盤上的全部現有消息都小於或等於指定的版本。 不正確地設置這個值將致使使用舊版本的用戶出錯,由於他們將接收到他們不理解的格式的消息。
#log.message.format.version=0.10.1

[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# cat /soft/kafka/config/server.properties                                               #上述配置文件詳解

5>.修改kafka的啓動腳本

[root@yinzhengjie ~]# cat /soft/kafka/bin/kafka-server-start.sh 
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ $# -lt 1 ];
then
    echo "USAGE: $0 [-daemon] server.properties [--override property=value]*"
    exit 1
fi
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi


if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    #在這裏指定堆內存爲20G
    export KAFKA_HEAP_OPTS="-Xmx20G -Xms20G"     
fi

EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'}

COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac


exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
[root@yinzhengjie ~]# 

6>.編寫kafka啓動腳本

[root@yinzhengjie ~]# cat /usr/local/bin/xkafka.sh 
#!/bin/bash
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie
#EMAIL:y1053419035@qq.com

#判斷用戶是否傳參
if [ $# -ne 1 ];then
    echo "無效參數,用法爲: $0  {start|stop}"
    exit
fi

#獲取用戶輸入的命令
cmd=$1



for (( i=116 ; i<=120 ; i++ )) ; do
    tput setaf 2
    echo ========== kafka${i}.aggrx  $cmd ================
    tput setaf 9
    case $cmd in
        start)
        ssh  kafka${i}.aggrx "source /etc/profile ; nohup kafka-server-start.sh /soft/kafka/config/server.properties >> /home/yinzhengjie/kafka/console/kafka-`date +%F`.log &" 
            #ssh  kafka${i}.aggrx  "source /etc/profile ; kafka-server-start.sh -daemon /soft/kafka/config/server.properties"
            echo  kafka${i}.aggrx  "服務已啓動"
            ;;
        stop) 
            ssh kafka${i}.aggrx  "source /etc/profile ; kafka-server-stop.sh" 
            echo kafka${i}.aggrx  "服務已中止"
            ;;
            *) 
            echo "無效參數,用法爲: $0  {start|stop}"
            exit 
            ;;
     esac
done
[root@yinzhengjie ~]# 

7>.查看各個服務器的啓動進程

[root@yinzhengjie ~]# cat /usr/local/bin/xcall.sh 
#!/bin/bash
#@author :yinzhengjie
#blog:http://www.cnblogs.com/yinzhengjie
#EMAIL:y1053419035@qq.com


#判斷用戶是否傳參
if [ $# -lt 1 ];then
        echo "請輸入參數"
        exit
fi

#獲取用戶輸入的命令
cmd=$@

#Kafka集羣批量執行命令
for (( i=116;i<=120;i++ ))
do
    #使終端變綠色
    tput setaf 2
    echo =========== kafka${i}.aggrx : $cmd ===========
    #使終端變回原來的顏色,即白灰色
    tput setaf 7
    #遠程執行命令
    ssh kafka${i}.aggrx $cmd
    #判斷命令是否執行成功
    if [ $? == 0 ];then
        echo "命令執行成功"
    fi
done
[root@yinzhengjie ~]# 
[root@yinzhengjie ~]# cat /usr/local/bin/xcall.sh                      #編寫批量執行的腳本,須要你手動配置SSH免密要登陸。
[root@yinzhengjie ~]# xcall.sh jps
=========== yinzhengjie.aggrx : jps ===========
934 Jps
9929 Kafka
10746 ProdServerStart
命令執行成功
=========== kafka117.aggrx : jps ===========
953 Jps
8236 Kafka
4735 QuorumPeerMain
命令執行成功
=========== kafka118.aggrx : jps ===========
4616 QuorumPeerMain
2425 Jps
8382 Kafka
命令執行成功
=========== kafka119.aggrx : jps ===========
23953 Jps
4763 QuorumPeerMain
8079 Kafka
命令執行成功
=========== kafka120.aggrx : jps ===========
26196 Jps
8143 Kafka
命令執行成功
[root@yinzhengjie ~]# 
相關文章
相關標籤/搜索