本文主要分享下Spring Boot和Spring Kafka如何配置整合,實現發送和接收來自Spring Kafka的消息。html
先前我已經分享了Kafka的基本介紹與集羣環境搭建方法。關於Kafka的介紹請閱讀Apache Kafka簡介與安裝(一),關於Kafka安裝請閱讀Apache Kafka安裝,關於Kafka集羣環境搭建請閱讀Apache Kafka集羣環境搭建 。這裏關於服務器環境搭建不在贅述。java
建立一個kafka-producer-master的maven工程。整個項目結構以下:web
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.gzh.kafka.producer</groupId> <artifactId>producer</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>kafka-producer-master</name> <description>demo project for kafka producer</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-kafka.version>2.1.5.RELEASE</spring-kafka.version> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>${spring-kafka.version}</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.8.0</version> </dependency> <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.8.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
固然,根據我的喜愛,你也能夠使用application.yml屬性文件重寫配置。Spring Boot會嘗試根據pom.xml文件中指定的依賴關係自動配置應用程序,並設置合理的默認值。正則表達式
server.port=8000 spring.application.name=kafka-producer #kafka configuration spring.kafka.producer.bootstrap-servers=192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #topic kafka.app.topic.foo=test20180430
在上面的配置中,我給生產者分配的端口號是8000,服務器有3臺,採用先前window環境搭建zookeeper,kafka集羣 中配置的服務器。想了解關於kafka生產者相關的更多配置的話,能夠閱讀關於Spring Boot Kafka Properties的配置信息。spring
SpringKafka提供了使用Producer的KafkaTemplate類發送消息,並提供將數據發送到Kafka主題的高級操做。 提供異步和同步方法,異步方法返回Future。Spring Boot根據application.properties屬性文件中配置的屬性自動配置並初始化KafkaTemplate。爲了方便測試發送消息,使用了Spring的定時任務,在類上使用@EnableScheduling 註解開啓定時任務,經過@Scheduled註解指定發送消息規則。apache
package com.gzh.kafka.producer.component; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; @Component @EnableScheduling public class KafkaMessageProducer { private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageProducer.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value("${kafka.app.topic.foo}") private String topic; @Scheduled(cron = "00/5 * * * * ?") public void send() { String message = "Hello World---" + System.currentTimeMillis(); LOG.info("topic="+topic+",message="+message); ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); future.addCallback(success -> LOG.info("KafkaMessageProducer 發送消息成功!"), fail -> LOG.error("KafkaMessageProducer 發送消息失敗!")); } }
package com.gzh.kafka.producer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication @EnableConfigurationProperties public class KafkaProducerApplication{ public static void main(String[] args) { SpringApplication.run(KafkaProducerApplication.class, args); } }
至此,Spring Boot整合Spring Kafka消息生產者應用已經整合完畢。啓動zookeeper、kafka各個服務器。啓動生產者應用,查看消息生產者應用控制檯日誌,以下圖說明整合OK。bootstrap
固然在建立消息生產者類時,咱們能夠更加靈活,能夠不使用定時任務,經過界面請求的方式,發送咱們想要發送的內容。簡單案例以下:服務器
package com.gzh.kafka.producer.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; @Service public class KafkaMessageSendService { private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSendService.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Value("${kafka.app.topic.foo}") private String topic; public void send(String message){ LOG.info("topic="+topic+",message="+message); ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); future.addCallback(success -> LOG.info("KafkaMessageProducer 發送消息成功!"), fail -> LOG.error("KafkaMessageProducer 發送消息失敗!")); } }
package com.gzh.kafka.producer.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.gzh.kafka.producer.service.KafkaMessageSendService; @RestController @RequestMapping(value="send",produces=MediaType.APPLICATION_JSON_UTF8_VALUE) public class KafkaMessageSendController { @Autowired private KafkaMessageSendService kafkaMessageSendService; @RequestMapping(value="/sendMessage",method=RequestMethod.POST) public String send(@RequestParam(required=true) String message){ try { kafkaMessageSendService.send(message); } catch (Exception e) { return "send failed."; } return message; } }
建立一個kafka-consumer-master的maven工程。整個項目結構以下:app
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.gzh.kafka.consumer</groupId> <artifactId>consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>kafka-consumer-master</name> <description>demo project for kafka consumer</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <spring-kafka.version>1.3.4.RELEASE</spring-kafka.version> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>${spring-kafka.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>${spring-kafka.version}</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
注意,這是使用Spring-Kafka時必定要注意版本問題,不然會報各類奇葩錯誤。Spring官方網站上給出了SpringKafka和kafka-client版本(它的版本號要和kafka服務器的版本保持一致)的對應關係:異步
Spring Boot會嘗試根據pom.xml文件中指定的依賴關係自動配置應用程序,並設置合理的默認值。
server.port=8001 spring.application.name=kafka-consumer #kafka configuration #指定消息被消費以後自動提交偏移量,以便下次繼續消費 spring.kafka.consumer.enable-auto-commit=true #指定消息組 spring.kafka.consumer.group-id=guan #指定kafka服務器地址 spring.kafka.consumer.bootstrap-servers=192.168.1.130:9092,192.168.1.101:9093,192.168.1.101:9094 #指定從最近地方開始消費(earliest) spring.kafka.consumer.auto-offset-reset=latest spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #topic kafka.app.topic.foo=test20180430
在上面的配置中,我給生產者分配的端口號是8000,服務器有3臺,採用先前window環境搭建zookeeper,kafka集羣 中配置的服務器。想了解關於kafka生產者相關的更多配置的話,能夠閱讀關於Spring Boot Kafka Properties的配置信息。
經過使用@KafkaListener來註解一個方法Spring Kafka會自動建立一個消息監聽器容器。使用該註解,並指定要消費的topic(也能夠指定消費組以及分區號,支持正則表達式匹配),這樣,消費者一旦啓動,就會監聽kafka服務器上的topic,實時進行消費消息。
package com.gzh.kafka.consumer.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @Component public class KafkaMessageConsumer { private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageConsumer.class); @KafkaListener(topics={"${kafka.app.topic.foo}"}) public void receive(@Payload String message, @Headers MessageHeaders headers){ LOG.info("KafkaMessageConsumer 接收到消息:"+message); headers.keySet().forEach(key->LOG.info("{}: {}",key,headers.get(key))); } }
package com.gzh.kafka.consumer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication @EnableConfigurationProperties public class KafkaConsumerApplication { public static void main(String[] args) { SpringApplication.run(KafkaConsumerApplication.class, args); } }
消費者應用已經完成,接下來讓咱們驗證Spring Kafka消息發送和接收效果。先依次啓動zookeeper、kafka服務器,而後在啓動生產者(kafka-producer-master)應用,再啓動消費者(kafka-consumer-master)應用,而後觀察生產者和消費者啓動類日誌: