本文主要講在springboot2中,如何經過自定義的配置來集成,並能夠比較好的擴展性,同時集成多個kafka集羣java
引入kafka的依賴web
<!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
添加配置文件,默認添加一個kafka的集羣,spring
topinfo: # kafka集羣配置 ,bootstrap-servers 是必須的 kafka: # 生產者的kafka集羣地址 bootstrap-servers: 192.168.90.225:9092,192.168.90.226:9092,192.168.90.227:9092 producer: topic-name: topinfo-01 consumer: group-id: ci-data
若是多個,則配置多個kafka的集羣配置便可apache
添加對應的屬性配置類,若是是多個kafka集羣,則能夠填多個便可,注意對應的@ConfigurationProperties。bootstrap
package com.topinfo.ci.dataex.config; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import com.topinfo.ci.dataex.bean.Consumer; import com.topinfo.ci.dataex.bean.Producer; /** * @Description: kafka 屬性配置 * @Author:楊攀 * @Since:2019年7月10日上午10:35:18 */ @ConfigurationProperties(prefix = "topinfo.kafka") @Component public class KafKaConfiguration { /** * @Fields bootstrapServer : 集羣的地址 */ private String bootstrapServers; private Producer producer; private Consumer consumer; public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public Producer getProducer() { return producer; } public void setProducer(Producer producer) { this.producer = producer; } public Consumer getConsumer() { return consumer; } public void setConsumer(Consumer consumer) { this.consumer = consumer; } }
kafka的配置類中, 主要注意的方法:數組
生產者工廠方法: producerFactory()
生產者KafkaTemplate :kafkaTemplate()緩存
消費者的工廠方法:consumerFactory()
消費者的監聽容器工廠方法: kafkaListenerContainerFactory()springboot
若是對應的是對個集羣,須要多配置幾個對應的這幾個方法便可。服務器
package com.topinfo.ci.dataex.config; import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; /** * @Description: kafka配置類 * @Author:楊攀 * @Since:2019年7月10日下午3:06:58 */ @Configuration public class KafKaConfig { @Autowired private KafKaConfiguration configuration; /** * @Description: 生產者的配置 * @Author:楊攀 * @Since: 2019年7月10日下午1:41:06 * @return */ public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<String, Object>(); // 集羣的服務器地址 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers()); // 消息緩存 props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960); // 生產者空間不足時,send()被阻塞的時間,默認60s props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 6000); // 生產者重試次數 props.put(ProducerConfig.RETRIES_CONFIG, 0); // 指定ProducerBatch(消息累加器中BufferPool中的)可複用大小 props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); // 生產者會在ProducerBatch被填滿或者等待超過LINGER_MS_CONFIG時發送 props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // key 和 value 的序列化 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 客戶端id props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.topinfo"); return props; } /** * @Description: 生產者工廠 * @Author:楊攀 * @Since: 2019年7月10日下午2:10:04 * @return */ @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<String, String>(producerConfigs()); } /** * @Description: KafkaTemplate * @Author:楊攀 * @Since: 2019年7月10日下午2:10:47 * @return */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } // ------------------------------------------------------------------------------------------------------------ /** * @Description: 消費者配置 * @Author:楊攀 * @Since: 2019年7月10日下午1:48:36 * @return */ public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<String, Object>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers()); // 消費者組 props.put(ConsumerConfig.GROUP_ID_CONFIG, configuration.getConsumer().getGroupId()); // 自動位移提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自動位移提交間隔時間 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100); // 消費組失效超時時間 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); // 位移丟失和位移越界後的恢復起始位置 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // key 和 value 的反序列化 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); return props; } /** * @Description: 消費者工廠 * @Author:楊攀 * @Since: 2019年7月10日下午2:14:13 * @return */ @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } /** * @Description: kafka 監聽容器工廠 * @Author:楊攀 * @Since: 2019年7月10日下午2:50:44 * @return */ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 設置消費者工廠 factory.setConsumerFactory(consumerFactory()); // 要建立的消費者數量(10 個線程併發處理) factory.setConcurrency(10); return factory; } }
主要是能夠對主題進行管理。新增,修改,刪除等併發
package com.topinfo.ci.dataex.config; import java.util.HashMap; import java.util.Map; import java.util.Properties; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; /** * @Description: kafka 主題 配置類 * @Author:楊攀 * @Since:2019年7月10日下午3:06:58 */ @Configuration public class KafKaTopicConfig { @Autowired private KafKaConfiguration configuration; /** *@Description: kafka管理員,委派給AdminClient以建立在應用程序上下文中定義的主題的管理員。 *@Author:楊攀 *@Since: 2019年7月10日下午3:14:23 *@return */ @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> props = new HashMap<>(); // 配置Kafka實例的鏈接地址 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, configuration.getBootstrapServers()); KafkaAdmin admin = new KafkaAdmin(props); return admin; } /** *@Description: kafka的管理客戶端,用於建立、修改、刪除主題等 *@Author:楊攀 *@Since: 2019年7月10日下午3:15:01 *@return */ @Bean public AdminClient adminClient() { return AdminClient.create(kafkaAdmin().getConfig()); } /** * @Description: 建立一個新的 topinfo 的Topic,若是kafka中topinfo 的topic已經存在,則忽略。 * @Author:楊攀 * @Since: 2019年7月10日上午11:13:28 * @return */ @Bean public NewTopic topinfo() { // 主題名稱 String topicName = configuration.getProducer().getTopicName(); // 第二個參數是分區數, 第三個參數是副本數量,確保集羣中配置的數目大於等於副本數量 return new NewTopic(topicName, 2, (short) 2); } }
生產者在發送消息的時候,使用對應的kafkaTemplate便可,若是是多個,須要注意導入的是對應的kafkaTemplate。
package com.topinfo.ci.dataex.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.topinfo.ci.dataex.config.KafKaConfig; @RestController @RequestMapping("kafka") public class TestKafKaProducerController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("send") public String send(String name) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("topinfo", name); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { System.out.println("生產者-發送消息成功:" + result.toString()); } @Override public void onFailure(Throwable ex) { System.out.println("生產者-發送消息失敗:" + ex.getMessage()); } }); return "test-ok"; } }
消費者須要在接收的方法上添加@KafkaListener,用於監聽對應的topic,能夠配置topic多個。
package com.topinfo.ci.dataex.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import com.topinfo.ci.dataex.config.KafKaConfig; /** * @Description: kafka消費者 * @Author:楊攀 * @Since:2019年7月10日上午11:24:31 */ @Component public class KafKaConsumer { private final Logger logger = LoggerFactory.getLogger(KafKaConsumer.class); /** * @Description: 能夠同時訂閱多主題,只需按數組格式便可,也就是用「,」隔開 * @Author:楊攀 * @Since: 2019年7月10日上午11:26:16 * @param record */ @KafkaListener(topics = { "topinfo" }) public void receive(ConsumerRecord<?, ?> record) { logger.info("消費獲得的消息---key: " + record.key()); logger.info("消費獲得的消息---value: " + record.value().toString()); } }
若是多個集羣的狀況下,須要在KafkaListener監聽註解上添加containerFactory,對應配置中的監聽容器工廠。
/** * @Description: 能夠同時訂閱多主題,只需按數組格式便可,也就是用「,」隔開 * @Author:楊攀 * @Since: 2019年7月10日上午11:26:16 * @param record */ @KafkaListener(topics = { "topinfo" }, containerFactory = "kafkaListenerContainerFactory") public void receive(ConsumerRecord<?, ?> record) { logger.info("消費獲得的消息---key: " + record.key()); logger.info("消費獲得的消息---value: " + record.value().toString()); }
好了, 至此全部的配置就差很少了。
最後還有一項, 看到下面的綠色按鈕沒,來,點一下,乖! O(∩_∩)O哈哈~ ...