參考:html
http://www.cnblogs.com/smartloli/p/4538173.htmljava
http://blog.csdn.net/lsshlsw/article/details/47342821web
虛擬機中共五個centos系統,每一個系統有兩個用戶root和hadoop:cdh1,cdh2,cdh3,cdh4,cdh5centos
集羣規劃併發
安裝kafka(cdh3機器)app
第一步,解壓已下載好的kafka安裝包ssh
#tar -zxvf kafka_2.9.2-0.8.2.2.tgz
解壓後刪除kafka安裝包,節省磁盤空間oop
#rm -rf kafka_2.9.2-0.8.2.2.tgz
第二步,root用戶配置環境變量測試
#vi /etc/profile
添加如下配置spa
KAFKA_HOME=/home/hadoop/app/kafka_2.9.2-0.8.2.2 export PATH=$PATH:$KAFKA_HOME/bin
使環境變量及時生效
#source /etc/profile
第三步,配置zookeeper.properties
hadoop用戶登陸
#su hadoop
進入$KAFKA_HOME/config目錄,配置zookeeper.properties文件
#vi zookeeper.properties
更改如下配置:
#用於存放zookeeper生成的數據文件,默認放在/tmp/zookeeper路徑下
dataDir=/home/hadoop/data/kafka/zookeeper
#zookeeper監聽的端口
clientPort=2181
#鏈接到 ZooKeeper 的客戶端的數量,限制併發鏈接的數量,它經過 IP 來區分不一樣的客戶端。此配置選項能夠用來阻止某些類別的 Dos 攻擊。將它設置爲 0 或者忽略而不進行設置將會取消對併發鏈接的限制
maxClientCnxns=3
建立zookeeper生成的數據存放目錄:
#mkdir –p /home/hadoop/data/kafka/zookeeper/
第四步,配置server.properties文件
進入$KAFKA_HOME/config目錄,配置server.properties文件
更改如下配置
#zookeeper的ip和端口
zookeeper.connect=cdh3:2181,cdh4:2181,cdh5:2181
#broker存放數據文件的地方,默認在/tmp/kafka-logs/
log.dirs=/home/hadoop/data/kafka/kafka-logs
須要注意的是broker.id=0,broker的惟一標識,整數,配置broker的時候,每臺機器上的broker保證惟一,從0開始。如:在另外2臺機器上分別配置broker.id=1,broker.id=2
建立broker存放數據目錄:
#mkdir -p /home/hadoop/data/kafka/kafka-logs/
第五步,配置producer.properties
# vi producer.properties
更改一下配置:
# broker的ip和端口
metadata.broker.list=cdh3:9092,cdh4:9092,cdh5:9092
第六步,配置consumer.properties
# vi consumer.properties
更改如下配置:
# zookeeper的ip和端口
zookeeper.connect=cdh3:2181,cdh4:2181,cdh5:2181
第七步,拷貝kafka數據目錄/home/hadoop/data/kafka到其餘節點
在cdh3的~/tools目錄下執行deploy.sh批處理命令
# ./deploy.sh ~/data/kafka/ ~/data/ zookeeper
第七步,拷貝kafka安裝文件到其餘節點(cdh4,cdh5)
在cdh3的~/tools目錄下執行deploy.sh批處理命令
# ./deploy.sh ~/app/kafka_2.9.2-0.8.2.2/ ~/app/ zookeeper
拷貝完成後,更改cdh4和cdh5兩個節點中server.properties的屬性:
cdh4 broker.id=1 cdh5 broker.id=2
第八步,其餘節點的root用戶配置環境變量(能夠用批處理腳本,也能夠每一個節點手動配置)
#vi /etc/profile
添加如下配置
KAFKA_HOME=/home/hadoop/app/kafka_2.9.2-0.8.2.2 export PATH=$PATH:$KAFKA_HOME/bin
使環境變量及時生效
#source /etc/profile
第九步,啓動kafka
啓動zookeeper 進入cdh3節點的~/tools目錄 #cd ~/tools/ #./runRemoteCmd.sh "~/app/zookeeper-3.4.5-cdh5.4.5/bin/zkServer.sh start" zookeeper 查看各節點進程,進入cdh3的~/tools目錄下 #cd ~/tools/ #./runRemoteCmd.sh "jps" zookeeper
啓動kafka
進入cdh3節點的~/tools目錄
#cd ~/tools/ #啓動cdh3,cdh4,cdh5三臺虛擬機的kafka #./runRemoteCmd.sh " kafka-server-start.sh ~/app/kafka_2.9.2-0.8.2.2/config/server.properties" zookeeper 或者 #./runRemoteCmd.sh " kafka-server-start.sh ~/app/kafka_2.9.2-0.8.2.2/config/server.properties &" zookeeper
由上New leader is 0能夠看出cdh3節點是主節點。
查看進程,進入cdh3的~/tools目錄下
#cd ~/tools/ #./runRemoteCmd.sh "jps" zookeeper
第十步,建立Topic
#kafka-topics.sh --zookeeper cdh3:2181,cdh4:2181,cdh5:2181 --topic topic1 --replication-factor 3 --partitions 1 –create
查看Topic相關信息
#kafka-topics.sh --zookeeper cdh3:2181,cdh4:2181,cdh5:2181 --topic topic1 --describe
下面解釋一下這些輸出。第一行是對全部分區的一個描述,而後每一個分區都會對應一行,由於咱們只有一個分區因此下面就只加了一行。
Leader:負責處理消息的讀和寫,Leader是從全部節點中隨機選擇的。
Replicas:列出了全部的副本節點,無論節點是否在服務中。
Isr:是正在服務中的節點
第十一步,生產消息
下面咱們使用kafka的Producer生產一些消息,而後讓Kafka的Consumer去消費,命令以下所示:
#kafka-console-producer.sh --broker-list cdh3:9092,cdh4:9092,cdh5:9092 --topic topic1
第十二步,消費消息
在另一個節點(如cdh4)啓動消費進程,來消費這些消息,命令以下所示:
#kafka-console-consumer.sh --zookeeper cdh3:2181,cdh4:2181,cdh5:2181 --from-beginning --topic topic1
第十三步,測試kafka的HA高可用
殺掉leader節點cdh3的kafka服務
查看cdh5的kafka服務輸出信息:
能夠看出,leader切換爲cdh5節點(broker.id=2),切換成功
第十四步,從新測試消息的生產和消費
生產消息
在kafka已經服務啓動的節點下生產消息:
# kafka-console-producer.sh --broker-list cdh3:9092,cdh4:9092,cdh5:9092 --topic topic1
在其餘節點(如cdh4)下消費消息:
# kafka-console-consumer.sh --zookeeper cdh3:2181,cdh4:2181,cdh5:2181 --from-beginning --topic topic1
總結
在部署Kafka Cluster的時候,有些地方須要咱們注意,好比:在咱們啓動Kafka集羣的時候,確保ZK集羣啓動,另外,在配置Kafka配置文件信息時,確保ZK的集羣信息配置到相應的配置文件中,整體來講,配置還算較爲簡單,須要在部署的時候,仔細配置各個文件便可。
啓動kafka的消息監控
第一步,將jar文件上傳到虛擬機,如cdh3節點
第二步,編寫jar文件的執行腳本
腳本以下:
nohup java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk cdh3:2181,cdh4:2181,cdh5:2181 \ --port 8089 \ --refresh 10.seconds \ --retain 1.days &
經常使用參數說明
–zk - Zookeeper hosts
–port - 啓動webUI的端口號
–refresh - 頁面數據刷新時間
–retain - 歷史數據存放的時間(存放在SQLlite中)
#vi kafkaOffsetMonitor.sh
啓動jar執行腳本
sh kafkaOffsetMonitor.sh
訪問http://cdh3:8089監控頁面
消費者組列表:
topic的全部partiton消費狀況列表:
以上圖中參數含義解釋以下:
topic:建立時topic名稱
partition:分區編號
offset:表示該parition已經消費了多少條message
logSize:表示該partition已經寫了多少條message
Lag:表示有多少條message沒有被消費。
Owner:表示消費者
Created:該partition建立時間
Last Seen:消費狀態刷新最新時間。
kafka正在運行的topic:
kafka集羣中topic列表:
kafka集羣中broker列表:
中止KafkaOffsetMonitor監控
#jps #kill -9 進程號
**********************關閉kafka*********************
關閉kafka 進入cdh3節點的~/tools目錄 #cd ~/tools/ #關閉cdh3,cdh4,cdh5三臺虛擬機的kafka #./runRemoteCmd.sh " kafka-server-stop.sh ~/app/kafka_2.9.2-0.8.2.2/config/server.properties" zookeeper 或者 #./runRemoteCmd.sh " kafka-server-stop.sh ~/app/kafka_2.9.2-0.8.2.2/config/server.properties &" zookeeper 查看進程,進入cdh3的~/tools目錄下 #cd ~/tools/ #./runRemoteCmd.sh "jps" zookeeper 關閉zookeeper 進入cdh3節點的~/tools目錄 #cd ~/tools/ #./runRemoteCmd.sh "~/app/zookeeper-3.4.5-cdh5.4.5/bin/zkServer.sh stop" zookeeper 查看進程,進入cdh3的~/tools目錄下 #cd ~/tools/ #./runRemoteCmd.sh "jps" zookeeper
Kafka集羣安裝完成
完成!