本文主要列一下 Spring for Apache Kafka 的一些 auto config 以及屬性配置。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.2.3.RELEASE</version> </dependency>
這個版本使用的是kafka client 0.10.2.1版本
使用的 spring retry 是 1.1.3.RELEASE 版本。
/**
 * Auto-configuration for Spring for Apache Kafka.
 *
 * <p>Guarded by the presence of {@link KafkaTemplate} on the classpath. Binds
 * {@link KafkaProperties} and imports {@link KafkaAnnotationDrivenConfiguration}.
 * Every bean method below carries {@code @ConditionalOnMissingBean}, so each
 * default is only registered when no bean of the named type already exists.
 */
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import(KafkaAnnotationDrivenConfiguration.class)
public class KafkaAutoConfiguration {

    /** Bound Kafka settings used to build the default beans. */
    private final KafkaProperties properties;

    /**
     * Creates the configuration.
     *
     * @param properties the bound Kafka properties
     */
    public KafkaAutoConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    /**
     * Builds the default {@link KafkaTemplate}, wiring in the producer listener
     * and the default topic taken from the template section of the properties.
     *
     * @param kafkaProducerFactory the factory the template sends through
     * @param kafkaProducerListener the listener set on the template
     * @return the configured template
     */
    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(
            ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener) {
        final KafkaTemplate<Object, Object> template =
                new KafkaTemplate<Object, Object>(kafkaProducerFactory);
        template.setProducerListener(kafkaProducerListener);
        final String defaultTopic = this.properties.getTemplate().getDefaultTopic();
        template.setDefaultTopic(defaultTopic);
        return template;
    }

    /**
     * Supplies a {@link LoggingProducerListener} as the default producer listener.
     *
     * @return a new logging producer listener
     */
    @Bean
    @ConditionalOnMissingBean(ProducerListener.class)
    public ProducerListener<Object, Object> kafkaProducerListener() {
        return new LoggingProducerListener<Object, Object>();
    }

    /**
     * Supplies the default consumer factory, built from
     * {@code KafkaProperties.buildConsumerProperties()}.
     *
     * @return a {@link DefaultKafkaConsumerFactory}
     */
    @Bean
    @ConditionalOnMissingBean(ConsumerFactory.class)
    public ConsumerFactory<?, ?> kafkaConsumerFactory() {
        return new DefaultKafkaConsumerFactory<Object, Object>(
                this.properties.buildConsumerProperties());
    }

    /**
     * Supplies the default producer factory, built from
     * {@code KafkaProperties.buildProducerProperties()}.
     *
     * @return a {@link DefaultKafkaProducerFactory}
     */
    @Bean
    @ConditionalOnMissingBean(ProducerFactory.class)
    public ProducerFactory<?, ?> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<Object, Object>(
                this.properties.buildProducerProperties());
    }

}
/**
 * Configuration for annotation-driven Kafka listeners.
 *
 * <p>Guarded by the presence of {@link EnableKafka} on the classpath. Registers a
 * listener container factory configurer and a container factory, and applies
 * {@code @EnableKafka} through a nested class. Each of these is conditional on
 * no matching bean being present already.
 */
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

    /** Bound Kafka settings handed to the configurer. */
    private final KafkaProperties properties;

    /**
     * Creates the configuration.
     *
     * @param properties the bound Kafka properties
     */
    KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    /**
     * Creates the configurer and hands it the Kafka properties.
     *
     * @return the configurer carrying the Kafka properties
     */
    @Bean
    @ConditionalOnMissingBean
    public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
        final ConcurrentKafkaListenerContainerFactoryConfigurer factoryConfigurer =
                new ConcurrentKafkaListenerContainerFactoryConfigurer();
        factoryConfigurer.setKafkaProperties(this.properties);
        return factoryConfigurer;
    }

    /**
     * Creates the listener container factory and lets the configurer set it up
     * with the given consumer factory. Skipped when a bean named
     * {@code kafkaListenerContainerFactory} already exists.
     *
     * @param configurer the configurer applied to the new factory
     * @param kafkaConsumerFactory the consumer factory passed to the configurer
     * @return the configured container factory
     */
    @Bean
    @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        final ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<Object, Object>();
        configurer.configure(containerFactory, kafkaConsumerFactory);
        return containerFactory;
    }

    /**
     * Empty holder that applies {@code @EnableKafka}, unless a bean with the
     * Kafka listener annotation processor's name is already registered.
     */
    @EnableKafka
    @ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    protected static class EnableKafkaConfiguration {

    }

}
spring-boot-autoconfigure-1.5.7.RELEASE.jar!/META-INF/spring-configuration-metadata.json
{ "name": "spring.kafka.bootstrap-servers", "type": "java.util.List<java.lang.String>", "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties" }, { "name": "spring.kafka.client-id", "type": "java.lang.String", "description": "Id to pass to the server when making requests; used for server-side logging.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties" }, { "name": "spring.kafka.ssl.key-password", "type": "java.lang.String", "description": "Password of the private key in the key store file.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl" }, { "name": "spring.kafka.ssl.keystore-location", "type": "org.springframework.core.io.Resource", "description": "Location of the key store file.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl" }, { "name": "spring.kafka.ssl.keystore-password", "type": "java.lang.String", "description": "Store password for the key store file.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl" }, { "name": "spring.kafka.ssl.truststore-location", "type": "org.springframework.core.io.Resource", "description": "Location of the trust store file.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl" }, { "name": "spring.kafka.ssl.truststore-password", "type": "java.lang.String", "description": "Store password for the trust store file.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Ssl" }, { "name": "spring.kafka.template.default-topic", "type": "java.lang.String", "description": "Default topic to which messages will be sent.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Template" }
{ "name": "spring.kafka.consumer.auto-commit-interval", "type": "java.lang.Integer", "description": "Frequency in milliseconds that the consumer offsets are auto-committed to Kafka\n if 'enable.auto.commit' true.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.auto-offset-reset", "type": "java.lang.String", "description": "What to do when there is no initial offset in Kafka or if the current offset\n does not exist any more on the server.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.bootstrap-servers", "type": "java.util.List<java.lang.String>", "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.client-id", "type": "java.lang.String", "description": "Id to pass to the server when making requests; used for server-side logging.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.enable-auto-commit", "type": "java.lang.Boolean", "description": "If true the consumer's offset will be periodically committed in the background.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.fetch-max-wait", "type": "java.lang.Integer", "description": "Maximum amount of time in milliseconds the server will block before answering\n the fetch request if there isn't sufficient data to immediately satisfy the\n requirement given by \"fetch.min.bytes\".", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.fetch-min-size", "type": "java.lang.Integer", "description": "Minimum amount of data the server should return for a fetch 
request in bytes.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.group-id", "type": "java.lang.String", "description": "Unique string that identifies the consumer group this consumer belongs to.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.heartbeat-interval", "type": "java.lang.Integer", "description": "Expected time in milliseconds between heartbeats to the consumer coordinator.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.key-deserializer", "type": "java.lang.Class<?>", "description": "Deserializer class for keys.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.max-poll-records", "type": "java.lang.Integer", "description": "Maximum number of records returned in a single call to poll().", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.consumer.value-deserializer", "type": "java.lang.Class<?>", "description": "Deserializer class for values.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Consumer" }, { "name": "spring.kafka.listener.ack-count", "type": "java.lang.Integer", "description": "Number of records between offset commits when ackMode is \"COUNT\" or\n \"COUNT_TIME\".", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener" }, { "name": "spring.kafka.listener.ack-mode", "type": "org.springframework.kafka.listener.AbstractMessageListenerContainer$AckMode", "description": "Listener AckMode; see the spring-kafka documentation.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener" }, { "name": "spring.kafka.listener.ack-time", "type": "java.lang.Long", "description": "Time in milliseconds between 
offset commits when ackMode is \"TIME\" or\n \"COUNT_TIME\".", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener" }, { "name": "spring.kafka.listener.concurrency", "type": "java.lang.Integer", "description": "Number of threads to run in the listener containers.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener" }, { "name": "spring.kafka.listener.poll-timeout", "type": "java.lang.Long", "description": "Timeout in milliseconds to use when polling the consumer.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Listener" }
{ "name": "spring.kafka.producer.acks", "type": "java.lang.String", "description": "Number of acknowledgments the producer requires the leader to have received\n before considering a request complete.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer" }, { "name": "spring.kafka.producer.batch-size", "type": "java.lang.Integer", "description": "Number of records to batch before sending.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer" }, { "name": "spring.kafka.producer.bootstrap-servers", "type": "java.util.List<java.lang.String>", "description": "Comma-delimited list of host:port pairs to use for establishing the initial\n connection to the Kafka cluster.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer" }, { "name": "spring.kafka.producer.buffer-memory", "type": "java.lang.Long", "description": "Total bytes of memory the producer can use to buffer records waiting to be sent\n to the server.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer" }, { "name": "spring.kafka.producer.client-id", "type": "java.lang.String", "description": "Id to pass to the server when making requests; used for server-side logging.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer" }, { "name": "spring.kafka.producer.compression-type", "type": "java.lang.String", "description": "Compression type for all data generated by the producer.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer" }, { "name": "spring.kafka.producer.key-serializer", "type": "java.lang.Class<?>", "description": "Serializer class for keys.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer" }, { "name": "spring.kafka.producer.retries", "type": "java.lang.Integer", "description": "When greater than zero, enables retrying of failed sends.", "sourceType": 
"org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer" }, { "name": "spring.kafka.producer.value-serializer", "type": "java.lang.Class<?>", "description": "Serializer class for values.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties$Producer" }, { "name": "spring.kafka.properties", "type": "java.util.Map<java.lang.String,java.lang.String>", "description": "Additional properties used to configure the client.", "sourceType": "org.springframework.boot.autoconfigure.kafka.KafkaProperties" }