手把手教你Spring Boot2.x整合kafka

首先得本身搭建一個kafka,搭建教程請自行百度,本人是使用docker搭建了一個單機版的zookeeper+kafka做爲演示,文末會有完整代碼包提供給你們下載參考

廢話很少說,教程開始

1、老規矩,先在pom.xml中添加kafka相關依賴

<dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
 </dependency>

2、在application.yml中添加相關配置

spring:
  #kafka配置
  kafka:
    #這裏改成你的kafka服務器ip和端口號
    bootstrap-servers: 10.24.19.237:9092
    #=============== producer  =======================
    producer:
      #若是該值大於零時,表示啓用重試失敗的發送次數
      retries: 0
      #每當多個記錄被髮送到同一分區時,生產者將嘗試將記錄一塊兒批量處理爲更少的請求,默認值爲16384(單位字節)
      batch-size: 16384
      #生產者可用於緩衝等待發送到服務器的記錄的內存總字節數,默認值爲3355443
      buffer-memory: 33554432
      #key的Serializer類,實現類實現了接口org.apache.kafka.common.serialization.Serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value的Serializer類,實現類實現了接口org.apache.kafka.common.serialization.Serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #=============== consumer  =======================
    consumer:
      #用於標識此使用者所屬的使用者組的惟一字符串
      group-id: test-consumer-group
      #當Kafka中沒有初始偏移量或者服務器上再也不存在當前偏移量時該怎麼辦,默認值爲latest,表示自動將偏移重置爲最新的偏移量
      #可選的值爲latest, earliest, none
      auto-offset-reset: earliest
      #消費者的偏移量將在後臺按期提交,默認值爲true
      enable-auto-commit: true
      #若是'enable-auto-commit'爲true,則消費者偏移自動提交給Kafka的頻率(以毫秒爲單位),默認值爲5000。
      auto-commit-interval: 100
      #密鑰的反序列化器類,實現類實現了接口org.apache.kafka.common.serialization.Deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #值的反序列化器類,實現類實現了接口org.apache.kafka.common.serialization.Deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、添加操做kafka的工具類KafkaUtils.java(這裏我只是簡單的封裝了一些方法,你們能夠根據須要自行添加須要的方法)

package com.example.study.util;

import com.google.common.collect.Lists;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
 * 操做kafka的工具類
 *
 * @author 154594742@qq.com
 * @date 2021/3/2 14:52
 */

@Component
public class KafkaUtils {

    @Value("${spring.kafka.bootstrap-servers}")
    private String springKafkaBootstrapServers;

    private AdminClient adminClient;

    @Autowired
    private KafkaTemplate kafkaTemplate;


