環境:騰訊雲centos7java
一、下載git
http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
二、解壓github
tar -xvf kafka_2.11-2.3.0.tgz mv kafka_2.11-2.3.0 /usr/java/kafka2.11 cd /usr/java/kafka2.11
三、啓動與測試web
(a)zookeeper啓動 bin/zookeeper-server-start.sh config/zookeeper.properties (b)kafka服務端啓動 bin/kafka-server-start.sh config/server.properties (c)列出topic bin/kafka-topics.sh --zookeeper localhost:2181 --list (d)建立topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Demo1 (e)描述Topic bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Demo1 (f)發佈消息到指定的Topic bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Demo1 (g)消費指定Topic上的消息 (已過期,老版本使用,不然報zookeeper is not a recognized option) bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic Demo1 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic Demo1 --from-beginning
四、安裝kafka web界面spring
a)下載地址:https://github.com/quantifind/KafkaOffsetMonitor/releases/download/v0.2.1/KafkaOffsetMonitor-assembly-0.2.1.jarapache
b) 運行bootstrap
mkdir /mydata/kafkamonitorlogs java -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m -cp KafkaOffsetMonitor-assembly-0.2.1.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk 132.232.44.82:2181 \ --port 8787 \ --refresh 10.seconds \ --retain 7.days 1>/mydata/kafkamonitorlogs/stdout.log 2>/mydata/kafkamonitorlogs/stderr.log &
c) web訪問 centos
http://ip:8787
本人虛擬機內存過小了,因此沒法查看到消息列表,可是web界面確實能夠用!springboot
完畢!app
########springboot集成實踐###########
一、pom.xml添加依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
二、yml文件添加配置
spring: profiles: active: @activatedProperties@ kafka: bootstrap-servers: 132.232.44.82:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: test enable-auto-commit: true auto-commit-interval: 1000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
三、在Kafka的config/server.properties文件中添加
advertised.listeners=PLAINTEXT://132.232.44.89:9092
四、KafkaConsumer.java
package com.cn.commodity.controller; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * kafka消費者測試 */ @Component public class KafkaConsumer { @KafkaListener(topics = "test_topic1") public void listen (ConsumerRecord<?, ?> record) throws Exception { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value()); } }
五、KafkaProducer.java
package com.cn.commodity.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 測試kafka生產者 */ @RestController @RequestMapping("kafka") public class KafkaProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("send") public String send(String msg){ kafkaTemplate.send("test_topic1", msg); return "success"; } }
啓動運行,完畢!