Zookeeper集羣:172.16.218.20一、172.16.218.20二、172.16.218.203
172.16.218.201 kafka1
172.16.218.202 kafka2
172.16.218.203 kafka3
172.16.200.48 kafka-managerjava
wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
# vi /etc/hosts 172.16.218.201 zookeeper1-201 172.16.218.202 zookeeper2-202 172.16.218.203 zookeeper3-203
# tar zxvf zookeeper-3.4.10.tar.gz # cd zookeeper-3.4.10 # mv zookeeper-3.4.10 zookeeper # chown -R root. /usr/local/zookeeper-node1 # cd /usr/local/zookepper-node1/conf/ # cp zoo_sample.cfg zoo.cfg
# tar zxvf jdk-8u74-linux-x64.tar.gz # mv jdk1.8.0_131 /usr/local/java/
#add java export JAVA_HOME=/usr/local/java/jdk1.8.0_131 export JAVA_BIN=/usr/local/java/jdk1.8.0_131/bin export PATH=$PATH:/usr/local/java/jdk1.8.0_131/bin export CLASSPATH=./:/usr/local/java/jdk1.8.0_131/lib:/usr/local/java/jdk1.8.0_131/jre/lib #而後執行 source /etc/profile
tickTime=2000 initLimit=10 syncLimit=5 dataDir=/home/zookeeper/data clientPort=2181 maxClientCnxns=60 autopurge.snapRetainCount=3 autopurge.purgeInterval=24 dataLogDir=/home/zookeeper/logs server.1=172.16.218.201:2888:3888 server.2=172.16.218.202:2888:3888 server.3=172.16.218.203:2888:3888
# mkdir -p /data/zookeeper/data/ # mkdir -p /data/zookeeper/logs
在201機器上建立myid,並設置爲1與配置文件zoo.cfg裏面server.1對應。 # cd /data/zookeeper/data # echo 1 > myid 在202機器上建立myid,並設置爲2與配置文件zoo.cfg裏面server.2對應。 echo "2" > /data/zookeeper/data/myid 在203機器上建立myid,並設置爲3與配置文件zoo.cfg裏面server.3對應。 echo "3" > /data/zookeeper/data/myid
# cd /usr/local/zookeeper/bin/ # ./zkServer.sh start # netstat -lutnp |grep java tcp 0 0 0.0.0.0:2181
vim /etc/rc.local 添加: /usr/local/zookeeper-node1/bin/zkServer.sh start
# ./zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper-node2/bin/../conf/zoo.cfg Mode: leader
網址:https://kafka.apache.org/downloads
mkdir kafka mkdir -p /data/kafka/kafkalogs cd /data/kafka tar -zxvf kafka_2.12-0.11.0.0.tgz
cd /data/kafka/kafka_2.12-0.11.0.0/config vim server.properties #broker.id=0 每臺服務器的broker.id都不能相同(1/2/3) #hostname host.name=172.16.218.201 #在log.retention.hours=168 下面新增下面三項 message.max.byte=5242880 default.replication.factor=2 replica.fetch.max.bytes=5242880 #設置zookeeper的鏈接端口 zookeeper.connect=172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181
cd /data/kafka/kafka_2.12-0.11.0.0/bin #進入到kafka的bin目錄 ./kafka-server-start.sh -daemon ../config/server.properties &
執行命令: [root@mariadb-node1 config] # jps 1618 Jps 27987 QuorumPeerMain 1260 Kafka
#建立Topic ./kafka-topics.sh --create --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 --replication-factor 2 --partitions 1 --topic shequ #解釋 --replication-factor 2 #複製兩份 --partitions 1 #建立1個分區 --topic #主題爲shequ #查看已建立的topic ./kafka-topics.sh --list --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 '''在一臺服務器上建立一個發佈者'''#建立一個broker,發佈者 ./kafka-console-producer.sh --broker-list 172.16.218.201:19092 --topic shequ
# ./kafka-console-producer.sh --broker-list 172.16.218.202:19092 --topic test > Hello world > Hello kafka
./kafka-console-consumer.sh --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 --topic test --from-beginning Hello world Hello kafka
./kafka-topics.sh --describe --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:2 Configs: Topic: test Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2 1) Leader 是給定分區的節點編號,每一個分區的部分數據會隨機指定不一樣的節點 2) Replicas 是該日誌會保存的複製 3) Isr 表示正在同步的複製
./kafka-topics.sh --delete --zookeeper 172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181 --topic test Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
功能:
爲了簡化開發者和服務工程師維護Kafka集羣的工做,yahoo構建了一個叫作Kafka管理器的基於Web工具,叫作 Kafka Manager。這個管理工具能夠很容易地發現分佈在集羣中的哪些topic分佈不均勻,或者是分區在整個集羣分佈不均勻的的狀況。它支持管理多個集羣、選擇副本、副本從新分配以及建立Topic。同時,這個管理工具也是一個很是好的能夠快速瀏覽這個集羣的工具,有以下功能:node
1.管理多個kafka集羣 2.便捷的檢查kafka集羣狀態(topics,brokers,備份分佈狀況,分區分佈狀況) 3.選擇你要運行的副本 4.基於當前分區情況進行 5.能夠選擇topic配置並建立topic(0.8.1.1和0.8.2的配置不一樣) 6.刪除topic(只支持0.8.2以上的版本而且要在broker配置中設置delete.topic.enable=true) 7.Topic list會指明哪些topic被刪除(在0.8.2以上版本適用) 8.爲已存在的topic增長分區 9.爲已存在的topic更新配置 10.在多個topic上批量重分區 11.在多個topic上批量重分區(可選partition broker位置)
安裝步驟linux
# cd /usr/local # git clone https://github.com/yahoo/kafka-manager # cd kafka-manager # ./sbt clean dist
注: 執行sbt編譯打包可能花費很長時間,若是你hang在以下狀況
將project/plugins.sbt 中的logLevel參數修改成logLevel := Level.Debug(默認爲Warn)git
編譯成功後,會在target/universal下生成一個zip包github
# cd /usr/local/kafka-manager/target/universal # unzip kafka-manager-1.3.3.7.zip 將application.conf中的kafka-manager.zkhosts的值設置爲你的zk地址 如:kafka-manager.zkhosts="172.16.218.201:2181,172.16.218.202:2181,172.16.218.203:2181"
直接啓動:web
# cd kafka-manager-1.3.3.7/bin # ./kafka-manager -Dconfig.file=../conf/application.conf
後臺運行:apache
# ./kafka-manager -h # nohup ./kafka-manager -Dconfig.file=../conf/application.conf &
指定端口,例如:vim
# nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=9001 &
第一次進入web UI要進行kafka cluster的相關配置,根據本身的信息進行配置。服務器
網址:http://172.16.200.48:9000/ app
添加kafka集羣成功後,點擊進去,界面以下: