前置條件: Kafka服務器已經配置好。參見《在Ubuntu機器上部署Kafka消息隊列》。
使用 Spring Cloud Stream 和 Spring Kafka 來進行集成。
在 build.gradle 中添加依賴項:
// Spring Cloud Stream core, its Kafka binder, and Spring Kafka.
compile "org.springframework.cloud:spring-cloud-stream"
compile "org.springframework.cloud:spring-cloud-stream-binder-kafka"
compile "org.springframework.kafka:spring-kafka:1.0.5.RELEASE"
在 application-dev.yml 或相應的配置文件中添加如下配置:
spring:
  kafka.bootstrap-servers: 你的Kafka地址:9092
  #http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_apache_kafka_binder.html
  cloud:
    #spring.cloud.stream.kafka.binder.brokers
    stream:
      kafka:
        binder:
          brokers: 你的Kafka地址
          zk-nodes: 你的ZooKeeper地址
      bindings:
        output:
          destination: topic-kafka-dev
package com.your.company.common.kafka; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.*; import java.util.HashMap; import java.util.Map; /** * Kafka配置文件 * https://www.codenotfound.com/2016/09/spring-kafka-consumer-producer-example.html */ @Configuration @EnableKafka public class KafkaConfiguration { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; //------------ Begin 生產者配置 ------------ @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections to the Kakfa cluster props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } //------------ End 生產者配置 ------------ //------------ Begin 消費者配置 ------------ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // list of host:port pairs used for establishing the initial connections to the Kakfa cluster props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); 
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000); // allows a pool of processes to divide the work of consuming and processing records props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka_group"); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } //------------ End 消費者配置 ------------ }
簡單起見,把消息轉換成JSON字符串來發送和接收。
package com.your.company.common.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** * 消息發佈者服務 */ @Service public class KafkaMessageSender { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMessageSender.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void send(String topic, String message) { // the KafkaTemplate provides asynchronous send methods returning a Future ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message); // register a callback with the listener to receive the result of the send asynchronously future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { LOGGER.info("sent message='{}' with offset={}", message, result.getRecordMetadata().offset()); } @Override public void onFailure(Throwable ex) { LOGGER.error("unable to send message='{}'", message, ex); } }); // or, to block the sending thread to await the result, invoke the future's get() method } }
使用 @KafkaListener(topics = "your_topic") 註解來監聽消息。
package com.your.company.common.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; /** * 消息消費者 */ @Service public class KafkaConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class); //bin/kafka-console-consumer.sh --bootstrap-server 服務器IP:9092 --topic my_test --from-beginning @KafkaListener(topics = "my_test") public void receive(String message) { LOGGER.info("received message='{}'", message); System.out.println("receive message = " + message); } }
在調試過程當中,能夠在服務器端,用命令行來接收消息或發佈消息,分別調試發佈和消費過程是否正常。