參考: kafka中文文檔html
kafka+zookeeper集羣部署linux
kafka集羣部署web
kafka體系架構講解apache
kafka工做原理bootstrap
關閉selinux,關閉防火牆服務器
kafka 版本: kafka_2.11-2.1.0架構
zookpeeper版本: 3.4.12oracle
jdk: 1.8
ip | 角色 | 系統 |
172.10.10.226 | zookeeper+kafka | redhat7.3 |
172.10.10.225 | zookeeper+kafka | redhat7.3 |
172.10.10.224 | zookeeper+kafka | redhat7.3 |
參考: https://www.cnblogs.com/linuxprobe/p/5851699.html
http://www.cnblogs.com/ahu-lichang/p/6723826.html
也可本身下載安裝,這裏用的yum
JDK下載地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html
rpm -ivh jdk-8u101-linux-x64.rpm
yum install java-1.8.0
Zookeeper連接:http://zookeeper.apache.org/
wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz -P /mnt tar zxvf zookeeper-3.4.12.tar.gz -C /mnt cd /mnt && mv zookeeper-3.4.12 zookeeper cd zookeeper cp conf/zoo_sample.cfg conf/zoo.cfg
#把zookeeper加入到環境變量
echo -e "# append zk_env\nexport PATH=$PATH:/opt/zookeeper/bin" >> /etc/profile
2.3.1 zookeeper配置文件修改.在zookeeper的conf目錄建立
tickTime=2000 initLimit=10 syncLimit=5 dataLogDir=/mnt/zookeeper/logs dataDir=/mnt/zookeeper/data clientPort=2181 #autopurge.snapRetainCount=500 #autopurge.purgeInterval=24 server.1= 172.10.10.226:2888:3888 #server.1 中的1表示的該node的id,要和後面myid文件中保持一致 server.2= 172.10.10.225:2888:3888 server.3= 172.10.10.224:2888:3888
#######參數說明
tickTime這個時間是做爲zookeeper服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是說每一個tickTime時間就會發送一個心跳。
initLimit這個配置項是用來配置zookeeper接受客戶端(這裏所說的客戶端不是用戶鏈接zookeeper服務器的客戶端,而是zookeeper服務器集羣中鏈接到leader的follower 服務器)初始化鏈接時最長能忍受多少個心跳時間間隔數。
當已經超過10個心跳的時間(也就是tickTime)長度後 zookeeper 服務器尚未收到客戶端的返回信息,那麼代表這個客戶端鏈接失敗。總的時間長度就是 10*2000=20秒。
syncLimit這個配置項標識leader與follower之間發送消息,請求和應答時間長度,最長不能超過多少個tickTime的時間長度,總的時間長度就是5*2000=10秒。
dataDir顧名思義就是zookeeper保存數據的目錄,默認狀況下zookeeper將寫數據的日誌文件也保存在這個目錄裏;
clientPort這個端口就是客戶端鏈接Zookeeper服務器的端口,Zookeeper會監聽這個端口接受客戶端的訪問請求;
server.A=B:C:D中的A是一個數字,表示這個是第幾號服務器,B是這個服務器的IP地址,C第一個端口用來集羣成員的信息交換,表示這個服務器與集羣中的leader服務器交換信息的端口,D是在leader掛掉時專門用來進行選舉leader所用的端口。
#建立相關目錄,三臺節點都須要
mkdir -p /mnt/zookeeper/{logs,data}
#其他zookeeper節點安裝完成以後,同步配置文件zoo.cfg。
2.3.2 建立serverid表識
除了修改zoo.cfg配置文件外,zookeeper集羣模式下還要配置一個myid文件,這個文件須要放在dataDir目錄下。
這個文件裏面有一個數據就是A的值(該A就是zoo.cfg文件中server.A=B:C:D中的A),在zoo.cfg文件中配置的dataDir路徑中建立myid文件。
#在172.10.10.226服務器上面建立myid文件,並設置值爲1,同時與zoo.cfg文件裏面的server.1保持一致,以下
echo "1" > /mnt/zookeeper/data/myid
其它2臺id分別爲 2 ,3
/mnt/zookeeper/bin/zkServer.sh start #啓動
/mnt/zookeeper/bin/zkServer.sh status #查看
bin/zkCli.sh -server 172.10.10.225:2181 #模擬客戶端鏈接
Zookeeper集羣搭建完畢以後,能夠經過客戶端腳本鏈接到zookeeper集羣上面,對客戶端來講,zookeeper集羣是一個總體,鏈接到zookeeper集羣實際上感受在獨享整個集羣的服務。
下載地址: http://kafka.apache.org/downloads
[root@r0 kafka_2.11-2.1.0]# grep -E -v "^#|^$" config/server.properties broker.id=0 ###集羣中惟一 listeners=PLAINTEXT://172.10.10.226:9092 ####本地ip:端口 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=3 #### 一個topic的partition個數 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 replication.factor=2 ##### 每一個partition的副本 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=172.10.10.226:2181,172.10.10.225:2181,172.10.10.224:2181 ####zookeeper集羣. zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
delete.topic.enable=true ########可刪除topic
將上述配置文件複製到另外2臺服務器,並修改每一個節點對應的 server.properties 文件的 broker.id和listenrs
bin/kafka-server-start.sh config/server.properties &
建立topic
bin/kafka-topics.sh --create --zookeeper 172.10.10.226:2181,172.10.10.225:2182,172.10.10.224:2183 --replication-factor 2 --partitions 3 --topic test
顯示topic
bin/kafka-topics.sh --describe --zookeeper 172.10.10.226:2181,172.10.10.225:2182,172.10.10.224:2183 --topic test
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N Topic:test PartitionCount:3 ReplicationFactor:2 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1 Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Topic: test Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
建立 producer(生產者):
[root@r0 kafka_2.11-2.1.0]# bin/kafka-console-producer.sh --broker-list 172.10.10.226:9092 -topic test OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N > >>1111 >2222 >333
建立 consumer(消費者):
[root@r2 kafka_2.11-2.1.0]# bin/kafka-console-consumer.sh --bootstrap-server 172.10.10.226:9092,172.10.10.225:9092,172.10.10.224:9092 --topic test --from-beginning OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N message 3 message 6 333333333 5555555555 5 5 #問題,發現爲啥順序不對,生產者的輸入順序和消費者的讀取順序不一致.
# 刪除topic bin/kafka-topics.sh --delete --zookeeper 172.10.10.226:2181,172.10.10.225:2181,172.10.10.224:2181 --topic test # 關閉kafka ,3臺都執行 [root@worker2 kafka_2.12-1.1.0]$ bin/kafka-server-stop.sh conf/server.properties [root@worker2 kafka_2.12-1.1.0]$ bin/kafka-server-stop.sh conf/server.properties [root@worker2 kafka_2.12-1.1.0]$ bin/kafka-server-stop.sh conf/server.properties # 關閉zookeeper ,3臺都執行 [root@master zookeeper-3.4.11]$ bin/zkServer.sh stop conf/zoo.cfg [root@worker1 zookeeper-3.4.11]$ bin/zkServer.shstop conf/zoo.cfg [root@worker2 zookeeper-3.4.11]$ bin/zkServer.shstop conf/zoo.cfg