Integrating Kafka with Spring Boot, using ZooKeeper for service governance

I. Integrating Kafka via Spring Boot auto-configuration

Spring Boot can integrate Kafka through auto-configuration; the following steps are required:
1. Add the Kafka dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

2. Add the Kafka settings to the Spring Boot configuration. On startup, Spring Boot loads them automatically, connects to Kafka, and creates the producer, consumer, and so on.

spring:
  kafka:
    # Kafka broker address
    bootstrap-servers: 127.0.0.1:9092
    # Consumer settings
    consumer:
      bootstrap-servers: 127.0.0.1:9092
      group-id: myGroup
      enable-auto-commit: true
      auto-offset-reset: earliest
      auto-commit-interval: 1000
      max-poll-records: 10
    # Producer settings
    producer:
      retries: 5
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1

3. The message producer:

@Component
public class MqProviderImpl {

    private static final Logger log = LoggerFactory.getLogger(MqProviderImpl.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendSkMessage(String message) {
        // Send the message and register a callback
        ListenableFuture<SendResult<String, String>> futureMessage =
                kafkaTemplate.send("test_topic", message);

        futureMessage.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                log.info(" rev " + sendResult.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error(" error " + ex.getMessage());
            }
        });
    }
}

4. The message consumer:

@KafkaListener(topics = {"test_topic"})
public void receiveSkMessageInfo(ConsumerRecord<String, String> record,
                                 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    // enable-auto-commit is true in the config above, so no manual Acknowledgment parameter is needed
    log.info(record.value());
}

The above is the simplest approach, but with Spring Boot auto-configuration every setting must be written up front under spring.kafka in application.yml. In a distributed deployment, changing a single setting means editing the configuration of every application, so these settings should be centrally managed through service governance. That calls for a custom configuration approach.

Spring Boot auto-configures Kafka in the KafkaAutoConfiguration class, which holds a KafkaProperties member containing all of the Kafka-related settings.

// Auto-configuration is implemented in the KafkaAutoConfiguration class
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class,
        KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

    private final KafkaProperties properties;

The annotation on the KafkaProperties class shows that the settings are read from the spring.kafka section of the YAML file:

@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
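That prefix = "spring.kafka" binding relies on Spring's relaxed binding rules: kebab-case keys in the YAML map onto camelCase fields of KafkaProperties (e.g. group-id → groupId, auto-offset-reset → autoOffsetReset). A stdlib-only sketch of that key conversion (RelaxedBinding is a hypothetical name for illustration, not part of Spring):

```java
// Hypothetical illustration of Spring's kebab-case -> camelCase relaxed binding
public class RelaxedBinding {

    public static String toCamel(String kebabKey) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : kebabKey.toCharArray()) {
            if (c == '-') {
                upperNext = true;                 // drop the dash, capitalise what follows
            } else {
                sb.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toCamel("group-id"));           // groupId
        System.out.println(toCamel("auto-offset-reset"));  // autoOffsetReset
    }
}
```

This is why the YAML keys shown above need no getters or setters of their own: each key resolves to an existing field of KafkaProperties or one of its nested classes.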

II. Integrating Kafka with manual Spring Boot configuration, using ZooKeeper as the configuration center

In a distributed environment, the settings in the YAML file should be centrally managed through service governance; here ZooKeeper is used to manage the Kafka configuration. To move the existing settings into ZooKeeper, read them from there, and have Spring Boot pick them up, the Kafka configuration classes must be redefined; the stock KafkaAutoConfiguration can no longer be used.

1. To pull configuration from ZooKeeper, this example uses Dangdang's open-source Config Toolkit, which also ships with a management UI for ZooKeeper. Add the dependency:

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>config-toolkit</artifactId>
    <version>3.3.2-RELEASE</version>
</dependency>

2. Add the ZooKeeper connection settings to the YAML file; these are required for the application to reach ZooKeeper:

configs:
    # ZooKeeper address
    address: 192.168.1.30:2181
    # Node that stores the application configuration
    env: /projectx/modulex
    version: 1
    # ZooKeeper data group
    groupdefault: groupdefault

3. Download Dangdang's config-toolkit, open http://localhost:8080/, and add the relevant settings; detailed instructions are available on GitHub.

4. Create a ZKConfiguration class that implements the EnvironmentAware interface. Its setEnvironment method runs at application startup and can populate the application's environment, so this class can use config-toolkit to load the ZooKeeper configuration into the application environment:

@Component
public class ZKConfiguration implements EnvironmentAware {

    @Autowired
    private Environment env;

    private static Map<String, GeneralConfigGroup> GROUPMAP = new HashMap<>();

    public ZKConfiguration() {
    }

    // Load the basic ZooKeeper settings
    @Bean
    public ZookeeperConfigProfile zookeeperConfigProfile() {
        ZookeeperConfigProfile configProfile = new ZookeeperConfigProfile(
                Objects.requireNonNull(this.env.getProperty("configs.address")),
                Objects.requireNonNull(this.env.getProperty("configs.env")),
                this.env.getProperty("configs.version"));

        return configProfile;
    }

    // Get the settings of a specific group
    @Bean({"groupPropDefault"})
    public GeneralConfigGroup generalConfigGroupDefault() {
        ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile();
        GeneralConfigGroup group = new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault"));
        return group;
    }

    /**
     * Look up a configuration group by name.
     * @return the cached group
     */
    public GeneralConfigGroup getConfigGroup(String group) {
        return GROUPMAP.get(group);
    }

    /**
     * Called at application startup; stores the ZooKeeper configuration groups
     * in the temporary GROUPMAP for later use, which is why data source
     * initialization is placed in this method.
     * @param environment the Spring environment
     */
    @Override
    public void setEnvironment(Environment environment) {
        this.env = environment;
        ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile();
        GROUPMAP.put("groupdefault", new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault")));
    }
}
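Stripped of the Spring and config-toolkit types, the caching scheme above is simple: groups are loaded once at startup and then served from a static map. A stdlib-only sketch of the pattern (ConfigGroupCache is a hypothetical stand-in, with plain maps replacing ZookeeperConfigGroup):

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only sketch of the GROUPMAP pattern: config groups are loaded once
// at startup and later lookups are served from a static cache.
public class ConfigGroupCache {

    private static final Map<String, Map<String, String>> GROUPMAP = new HashMap<>();

    // Called once at startup, analogous to setEnvironment()
    public static void load(String groupName, Map<String, String> groupData) {
        GROUPMAP.put(groupName, groupData);
    }

    // Later lookups hit the cache, analogous to getConfigGroup()
    public static Map<String, String> getConfigGroup(String groupName) {
        return GROUPMAP.get(groupName);
    }

    public static void main(String[] args) {
        Map<String, String> group = new HashMap<>();
        group.put("spring.kafka.bootstrap-servers", "127.0.0.1:9092");
        load("groupdefault", group);
        System.out.println(getConfigGroup("groupdefault").get("spring.kafka.bootstrap-servers"));
    }
}
```

In the real class, ZookeeperConfigGroup additionally watches its ZooKeeper node, so values read from the group reflect later changes; the static map only caches the group objects, not their contents.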

5. Once all the settings are available, the next step is to have Spring Boot establish the Kafka connection, which essentially means reimplementing what KafkaAutoConfiguration does. Create a KafkaConfig class that defines all the beans Kafka needs:

@Configuration
@EnableKafka
@ConditionalOnClass({KafkaTemplate.class})
public class KafkaConfig {

    // Inject the class that loaded the ZooKeeper configuration
    @Autowired
    private ZKConfiguration zkConfiguration;

    // Create the consumer factory
    @Bean("consumerFactory")
    @ConditionalOnMissingBean({ConsumerFactory.class})
    public ConsumerFactory<String, String> consumerFactory() {
        // The factory takes three arguments:
        // 1. the map of consumer settings
        // 2. the key deserializer
        // 3. the value deserializer
        return new DefaultKafkaConsumerFactory<>(makeKafkaConfig(), new StringDeserializer(), new StringDeserializer());
    }

    // Create the producer factory
    @Bean("producerFactory")
    @ConditionalOnMissingBean({ProducerFactory.class})
    public ProducerFactory<String, String> kafkaProducerFactory() {
        // The producer factory takes the same kind of arguments as the consumer factory
        return new DefaultKafkaProducerFactory<>(makeKafkaConfig(), new StringSerializer(), new StringSerializer());
    }

    // Create the kafkaTemplate bean; this is the bean business code uses to work with Kafka
    @Bean("kafkaTemplate")
    @ConditionalOnMissingBean({KafkaTemplate.class})
    public KafkaTemplate<String, String> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, String> kafkaProducerFactory,
                                                       @Qualifier("producerListener") ProducerListener<String, String> producerListener) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        kafkaTemplate.setProducerListener(producerListener);
        kafkaTemplate.setDefaultTopic("groupdefault");
        return kafkaTemplate;
    }

    @Bean("producerListener")
    @ConditionalOnMissingBean({ProducerListener.class})
    public ProducerListener<String, String> kafkaProducerListener() {
        return new LoggingProducerListener<>();
    }

    @Bean
    @ConditionalOnProperty(name = {"spring.kafka.producer.transaction-id-prefix"})
    @ConditionalOnMissingBean
    public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    // Collect the settings fetched from ZooKeeper into a config map
    private Map<String, Object> makeKafkaConfig() {
        // Get the configured group
        GeneralConfigGroup configGroup = zkConfiguration.getConfigGroup("groupdefault");

        Map<String, Object> kafkaConfig = new HashMap<>();
        kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configGroup.get("spring.kafka.bootstrap-servers"));
        kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, configGroup.get("spring.kafka.consumer.group-id"));
        kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, configGroup.get("spring.kafka.consumer.auto-offset-reset"));
        kafkaConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, configGroup.get("spring.kafka.consumer.enable-auto-commit"));
        kafkaConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.auto-commit-interval"));
        kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.key-deserializer"));
        kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.value-deserializer"));
        kafkaConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-records"));
        kafkaConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-interval-ms"));

        // These two are not Kafka client settings; they are kept in the map so
        // listener container setup can read them
        kafkaConfig.put("ack-mode", configGroup.get("spring.kafka.listener.ack-mode"));
        kafkaConfig.put("concurrency", configGroup.get("spring.kafka.listener.concurrency"));

        kafkaConfig.put(ProducerConfig.ACKS_CONFIG, configGroup.get("spring.kafka.producer.acks"));
        kafkaConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, configGroup.get("spring.kafka.producer.batch-size"));
        kafkaConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configGroup.get("spring.kafka.producer.buffer-memory"));
        kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.key-serializer"));
        kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.value-serializer"));
        kafkaConfig.put(ProducerConfig.RETRIES_CONFIG, configGroup.get("spring.kafka.producer.retries"));

        return kafkaConfig;
    }
}
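One caveat with this approach: every value in makeKafkaConfig() arrives from ZooKeeper at runtime, so a missing node silently becomes a null entry in the map and only fails later, at the first send or poll. A small stdlib-only helper can check required keys up front (KafkaConfigValidator is a hypothetical name, not part of the code above or of any library):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical fail-fast check: report config keys that were expected from
// ZooKeeper but came back null, before handing the map to the Kafka factories.
public class KafkaConfigValidator {

    public static List<String> missingKeys(Map<String, Object> config, String... required) {
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            if (config.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Calling it at the top of consumerFactory() / kafkaProducerFactory() and throwing when the returned list is non-empty turns a vague runtime failure into a clear startup error naming the absent ZooKeeper nodes.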

6. With the Kafka beans configured, business code can now use KafkaTemplate to work with messages:

@Component
public class MqProviderImpl {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // ... business methods that send messages via kafkaTemplate
}