SpringBoot + KafKa集羣的集成

簡介

本文主要講在springboot2中,如何經過自定義的配置來集成,並能夠比較好的擴展性,同時集成多個kafka集羣java

引入依賴

引入kafka的依賴web

<!-- kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置文件

添加配置文件,默認添加一個kafka的集羣,spring

topinfo:
     # kafka集羣配置 ,bootstrap-servers 是必須的
   kafka:
      # 生產者的kafka集羣地址
      bootstrap-servers:  192.168.90.225:9092,192.168.90.226:9092,192.168.90.227:9092 
      producer: 
         topic-name:  topinfo-01
         
      consumer:
         group-id:  ci-data

若是多個,則配置多個kafka的集羣配置便可apache

添加屬性配置類

添加對應的屬性配置類,若是是多個kafka集羣,則能夠填多個便可,注意對應的@ConfigurationProperties。bootstrap

package com.topinfo.ci.dataex.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import com.topinfo.ci.dataex.bean.Consumer;
import com.topinfo.ci.dataex.bean.Producer;

/**
 * @Description: kafka 屬性配置
 * @Author:楊攀
 * @Since:2019年7月10日上午10:35:18
 */
@ConfigurationProperties(prefix = "topinfo.kafka")
@Component
public class KafKaConfiguration {

    /**
     * @Fields bootstrapServer : 集羣的地址
     */
    private String bootstrapServers;

    private Producer producer;

    private Consumer consumer;

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public Producer getProducer() {
        return producer;
    }

    public void setProducer(Producer producer) {
        this.producer = producer;
    }

    public Consumer getConsumer() {
        return consumer;
    }

    public void setConsumer(Consumer consumer) {
        this.consumer = consumer;
    }

}

添加kafka配置類

kafka的配置類中, 主要注意的方法:數組

生產者工廠方法: producerFactory()
生產者KafkaTemplate :kafkaTemplate()緩存

消費者的工廠方法:consumerFactory()
消費者的監聽容器工廠方法: kafkaListenerContainerFactory()springboot

若是對應的是對個集羣,須要多配置幾個對應的這幾個方法便可。服務器

package com.topinfo.ci.dataex.config;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * @Description: kafka配置類
 * @Author:楊攀
 * @Since:2019年7月10日下午3:06:58
 */
@Configuration
public class KafKaConfig {

    @Autowired
    private KafKaConfiguration configuration;

     
    
    /**
     * @Description: 生產者的配置
     * @Author:楊攀
     * @Since: 2019年7月10日下午1:41:06
     * @return
     */
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<String, Object>();
        // 集羣的服務器地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers());
        //  消息緩存
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
        // 生產者空間不足時,send()被阻塞的時間,默認60s
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000);
        // 生產者重試次數
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        // 指定ProducerBatch(消息累加器中BufferPool中的)可複用大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,  4096);
        // 生產者會在ProducerBatch被填滿或者等待超過LINGER_MS_CONFIG時發送
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // key 和 value 的序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // 客戶端id
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.topinfo");

        return props;
    }

    /**
     * @Description: 生產者工廠
     * @Author:楊攀
     * @Since: 2019年7月10日下午2:10:04
     * @return
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<String, String>(producerConfigs());
    }

    /**
     * @Description: KafkaTemplate
     * @Author:楊攀
     * @Since: 2019年7月10日下午2:10:47
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }


    // ------------------------------------------------------------------------------------------------------------

    /**
     * @Description: 消費者配置
     * @Author:楊攀
     * @Since: 2019年7月10日下午1:48:36
     * @return
     */
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<String, Object>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers());
        // 消費者組
        props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getConsumer().getGroupId());
        // 自動位移提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 自動位移提交間隔時間
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        // 消費組失效超時時間
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        // 位移丟失和位移越界後的恢復起始位置
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // key 和 value 的反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

    /**
     * @Description: 消費者工廠
     * @Author:楊攀
     * @Since: 2019年7月10日下午2:14:13
     * @return
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * @Description: kafka 監聽容器工廠
     * @Author:楊攀
     * @Since: 2019年7月10日下午2:50:44
     * @return
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 設置消費者工廠
        factory.setConsumerFactory(consumerFactory());
        // 要建立的消費者數量(10 個線程併發處理)
        factory.setConcurrency(10);

        return factory;
    }

}

主題配置類

主要是能夠對主題進行管理。新增,修改,刪除等併發

package com.topinfo.ci.dataex.config;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * @Description: kafka 主題 配置類
 * @Author:楊攀
 * @Since:2019年7月10日下午3:06:58
 */