    /**
     * 初始化AdminClient
     * '@PostConstruct該註解被用來修飾一個非靜態的void()方法。
     * 被@PostConstruct修飾的方法會在服務器加載Servlet的時候運行,而且只會被服務器執行一次。
     * PostConstruct在構造函數以後執行,init()方法以前執行。
     */
    @PostConstruct
    private void initAdminClient() {
        Map<String, Object> props = new HashMap<>(1);
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, springKafkaBootstrapServers);
        adminClient = KafkaAdminClient.create(props);
    }

    /**
     * 新增topic,支持批量
     */
    public void createTopic(Collection<NewTopic> newTopics) {
        adminClient.createTopics(newTopics);
    }

    /**
     * 刪除topic,支持批量
     */
    public void deleteTopic(Collection<String> topics) {
        adminClient.deleteTopics(topics);
    }

    /**
     * 獲取指定topic的信息
     */
    public String getTopicInfo(Collection<String> topics) {
        AtomicReference<String> info = new AtomicReference<>("");
        try {
            adminClient.describeTopics(topics).all().get().forEach((topic, description) -> {
                for (TopicPartitionInfo partition : description.partitions()) {
                    info.set(info + partition.toString() + "\n");
                }
            });
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return info.get();
    }

    /**
     * 獲取所有topic
     */
    public List<String> getAllTopic() {
        try {
            return adminClient.listTopics().listings().get().stream().map(TopicListing::name).collect(Collectors.toList());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        return Lists.newArrayList();
    }

    /**
     * 往topic中發送消息
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

}

4、添加KafkaController.java做爲kafka的demo類

package com.example.study.controller;

import com.example.study.model.vo.ResponseVo;
import com.example.study.util.BuildResponseUtils;
import com.example.study.util.KafkaUtils;
import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.web.bind.annotation.*;

import java.util.List;

/**
 * kafka控制器
 *
 * @author 154594742@qq.com
 * @date 2021/3/2 15:01
 */

@RestController
@Api(tags = "Kafka控制器")
@Slf4j
public class KafkaController {

    @Autowired
    private KafkaUtils kafkaUtils;

    /**
     * 新增topic (支持批量,這裏就單個做爲演示)
     *
     * @param topic topic
     * @return ResponseVo
     */
    @ApiOperation("新增topic")
    @PostMapping("kafka")
    public ResponseVo<?> add(String topic) {
        NewTopic newTopic = new NewTopic(topic, 3, (short) 1);
        kafkaUtils.createTopic(Lists.newArrayList(newTopic));
        return BuildResponseUtils.success();
    }

    /**
     * 查詢topic信息 (支持批量,這裏就單個做爲演示)
     *
     * @param topic 自增主鍵
     * @return ResponseVo
     */
    @ApiOperation("查詢topic信息")
    @GetMapping("kafka/{topic}")
    public ResponseVo<String> getBytTopic(@PathVariable String topic) {
        return BuildResponseUtils.buildResponse(kafkaUtils.getTopicInfo(Lists.newArrayList(topic)));
    }

    /**
     * 刪除topic (支持批量,這裏就單個做爲演示)
     * (注意:若是topic正在被監聽會給人感受刪除不掉(但實際上是刪除掉後又會被建立))
     *
     * @param topic topic
     * @return ResponseVo
     */
    @ApiOperation("刪除topic")
    @DeleteMapping("kafka/{topic}")
    public ResponseVo<?> delete(@PathVariable String topic) {
        kafkaUtils.deleteTopic(Lists.newArrayList(topic));
        return BuildResponseUtils.success();
    }

    /**
     * 查詢全部topic
     *
     * @return ResponseVo
     */
    @ApiOperation("查詢全部topic")
    @GetMapping("kafka/allTopic")
    public ResponseVo<List<String>> getAllTopic() {
        return BuildResponseUtils.buildResponse(kafkaUtils.getAllTopic());
    }

    /**
     * 生產者往topic中發送消息demo
     *
     * @param topic
     * @param message
     * @return
     */
    @ApiOperation("往topic發送消息")
    @PostMapping("kafka/message")
    public ResponseVo<?> sendMessage(String topic, String message) {
        kafkaUtils.sendMessage(topic, message);
        return BuildResponseUtils.success();
    }

    /**
     * 消費者示例demo
     * <p>
     * 基於註解監聽多個topic,消費topic中消息
     * (注意:若是監聽的topic不存在則會自動建立)
     */
    @KafkaListener(topics = {"topic1", "topic2", "topic3"})
    public void consume(String message) {
        log.info("receive msg: " + message);
    }
}

5、運行項目,而後訪問 http://localhost:8080/swagger-ui.html 測試一下效果吧

這三個topic原本是不存在的,這裏是由@KafkaListener註解方式監聽時自動建立的

一、咱們來新增一個名爲‘myTopic’的topic試試

二、再從新查詢一下全部的topic發現咱們新增topic成功了

三、接下來咱們試試刪除一下myTopic和由@KafkaListener註解方式監聽時自動建立的topic1


四、咱們再查詢一下全部的topic發現‘myTopic’被刪除掉了,可是‘topic1’並無被刪除掉,緣由就是由於‘topic1’正在被監聽,刪除掉後又會被自動建立,正如我代碼的註釋中的說明同樣

6、咱們來試試咱們最關心也是最經常使用的功能,做爲生產者發送消息到topic以及做爲消費者消費topic中的消息

一、咱們先把控制檯的日誌先清除一下方便待會兒查看效果

二、咱們往正在被監聽的"topic1"、"topic2"、"topic3"中的任意一個發送測試消息

三、查看IDEA控制檯發現topic中的消息被監聽消費到了,大功告成

附上完整代碼包供你們學習參考,若是對你有幫助,請給個關注或者點個贊吧! 點擊下載完整代碼包

相關文章
相關標籤/搜索