# 切換到目錄 cd /app # 獲取kafka最新安裝包,這邊使用的是鏡像地址,能夠去官方網站得到最新地址版本號使用kafka_2.11-1.1.0.tgz,能夠本身下載上傳上去 wget http://mirrors.hust.edu.cn/apache/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz # 解壓軟件包 tar -zxvf kafka_2.11-1.1.0.tgz # 建立軟鏈接 ln -s kafka_2.11-1.1.0 kafka # 進入配置文件 cd /app/kafka/config mkdir -p /app/kafka/logdirs # 啓動kafka以前先保證zookeeper是啓動的,保證java環境是安裝的,修改丟應的配置 vim server.properties # 啓動kafka服務 /app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server.properties # 添加主題 bin/kafka-topics.sh --create --zookeeper 172.16.48.129:2181 --replication-factor 1 --partitions 1 --topic test # 獲得Created topic "test"成功的提示 # 添加生產者 bin/kafka-console-producer.sh --broker-list 172.16.48.129:9092 --topic test # 輸入文字:This is a message # 添加生產者 bin/kafka-console-consumer.sh --bootstrap-server 172.16.48.129:9092 --topic test --from-beginning # 收到文字:This is another message # 生成者發送消息,消費者收到消息便可
配置kafka的server.properties的配置文件 # 修改zookeeper鏈接地址 zookeeper.connect=172.16.48.129:2181 # 修改kafka日誌地址 log.dirs=/app/kafka/logdirs # 在broker.id=0下面添加端口以及ip綁定的功能,內網ip地址 host.name=172.16.48.129
# 設定服務標記id,每一臺的id須要不相同當前分別對應一、二、3 broker.id=1 # 設定綁定的host名稱,綁定對應本身的內網ip地址 host.name=172.19.131.247 # 設定主體同步數量和集羣數量相同 offsets.topic.replication.factor=3 transaction.state.log.replication.factor=3 transaction.state.log.min.isr=3 # 配置參數設定在log.retention.hours下面 message.max.byte=5242880 default.replication.factor=3 replica.fetch.max.bytes=5242880 # zookeeper配置地址 zookeeper.connect=172.16.48.129:2181,172.16.48.130:2181,172.16.48.131:2181 # 容許刪除主題 delete.topic.enable=true
虛擬機1 | 虛擬機2 | 虛擬機3 |
---|---|---|
172.16.48.129 | 172.16.48.130 | 172.16.48.131 |
kafka1 | kafka2 | kafka3 |
# 建立測試主題 /app/kafka/bin/kafka-topics.sh --create --zookeeper 172.16.48.129:2181,172.16.48.130:2181,172.16.48.131:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic # 結果:Created topic "my-replicated-topic". # 檢查主題狀態 /app/kafka/bin/kafka-topics.sh --describe --zookeeper 172.16.48.129:2181,172.16.48.130:2181,172.16.48.131:2181 localhost:2181 --topic my-replicated-topic # 結果:Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: # Topic: my-replicated-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 # 一個生產者 /app/kafka/bin/kafka-console-producer.sh --broker-list 172.16.48.129:9092,172.16.48.130:9092,172.16.48.131:9092 --topic my-replicated-topic # 三個消費者: /app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.16.48.129:9092,172.16.48.130:9092,172.16.48.131:9092 --from-beginning --topic my-replicated-topic
經過監控集羣以及生產和消費,來判斷高可用狀況的測試java
# 經過主題狀態監控集羣的leader和Replicas和Isr /app/kafka/bin/kafka-topics.sh --describe --zookeeper 172.16.48.129:2181,172.16.48.130:2181,172.16.48.131:2181 localhost:2181 --topic my-replicated-topic # 分別中止leader的服務的kafka服務,來檢查是否能夠生產和消費的狀況 /app/kafka/bin/kafka-server-start.sh -daemon /app/kafka/config/server.properties /app/kafka/bin/kafka-server-stop.sh
結論:kafka環境在zookeeper沒有掛的狀況下,容許只剩下一臺還可以工做。apache
經常使用配置:/app/kafka/configbootstrap
日誌路徑:/app/kafka/logsvim