<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
spring:
  # Kafka configuration
  kafka:
    # Replace with the IP and port of your own Kafka broker(s)
    bootstrap-servers: 10.24.19.237:9092
    # =============== producer =======================
    producer:
      # Number of times a failed send is retried; a value greater than zero enables retries
      retries: 0
      # When multiple records are sent to the same partition, the producer batches them
      # into fewer requests; batch size in bytes (default 16384)
      batch-size: 16384
      # Total bytes of memory the producer may use to buffer records waiting to be
      # sent to the server (default 33554432)
      buffer-memory: 33554432
      # Serializer class for keys; implements org.apache.kafka.common.serialization.Serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Serializer class for values; implements org.apache.kafka.common.serialization.Serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # =============== consumer =======================
    consumer:
      # Unique string identifying the consumer group this consumer belongs to
      group-id: test-consumer-group
      # What to do when there is no initial offset in Kafka, or the current offset no
      # longer exists on the server. Default is latest (reset to the newest offset).
      # Possible values: latest, earliest, none
      auto-offset-reset: earliest
      # Whether the consumer's offsets are committed periodically in the background (default true)
      enable-auto-commit: true
      # If enable-auto-commit is true, the frequency in milliseconds at which offsets
      # are auto-committed to Kafka (default 5000)
      auto-commit-interval: 100
      # Deserializer class for keys; implements org.apache.kafka.common.serialization.Deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Deserializer class for values; implements org.apache.kafka.common.serialization.Deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
package com.example.study.util;

import com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * Utility component for Kafka administration (topics) and message production.
 *
 * @author 154594742@qq.com
 * @date 2021/3/2 14:52
 */
@Component
public class KafkaUtils {

    @Value("${spring.kafka.bootstrap-servers}")
    private String springKafkaBootstrapServers;

    private AdminClient adminClient;

    // Config declares StringSerializer for keys and values, so the template is String/String.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Initializes the {@link AdminClient}.
     * <p>
     * {@code @PostConstruct} marks a non-static void method that runs exactly once,
     * after construction and dependency injection, before the bean is put into service.
     */
    @PostConstruct
    private void initAdminClient() {
        Map<String, Object> props = new HashMap<>(1);
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
        adminClient = KafkaAdminClient.create(props);
    }

    /**
     * Creates topics; supports batch creation.
     * <p>
     * NOTE(review): the operation is asynchronous — the returned futures are not awaited,
     * so creation failures are not surfaced to the caller.
     *
     * @param newTopics topics to create
     */
    public void createTopic(Collection<NewTopic> newTopics) {
        adminClient.createTopics(newTopics);
    }

    /**
     * Deletes topics; supports batch deletion.
     * <p>
     * NOTE(review): asynchronous, like {@link #createTopic(Collection)} — failures are
     * not surfaced to the caller.
     *
     * @param topics names of the topics to delete
     */
    public void deleteTopic(Collection<String> topics) {
        adminClient.deleteTopics(topics);
    }

    /**
     * Returns a human-readable description of the partitions of the given topics,
     * one partition per line. Returns an empty string if the lookup fails.
     *
     * @param topics names of the topics to describe
     * @return newline-separated partition descriptions, or "" on failure
     */
    public String getTopicInfo(Collection<String> topics) {
        // StringBuilder instead of repeated String concatenation inside the loop.
        StringBuilder info = new StringBuilder();
        try {
            adminClient.describeTopics(topics).all().get().forEach((topic, description) -> {
                for (TopicPartitionInfo partition : description.partitions()) {
                    info.append(partition.toString()).append('\n');
                }
            });
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            // NOTE(review): consider logging via SLF4J instead of printStackTrace.
            e.printStackTrace();
        }
        return info.toString();
    }

    /**
     * Returns the names of all topics in the cluster, or an empty list if the
     * lookup fails.
     *
     * @return list of topic names (possibly empty, never null)
     */
    public List<String> getAllTopic() {
        try {
            return adminClient.listTopics().listings().get().stream()
                    .map(TopicListing::name)
                    .collect(Collectors.toList());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return Lists.newArrayList();
    }

    /**
     * Sends a message to the given topic.
     *
     * @param topic   destination topic
     * @param message message payload
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
package com.example.study.controller;

import com.example.study.model.vo.ResponseVo;
import com.example.study.util.BuildResponseUtils;
import com.example.study.util.KafkaUtils;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * Kafka controller: REST endpoints for topic administration and a
 * producer/consumer demo.
 *
 * @author 154594742@qq.com
 * @date 2021/3/2 15:01
 */
@RestController
@Api(tags = "Kafka控制器")
@Slf4j
public class KafkaController {

    @Autowired
    private KafkaUtils kafkaUtils;

    /**
     * Creates a topic (batch is supported; a single topic is used here as a demo).
     *
     * @param topic topic name
     * @return ResponseVo
     */
    @ApiOperation("新增topic")
    @PostMapping("kafka")
    public ResponseVo<?> add(String topic) {
        // 3 partitions, replication factor 1.
        NewTopic newTopic = new NewTopic(topic, 3, (short) 1);
        kafkaUtils.createTopic(Lists.newArrayList(newTopic));
        return BuildResponseUtils.success();
    }

    /**
     * Queries topic information (batch is supported; a single topic is used here as a demo).
     * <p>
     * NOTE(review): method name "getBytTopic" contains a typo ("Byt"); kept as-is to
     * preserve the public interface.
     *
     * @param topic topic name
     * @return ResponseVo carrying the topic's partition descriptions
     */
    @ApiOperation("查詢topic信息")
    @GetMapping("kafka/{topic}")
    public ResponseVo<String> getBytTopic(@PathVariable String topic) {
        return BuildResponseUtils.buildResponse(kafkaUtils.getTopicInfo(Lists.newArrayList(topic)));
    }

    /**
     * Deletes a topic (batch is supported; a single topic is used here as a demo).
     * <p>
     * Note: if the topic is being listened to, it may appear undeletable — it is in
     * fact deleted and then automatically re-created by the listener.
     *
     * @param topic topic name
     * @return ResponseVo
     */
    @ApiOperation("刪除topic")
    @DeleteMapping("kafka/{topic}")
    public ResponseVo<?> delete(@PathVariable String topic) {
        kafkaUtils.deleteTopic(Lists.newArrayList(topic));
        return BuildResponseUtils.success();
    }

    /**
     * Lists all topics.
     *
     * @return ResponseVo carrying the list of topic names
     */
    @ApiOperation("查詢全部topic")
    @GetMapping("kafka/allTopic")
    public ResponseVo<List<String>> getAllTopic() {
        return BuildResponseUtils.buildResponse(kafkaUtils.getAllTopic());
    }

    /**
     * Producer demo: sends a message to the given topic.
     *
     * @param topic   destination topic
     * @param message message payload
     * @return ResponseVo
     */
    @ApiOperation("往topic發送消息")
    @PostMapping("kafka/message")
    public ResponseVo<?> sendMessage(String topic, String message) {
        kafkaUtils.sendMessage(topic, message);
        return BuildResponseUtils.success();
    }

    /**
     * Consumer demo.
     * <p>
     * Listens on multiple topics via the annotation and consumes their messages.
     * (Note: topics that do not exist are created automatically by the listener.)
     *
     * @param message consumed message payload
     */
    @KafkaListener(topics = {"topic1", "topic2", "topic3"})
    public void consume(String message) {
        // Parameterized logging avoids eager string concatenation.
        log.info("receive msg: {}", message);
    }
}