<!-- Inherit dependency and plugin management from the Spring Boot starter parent. -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.2.RELEASE</version>
</parent>

<!-- Spring for Apache Kafka; no <version> is given here, so it is expected to come from the parent. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
Spring Boot 會自動配置 Kafka,接下來只需要配置 yml 屬性文件和主題名稱。
# Kafka client settings under the spring.kafka prefix.
spring:
  kafka:
    # Comma-separated list of broker addresses.
    bootstrap-servers: 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092

    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      # Both keys and values are written as plain strings.
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Extra native producer properties passed through as-is.
      properties:
        linger.ms: 1

    consumer:
      enable-auto-commit: false
      # NOTE(review): with enable-auto-commit set to false this interval
      # presumably has no effect - confirm and drop it if unused.
      auto-commit-interval: 100ms
      # Both keys and values are read as plain strings.
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Extra native consumer properties passed through as-is.
      properties:
        session.timeout.ms: 15000
# Application-specific topic settings under the kafka.topic prefix.
kafka:
  topic:
    # Consumer group id.
    group-id: topicGroupId
    # Topic names, one list entry per topic.
    topic-name:
      - topic1
      - topic2
      - topic3
新建 KafkaTopicProperties:
@ConfigurationProperties("kafka.topic") public class KafkaTopicProperties implements Serializable { private String groupId; private String[] topicName; public String getGroupId() { return groupId; } public void setGroupId(String groupId) { this.groupId = groupId; } public String[] getTopicName() { return topicName; } public void setTopicName(String[] topicName) { this.topicName = topicName; }
添加 KafkaTopicConfiguration:
/**
 * Configuration class that enables {@link KafkaTopicProperties} and republishes
 * its two values as individual beans, {@code kafkaTopicName} and {@code topicGroupId}.
 */
@Configuration
@EnableConfigurationProperties(KafkaTopicProperties.class)
public class KafkaTopicConfiguration {

    /** Source of the topic names and the group id. */
    private final KafkaTopicProperties topicProperties;

    /**
     * Creates the configuration around the given properties object.
     *
     * @param properties the topic properties to read from
     */
    public KafkaTopicConfiguration(KafkaTopicProperties properties) {
        topicProperties = properties;
    }

    /**
     * Exposes the topic names as a bean.
     *
     * @return whatever {@link KafkaTopicProperties#getTopicName()} returns
     */
    @Bean
    public String[] kafkaTopicName() {
        final String[] topicNames = topicProperties.getTopicName();
        return topicNames;
    }

    /**
     * Exposes the consumer group id as a bean.
     *
     * @return whatever {@link KafkaTopicProperties#getGroupId()} returns
     */
    @Bean
    public String topicGroupId() {
        final String groupId = topicProperties.getGroupId();
        return groupId;
    }
}
@Service public class IndicatorService { private Logger LOG = LoggerFactory.getLogger(IndicatorService.class); private final KafkaTemplate<Integer, String> kafkaTemplate; /** * 注入KafkaTemplate * @param kafkaTemplate kafka模版類 */ @Autowired public IndicatorService(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}") public void processMessage(ConsumerRecord<Integer, String> record) { LOG.info("kafka processMessage start"); LOG.info("processMessage, topic = {}, msg = {}", record.topic(), record.value()); // do something ... LOG.info("kafka processMessage end"); } public void sendMessage(String topic, String data) { LOG.info("kafka sendMessage start"); ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(topic, data); future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onFailure(Throwable ex) { LOG.error("kafka sendMessage error, ex = {}, topic = {}, data = {}", ex, topic, data); } @Override public void onSuccess(SendResult<Integer, String> result) { LOG.info("kafka sendMessage success topic = {}, data = {}",topic, data); } }); LOG.info("kafka sendMessage end"); } }
至此就可以跑起來了,有什麼不明白的可以留言。