@Configuration
public class KafKaTopicConfig {

    @Autowired
    private KafKaConfiguration configuration;

    /**
     *@Description: kafka管理員,委派給AdminClient以建立在應用程序上下文中定義的主題的管理員。
     *@Author:楊攀
     *@Since: 2019年7月10日下午3:14:23
     *@return
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        
        Map<String, Object> props = new HashMap<>();
        
        // 配置Kafka實例的鏈接地址
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers());
        KafkaAdmin admin = new KafkaAdmin(props);
        return admin;
    }

    /**
     *@Description: kafka的管理客戶端,用於建立、修改、刪除主題等
     *@Author:楊攀
     *@Since: 2019年7月10日下午3:15:01
     *@return
     */
    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }
    
    /**
     * @Description: 建立一個新的 topinfo 的Topic,若是kafka中topinfo 的topic已經存在,則忽略。
     * @Author:楊攀
     * @Since: 2019年7月10日上午11:13:28
     * @return
     */
    @Bean
    public NewTopic topinfo() {

        // 主題名稱
        String topicName = configuration.getProducer().getTopicName();
        // 第二個參數是分區數, 第三個參數是副本數量,確保集羣中配置的數目大於等於副本數量
        return new NewTopic(topicName, 2, (short) 2);
    }

}

生產者測試

生產者在發送消息的時候,使用對應的kafkaTemplate便可,若是是多個,須要注意導入的是對應的kafkaTemplate。

package com.topinfo.ci.dataex.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.topinfo.ci.dataex.config.KafKaConfig;

@RestController
@RequestMapping("kafka")
public class TestKafKaProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @RequestMapping("send")
    public String send(String name) {
        
        ListenableFuture<SendResult<String, String>>  future = kafkaTemplate.send("topinfo", name);
        
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("生產者-發送消息成功:" + result.toString());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("生產者-發送消息失敗:" + ex.getMessage());
            }
        });
        
        
        return "test-ok";
    }
    
}

消費者測試

消費者須要在接收的方法上添加@KafkaListener,用於監聽對應的topic,能夠配置topic多個。

package com.topinfo.ci.dataex.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import com.topinfo.ci.dataex.config.KafKaConfig;

/**
 * @Description: kafka消費者
 * @Author:楊攀
 * @Since:2019年7月10日上午11:24:31
 */
@Component
public class KafKaConsumer {

    private final Logger logger = LoggerFactory.getLogger(KafKaConsumer.class);

    
    /**
     * @Description: 能夠同時訂閱多主題,只需按數組格式便可,也就是用「,」隔開
     * @Author:楊攀
     * @Since: 2019年7月10日上午11:26:16
     * @param record
     */
    @KafkaListener(topics = { "topinfo" })
    public void receive(ConsumerRecord<?, ?> record) {

        logger.info("消費獲得的消息---key: " + record.key());
        logger.info("消費獲得的消息---value: " + record.value().toString());
        
    }

}

若是多個集羣的狀況下,須要在KafkaListener監聽註解上添加containerFactory,對應配置中的監聽容器工廠。

/**
     * @Description: 能夠同時訂閱多主題,只需按數組格式便可,也就是用「,」隔開
     * @Author:楊攀
     * @Since: 2019年7月10日上午11:26:16
     * @param record
     */
    @KafkaListener(topics = { "topinfo" }, containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord<?, ?> record) {

        logger.info("消費獲得的消息---key: " + record.key());
        logger.info("消費獲得的消息---value: " + record.value().toString());
        
    }

好了, 至此全部的配置就差很少了。

最後還有一項, 看到下面的綠色按鈕沒,來,點一下,乖! O(∩_∩)O哈哈~ ...

相關文章
相關標籤/搜索