一.springboot自動配置方式整合kafka:git
springboot提供自動配置整合kafka的方式,須要作一下步驟:
1.引入kafka依賴包:github
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.7.RELEASE</version> </dependency>
2.在springboot配置中加入kafka相關配置,springboot啓動時候會自動加載這些配置,完成連接kafka,建立producer,consumer等。spring
spring: kafka: # kafka服務地址 bootstrap-servers: 127.0.0.1:9092 # 消費者配置 consumer: bootstrap-servers: 127.0.0.1:9092 group-id: myGroup enable-auto-commit: true auto-offset-reset: earliest auto-commit-interval: 1000 max-poll-records: 10 # 生產者配置 producer: retries: 5 batch-size: 16384 buffer-memory: 33554432 acks: 1
3.消息發送端:bootstrap
@Component public class MqProviderImpl{ @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Override public void sendSkMessage(String message, Properties properties) { // 發送消息,註冊一個回調事件 ListenableFuture<SendResult<String, String>> futureMessage = KafkaConfig.kafkaTemplateStatic.send("test_topic", message); futureMessage.addCallback(new ListenableFutureCallback<SendResult<String, String>>(){ @Override public void onSuccess(SendResult<String, String> sendResult) { log.info(" rev "+sendResult.getProducerRecord().value()); } @Override public void onFailure(Throwable ex) { log.error(" error "+ex.getMessage()); } }); }}
4.消息消費端:springboot
@KafkaListener(topics = {"test_topic"}) public void receiveSkMessageInfo(ConsumerRecord<String, String> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment ack) { log.info(record.value()); }
以上實現是最簡單的方式,但使用springboot自動配置的方式,全部配置項必須事先寫好在在applicantion.yml的spring.kafka下面,試想在分佈式的場景中,若是某一項發生變更,每一個應用下面的配置都須要修改,這就須要將這些配置使用服務治理統一管理起來,這裏就須要一種自定義配置的方式來解決。app
springboot自動配置kafka是在KafkaAutoConfiguration這個類中實現的,它有一個成員KafkaProperties,這個properties中保存全部關於kafka的配置。分佈式
// 自動配置是在KafkaAutoConfiguration類實現的 @Configuration @ConditionalOnClass(KafkaTemplate.class) @EnableConfigurationProperties(KafkaProperties.class) @Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class }) public class KafkaAutoConfiguration { private final KafkaProperties properties;
KafkaProperties類的註解能夠看出,配置都是從yml裏的spring.kafka配置讀出來的ide
@ConfigurationProperties(prefix = "spring.kafka") public class KafkaProperties {
二.springboot手動配置方式整合kafka,使用zk作配置中心:ui
在分佈式的環境下,須要使用服務治理把yml裏的配置統一管理起來,這裏使用zookeeper來統一管理kafka的配置。若是將原有的配置放到zk中,來實現從zk上讀取配置,讓springboot接收到,這裏就須要從新定義kafka的配置類,不能使用原有的KafkaAutoConfiguration了。this
1.從zk上拉取配置,這裏使用噹噹開源的Config Toolkit,還自帶一個操做zk的管理界面,引入pom:
<dependency> <groupId>com.dangdang</groupId> <artifactId>config-toolkit</artifactId> <version>3.3.2-RELEASE</version> </dependency>
2.在yml中添加連接zk的配置,有這些配置才能保證應用能連接zk:
configs: # zk地址 address: 192.168.1.30:2181 # 保存應用配置的節點名 env: /projectx/modulex version: 1 # zk數據組 groupdefault: groupdefault
3.下載噹噹的config-toolkit,訪問http://localhost:8080/,加入相關配置,github上有詳細說明。
4.新建一個ZKConfiguration類,實現EnvironmentAware接口,實現EnvironmentAware接口的setEnvironment能夠在項目啓動時設置項目的環境變量,能夠在這個類中結合config-toolkit,把zk的配置加載到項目環境變量當中:
@Component public class ZKConfiguration implements EnvironmentAware { @Autowired private Environment env; private static Map<String, GeneralConfigGroup> GROUPMAP = new HashMap<>(); public ZKConfiguration() { } // 加載zk的基本配置 @Bean public ZookeeperConfigProfile zookeeperConfigProfile() { ZookeeperConfigProfile configProfile = new ZookeeperConfigProfile( Objects.requireNonNull(this.env.getProperty("configs.address")), Objects.requireNonNull(this.env.getProperty("configs.env")), this.env.getProperty("configs.version")); return configProfile; } //獲得具體組裏的配置 @Bean({"groupPropDefault"}) public GeneralConfigGroup generalConfigGroupDefault() { ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile(); GeneralConfigGroup group = new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault")); return group; } /** * 獲取配置組 * @return */ public GeneralConfigGroup getConfigGroup(String group) { return GROUPMAP.get(group); } /** * * 項目啓動時會調用這個方法,把zk裏的配置組存在臨時變量GROUPMAP裏,之後會用到 * 因此 數據源初始化,就設置在這個方法裏 * @param environment */ @Override public void setEnvironment(Environment environment) { this.env = environment; ZookeeperConfigProfile configProfile = this.zookeeperConfigProfile(); GROUPMAP.put("groupdefault", new ZookeeperConfigGroup(configProfile, this.env.getProperty("configs.groupdefault"))); }}
5.得到全部配置項後,就是讓springboot去創建kafka連接了,這裏至關於要從新實現KafkaAutoConfiguration
的配置。創建一個KafkaConfig配置類,這裏主要是配置全部kafka須要的bean:
@ConditionalOnClass({KafkaTemplate.class}) @EnableKafka public class KafkaConfig { // 把剛剛加載zk配置的類注入進來 @Autowired private ZKConfiguration zkConfiguration; // 建立 消費者工廠 @Bean("consumerFactory") @ConditionalOnMissingBean({ConsumerFactory.class}) public ConsumerFactory<String, String> consumerFactory() { // 建立工廠須要三個參數: // 1. 消費者配置的map // 2. key的反序列化實現類 // 3. value的反序列化實現類 return new DefaultKafkaConsumerFactory<String, String>(makeKafkaConfig(), new StringDeserializer(), new StringDeserializer()); } // 建立生產者工廠 @Bean("producerFactory") @ConditionalOnMissingBean({ProducerFactory.class}) public ProducerFactory<String, String> kafkaProducerFactory() { // 生產者工廠的參數如消費者工廠 return new DefaultKafkaProducerFactory(makeKafkaConfig(), new StringSerializer(), new StringSerializer()); } // 建立 kafkaTemplate 這個bean,有了這個bean才能在實際業務中使用kafka @Bean("kafkaTemplate") @ConditionalOnMissingBean({com.seckill.boot.common.util.KafkaTemplate.class}) public KafkaTemplate<String, Protobufable> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, String> kafkaProducerFactory, @Qualifier("producerListener") ProducerListener<String, Protobufable> producerListener) { KafkaTemplate<String, Protobufable> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory); kafkaTemplate.setProducerListener(producerListener); kafkaTemplate.setDefaultTopic("groupdefault"); return kafkaTemplate; } @Bean("producerListener") @ConditionalOnMissingBean({ProducerListener.class}) public ProducerListener<String, Protobufable> kafkaProducerListener() { return new LoggingProducerListener(); } @Bean @ConditionalOnProperty( name = {"spring.kafka.producer.transaction-id-prefix"} ) @ConditionalOnMissingBean public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) { return new KafkaTransactionManager(producerFactory); } // zk裏拿到的配置取出來 private Map<String, Object> makeKafkaConfig() { // 得到配置的group GeneralConfigGroup configGroup = zkConfiguration.getConfigGroup("groupdefault"); Map<String, Object> kafkaConfig = new HashMap<>(); kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configGroup.get("spring.kafka.bootstrap-servers")); kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG, configGroup.get("spring.kafka.consumer.group-id")); kafkaConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, configGroup.get("spring.kafka.consumer.auto-offset-reset")); kafkaConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, configGroup.get("spring.kafka.consumer.enable-auto-commit")); kafkaConfig.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.auto-commit-interval")); kafkaConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.key-serializer")); kafkaConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.consumer.value-serializer")); kafkaConfig.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-records")); kafkaConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, configGroup.get("spring.kafka.consumer.max-poll-interval-ms")); kafkaConfig.put("ack-mode", configGroup.get("spring.kafka.listener.ack-mode")); kafkaConfig.put("concurrency", configGroup.get("spring.kafka.listener.concurrency")); kafkaConfig.put(ProducerConfig.ACKS_CONFIG, configGroup.get("spring.kafka.producer.acks")); kafkaConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, configGroup.get("spring.kafka.producer.batch-size")); kafkaConfig.put(ProducerConfig.BUFFER_MEMORY_CONFIG, configGroup.get("spring.kafka.producer.buffer-memory")); kafkaConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.key-serializer")); kafkaConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, configGroup.get("spring.kafka.producer.value-serializer")); kafkaConfig.put(ProducerConfig.RETRIES_CONFIG, configGroup.get("spring.kafka.producer.retries")); return kafkaConfig; } }
6.將kafka須要的bean配置好後,就能在實際業務中使用KafkaTemplate操做消息了
@Component public class MqProviderImpl{ @Autowired private KafkaTemplate<String, String> kafkaTemplate;