由於kafka已經下降了zookeeper在kafka中的重要性,全部不推薦單獨搭建zookeeper了,使用集成的zookeeper已經很是好了bootstrap
如今kafka版本是1.1.0,可是根據scala編譯分爲2.11和2.12版本,其實就是2.11對應JDK1.7, 2.12對應JDK1.8vim
例如3臺集羣主機IP分別爲10.128.90.111;10.128.90.112;10.128.90.113服務器
kafka解壓部署在主機路徑 /app/kafka_2.11-1.1.0app
broker.id=0 //kafka broker的id,三個節點不相同測試
listeners=PLAINTEXT://10.128.90.111:29092 //kafka端口,三個節點分別填寫節點本機IPspa
log.dir=/app/kafkadata //kafka日誌路徑scala
zookeeper.connect=10.128.90.111:22181,10.128.90.112:22181,10.128.90.113:22181 //zookeeper集羣地址,使用,分隔日誌
dataDir=/app/zookeeperdata //zookeeper快照存儲地址server
clientPort=22181 //端口號blog
maxClientCnxns=0 //客戶端與服務端鏈接限制
initLimit=10
syncLimit=5
server.0=10.128.90.111:22888:23888
server.1=10.128.90.112:22888:23888
server.2=10.128.90.113:22888:23888
22888用於follower與leader之間的數據同步與通訊;23888用於leader選舉時的通訊
0,1,2 是zookeeper三個節點id,需在zookeeper快照存儲文件夾中建立myid文件,並寫入對應的id號
例如 10.128.90.111 需寫入0
切換路徑到/app/zookeeperdata下
vim myid
寫入0保存
啓動zookeeper:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties &
啓動kafka:
bin/kafka-server-start.sh -daemon config/server.properties &
建立Topic
bin/kafka-topics.sh --create --topic mytest --zookeeper 10.128.90.111:22181,10.128.90.112:22181,10.128.90.113:22181 --replication-factor 1 --partitions 1
replication-factor 副本數,至關於把數據複製了幾份保存在服務器上
partitions 分區,至關於把數據打散保存,能夠提升數據的吞吐量,不易過大,過大會形成穩定性,須要更大的內存,需打開更多的句柄
查看Topic列表
bin/kafka-topics.sh --list --zookeeper 10.128.90.111:22181
建立生產者
bin/kafka-console-producer.sh --broker-list 10.128.90.111:29092,10.128.90.112:29092,10.128.90.113:29092 --topic mytest
建立消費者
bin/kafka-console-consumer.sh --bootstrap-server 10.128.90.111:29092,10.128.90.112:29092,10.128.90.113:29092 --topic mytest
這時在生產者發生消息,消費者窗口就能接收到消息了
例如
修改bin/kafka-server-start.sh
第一行加 export JAVA_HOME=/app/jdk1.8.0_131