kafka簡介。
在此以前,先安裝kafka服務。
參考文檔:
spring for kafka文檔
spring boot for kafka文檔
<!-- kafka --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
注意:一些配置能夠移到配置文件中。
package com.sunrun.emailanalysis.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

/**
 * Kafka producer wiring: exposes the producer settings, a producer factory
 * built from them, and a {@link KafkaTemplate} for sending String messages.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfigure {

    /**
     * Raw producer properties passed to the Kafka client.
     *
     * @return the producer configuration map
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> config = new HashMap<>();
        // NOTE(review): broker address is hard-coded — consider moving it to
        // application properties, as the surrounding article suggests.
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.21.1.24:9092");
        config.put(ProducerConfig.RETRIES_CONFIG, 0);
        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        config.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return config;
    }

    /**
     * Factory that creates Kafka producers from {@link #producerConfigs()}.
     *
     * @return the producer factory
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Spring Kafka template used by application code to publish messages.
     *
     * @return the Kafka template
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
package com.sunrun.emailanalysis.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * Kafka consumer wiring: consumer properties, a consumer factory, and a
 * concurrent listener container factory so @KafkaListener methods are
 * invoked from multiple threads.
 */
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    /**
     * Listener container factory backing @KafkaListener endpoints.
     * Concurrency 3 means up to three consumer threads per listener.
     *
     * @return the listener container factory
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    /**
     * Factory that creates Kafka consumers from {@link #consumerConfigs()}.
     *
     * @return the consumer factory
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw consumer properties passed to the Kafka client.
     *
     * @return the consumer configuration map
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // NOTE(review): broker address is hard-coded — consider moving it to
        // application properties, as the surrounding article suggests.
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.21.1.24:9092");
        // Auto-commit is disabled: the Spring listener container manages
        // offset commits. (The original also set auto.commit.interval.ms,
        // which the Kafka client ignores when enable.auto.commit is false,
        // so that dead setting has been removed.)
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // Start from the earliest offset when the group has no committed offset.
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }
}
定義了ProducerConfig和ConsumerConfig後咱們須要實現具體的生產者和消費者。其中,咱們在KafkaListenerContainerFactory中使用了ConcurrentKafkaListenerContainer, 咱們將使用多線程消費消息。
定義好了組件後,咱們就能夠在程序中使用它們了。
package com.sunrun.emailanalysis.service; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Service; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @Service public class MyProducerService { @Autowired private KafkaTemplate template; //發送消息方法 public void send(String topic, String message) { ListenableFuture<SendResult<String, String>> future = template.send(topic,message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { System.out.println("msg OK." + result.toString()); } @Override public void onFailure(Throwable ex) { System.out.println("msg send failed: "); } }); } }
package com.sunrun.emailanalysis.controller;

import com.sunrun.emailanalysis.service.MyProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * HTTP trigger for producing a test Kafka message:
 * GET /kafka/index publishes a greeting to the "test" topic.
 */
@Controller
@RequestMapping("kafka")
public class KafkaController {

    @Autowired
    private MyProducerService myProducerService;

    /**
     * Publishes a fixed test message to the "test" topic.
     *
     * Fixed: without @ResponseBody a void handler in a @Controller makes
     * Spring MVC try to resolve a view named "kafka/index", which does not
     * exist — the request errored even though the message was sent.
     */
    @RequestMapping("index")
    @ResponseBody
    public void index() {
        myProducerService.send("test", "你好啊,整合Spring KAFKA");
    }
}
咱們能夠使用咱們的consumer配置建立消費者組件(@Component、@Bean),Spring項目啓動的時候加載消費者。
還能夠直接使用@KafkaListener(topics = {"topicName"})
註解。
// NOTE(review): package name "cosumer" is a typo for "consumer"; kept as-is
// because renaming the package would move the file and break references.
package com.sunrun.emailanalysis.cosumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer: listens on the "test" topic and prints each payload.
 */
@Component
public class EAConsumer {

    /**
     * Handles one message from the "test" topic.
     *
     * @param payload the message value
     */
    @KafkaListener(topics = {"test"})
    public void receive(String payload) {
        System.out.println("接收到信息:" + payload);
    }
}
在spring kafka中,能夠把consumer看作是listener。
接下來啓動SpringBoot項目,輸入網址
http://127.0.0.1/kafka/index
便可激活生產消息的方式,在控制檯由監聽程序打印出發送的消息
msg OK.SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=你好啊,整合Spring KAFKA, timestamp=null), recordMetadata=test-0@6] 接收到信息:你好啊,整合Spring KAFKA
一、參考文檔: