Official documentation: https://docs.spring.io/spring-kafka/reference/html/html
@KafkaListener
The @KafkaListener annotation is used to designate a bean method as a listener for a listener container. The bean is wrapped in a MessagingMessageListenerAdapter configured with various features, such as converters to convert the data, if necessary, to match the method parameters.
If, say, six TopicPartition instances are provided and the concurrency is 3, each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.
You can now configure a KafkaListenerErrorHandler to handle exceptions. See Handling Exceptions for more information.
By default, the @KafkaListener id property is now used as the group.id property, overriding the property configured in the consumer factory (if present). Further, you can explicitly configure the groupId on the annotation. Previously, you would have needed a separate container factory (and consumer factory) to use different group.id values for listeners. To restore the previous behavior of using the factory configured group.id, set the idIsGroup property on the annotation to false.
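The precedence described above (explicit groupId first, then id when idIsGroup is true, then the factory's group.id) can be written out as a small plain-Java model. It is a sketch for reasoning about the rules, not Spring Kafka's own code; the class GroupIdResolution and the method resolveGroupId are hypothetical names, and the parameters simply mirror the annotation attributes mentioned in the text.

```java
public class GroupIdResolution {

    /**
     * Models which group.id a listener ends up with.
     * Hypothetical helper; parameters mirror the @KafkaListener attributes
     * id, groupId and idIsGroup, plus the group.id set on the consumer factory.
     */
    public static String resolveGroupId(String id, String groupId,
                                        boolean idIsGroup, String factoryGroupId) {
        // 1. An explicit groupId on the annotation wins.
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        // 2. Otherwise the listener id is used, unless idIsGroup is false.
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        // 3. Fall back to whatever the consumer factory configures (may be null).
        return factoryGroupId;
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId("foo", "", true, "factoryGroup"));    // foo
        System.out.println(resolveGroupId("foo", "bar", true, "factoryGroup")); // bar
        System.out.println(resolveGroupId("foo", "", false, "factoryGroup"));   // factoryGroup
    }
}
```

The third call corresponds to setting idIsGroup to false on the annotation, which restores the factory-configured group.id.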
Example:

Demo class:

    public class Listener {

        // With default settings, the id "foo" is also used as the consumer group.id.
        @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
        public void listen(String data) {
            ...
        }

    }

Configuration class and annotations:

    @Configuration
    @EnableKafka
    public class KafkaConfig {

        // Container factory used by @KafkaListener methods.
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3); // up to three child containers
            factory.getContainerProperties().setPollTimeout(3000); // milliseconds
            return factory;
        }

        @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            // embeddedKafka is assumed to be defined elsewhere (not shown in this snippet).
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
            ...
            return props;
        }

    }