本文與前文是有關聯的,以前的兩篇文章客官能夠擡腿出門右轉,elk 導讀, 實戰一
kafka的配置安裝:
#kafka 和zookeeper 都依賴java ,機器上必須安裝java,具體安裝方法和效驗方法,請各位客官擡腿向上看!
#下載安裝包:一樣放到/opt/elk/ 的目錄裏邊:
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/1.0.1/kafka_2.12-1.0.1.tgz
#解壓安裝包:
tar -zxvf kafka_2.12-1.0.1.tgz
#kafka 採用集羣方式部署,測試環境部署,因此我用兩臺機器模擬出3個節點的cluster 。kafka 依賴zookeeper,zookeeper 簡稱ZK,已經包含在kafka的tar包裏,不須要而外下載安裝,爲啥要用三個節點呢?這個是zk的一個選舉leader的特性,官方推薦是奇數個server,奇數個最少3個,這樣壞一個還有兩個,能夠正常選舉,如下是zookeeper給出的官方說法:
For replicated mode, a minimum of three servers are required, and it is strongly recommended that you have an odd number of servers. If you only have two servers, then you are in a situation where if one of them fails, there are not enough machines to form a majority quorum. Two servers is inherently less stable than a single server, because there are two single points of failure.
#因爲是集羣配置,配置之間幾乎差很少,因此配置好一個,直接scp到其他節點就ok了,到了其他節點把關鍵參數修改一下就好了。
具體配置方法以下:
#給zookeeper 建立data 目錄和logs 目錄
mkdir /opt/elk/kafka_2.12-1.0.1/zookeeper/{data,logs} -p
#建立myid 文件
echo 1 > /opt/elk/kafka/zookeeper/data/myid
cd /opt/elk/kafka_2.12-1.0.1
#kafka &ZK 的全部配置文件都放到了config 目錄下
cd /config
#因爲kafka 依賴zk 因此啓動的時候須要先配置zk,啓動的時候先把zk啓動了,而後再啓動kafka
vi zookeeper.properties
具體配置內容以下:
#zk 存放數據的目錄,同時由於是集羣,zk 須要有一個叫作myid的文件也是放到這個目錄下
dataDir=/opt/elk/kafka/zookeeper/datahtml
#This option will direct the machine to write the transaction log to the dataLogDir rather than the dataDir. This allows a dedicated log device to be used, and helps avoid competition between logging and snaphots.
dataLogDir=/opt/elk/kafka/zookeeper/logs
#客戶端鏈接端口
clientPort=2181
#最大客戶端鏈接數
maxClientCnxns=20
#這個時間是做爲Zookeeper服務器之間或客戶端與服務器之間維持心跳的時間間隔
tickTime=2000
#此配置表示,容許follower(相對於Leaderer言的「客戶端」)鏈接並同步到Leader的初始化鏈接時間,以tickTime爲單位。當初始化鏈接時間超過該值,則表示鏈接失敗。
initLimit=10java
syncLimit=5
#集羣模式關鍵配置參數
server.x=[hostname]:nnnnn[:nnnnn]
There are two port numbers nnnnn. The first followers use to connect to the leader, and the second is for leader election. The leader election port is only necessary if electionAlg is 1, 2, or 3 (default). If electionAlg is 0, then the second port is not necessary. If you want to test multiple servers on a single machine, then different ports can be used for each server.
#server.myid=ip:followers_connect to the leader:leader_election # server 是固定的,myid 是須要手動分配,第一個端口是follower是連接到leader的端口,第二個是用來選舉leader 用的port apache
server.1=192.168.1.1:2888:3888
server.2=192.168.1.2:2888:3888
server.3=192.168.1.2:2889:3889
#個人第二個server和第三個server 是用的同一臺機器跑了兩個實例,因此端口須要使用不一樣的端口來配置,切記若是有火牆的話,必定要放行你配置的口服務器
#kafka 的配置一樣在config 目錄中的server.properties 是kafka的配置文件
vi server.properties
#The broker id for this server. If unset, a unique broker id will be generated.To avoid conflicts between zookeeper generated broker id's and user configured broker id's, generated broker ids start from reserved.broker.max.id + 1.
#每一個server須要單獨配置broker id,若是不配置系統會自動配置。
broker.id=0
#消費者的訪問端口,logstash或者elasticsearch
listeners=PLAINTEXT://192.168.115.65:9092
#The number of threads that the server uses for receiving requests from the network and sending responses to the network ,接收和發送網絡信息的線程數
num.network.threads=3
#The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
#The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used.
socket.send.buffer.bytes=102400
#The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used.
socket.receive.buffer.bytes=102400
#The maximum number of bytes in a socket request
socket.request.max.bytes=104857600
#這個是設置log的目錄
log.dirs=/usr/local/kafka/logs網絡
num.partitions=1
#The number of threads per data directory to be used for log recovery at startup and flushing at shutdown
num.recovery.threads.per.data.dir=1
#The replication factor for the offsets topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
offsets.topic.replication.factor=1
#The replication factor for the transaction topic (set higher to ensure availability). Internal topic creation will fail until the cluster size meets this replication factor requirement.
transaction.state.log.replication.factor=1
#Overridden min.insync.replicas config for the transaction topic.
transaction.state.log.min.isr=1
#The number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms property。 配置多少小時以後會刪除以前的數據。
log.retention.hours=168
#The maximum size of a single log file。 單個日誌文件的大小
log.segment.bytes=1073741824
#The frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion。多少毫秒檢查一次是否有須要刪除的log 文件
log.retention.check.interval.ms=300000
#這塊是重點,配置kafka連接的ZK server
zookeeper.connect=192.168.165.65:2181,192.168.101.242:2181,192.168.101.242:2182
#zookeeper 連接超時設置
zookeeper.connection.timeout.ms=6000
#The amount of time the group coordinator will wait for more consumers to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins.
group.initial.rebalance.delay.ms=0
#官方參考文檔地址:http://kafka.apache.org/10/documentation.html#brokerconfigssession
#打包配置好的文件包
tar -zcvf /opt/elk/kafka_2.12-1.0.1-ready.tar.gz kafka_2.12-1.0.1
#scp 源文件到另一臺機器上
scp /opt/elk/kafka_2.12-1.0.1-ready.tar.gz elk@192.168.1.2:/opt/elk/ less
#登陸機器1.2 上配置,這個機器上配置兩套kafka,兩套ZK
FYI:若是你是三臺機器配置的話,就不須要這樣麻煩了,只須要把配置好的安裝包直接分發到不一樣的機器上,而後修改zookeeper的myid,kafka的broker.id 就能夠了。
#解壓拷貝過來的包
tar -zxvf kafka_2.12-1.0.1-ready.tar.gz
#因爲這臺機器須要配置兩套kafka和ZK 須要創建對用的數據目錄和log目錄給你不一樣的實例用
#建立kafka 數據目錄
mkdir /opt/elk/kafka_2.12-1.0.1/data/{k2,k3}
#建立 ZK的數據和log 目錄,官方推薦這兩個目錄最好不在一個磁盤下,可能會影響磁盤寫入讀取性能,因此若是數據量大的話,最好分開
mkdir /opt/elk/kafka_2.12-1.0.1/zookeeper/{z2/{data,logs},z3/{data,logs}} -p
#建立myid 文件並寫入ID number
echo 2 > /opt/elk/kafka_2.12-1.0.1/zookeeper/z2/data/myid
echo 3 > /opt/elk/kafka_2.12-1.0.1/zookeeper/z3/data/myid
#把ZK 和kafka 都是制定配置文件運行的,因此咱們須要分別把zookeeper.properties & server.properties 複製爲兩個不一樣的文件名字
cd /opt/elk/kafka_2.12-1.0.1/
cp zookeeper.properties zookeeper-2.properties
mv zookeeper.properties zookeeper-3.properties
cp server.properties server-2.properties
mv server.properties server-3.properties
#全部配置文件搞定了以後,zk-2 &zk-3 須要修改地方以下:
dataDir=剛剛建立好的目錄
dataLogDir=剛剛建立好的目錄
#zk-3 須要多修改一個地方
clientPort=2182 socket
#Kafka 須要修改以下幾個地方:
broker.id=指定的id
#kafka 3 修改port
listeners=PLAINTEXT://192.168.1.2:9093elasticsearch
#到這裏全部kafka集羣的全部配置都搞定了,開始啓動集羣了,順序是先啓動zk,而後再啓動kafka
#1.1機器上執行以下命令
nohup /opt/elk/kafka_2.12-1.0.1/bin/zookeeper-server-start.sh config/zookeeper.properties >>/dev/null 2>&1 &
#1.2 機器上執行以下命令
nohup bin/zookeeper-server-start.sh config/zookeeper-2.properties >>/dev/null 2>&1 &
nohup bin/zookeeper-server-start.sh config/zookeeper-3.properties >>/dev/null 2>&1 &分佈式
#能夠經過lsof 命令查看服務是否正常啓動
lsof -i:2181
#介紹幾個簡單的確認zk 服務是否正常的命令
#須要本機安裝nc 命令
yum -y install nc
#使用echo ruok|nc 127.0.0.1 2181 測試是否啓動了該Server,若回覆imok表示已經啓動。
#查看zk的配置,配置正常返回證實zk service 正常
echo conf | nc 192.168.1.1 2181
#stat 能夠查看集羣狀態
echo stat | nc 127.0.0.1 2182
還有以下經常使用命令:
ZooKeeper 支持某些特定的四字命令字母與其的交互。它們大可能是查詢命令,用來獲取 ZooKeeper 服務的當前狀態及相關信息。用戶在客戶端能夠經過 telnet 或 nc 向 ZooKeeper 提交相應的命令
#zk 搞定了以後開始搞kafka 了
1.1 上執行以下命令
nohup /opt/elk/kafka_2.12-1.0.1/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties >>/dev/null 2>&1 &
1.2上執行以下兩條命:
nohup /opt/elk/kafka_2.12-1.0.1/bin/kafka-server-start.sh /usr/local/kafka/config/server-2.properties >>/dev/null 2>&1 &
nohup /opt/elk/kafka_2.12-1.0.1/bin/kafka-server-start.sh /usr/local/kafka/config/server-3.properties >>/dev/null 2>&1 &
#經過lsof 命令查看端口是否正常啓動
#測試kafka 工做是否正常,新建一個topic
/opt/elk/kafka_2.12-1.0.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
#提示如下內容證實沒有問題
Created topic "test".
#經過list 來查看
/usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.1:2181 --list
test
#到這裏kafka 集羣就搞定了