Apache Kafka 是一個分佈式的、容錯的流處理系統。在本文中,咱們將介紹Spring對Apache Kafka的支持,以及原生Kafka Java客戶端Api 所提供的抽象級別。java
Spring Kafka 經過 @KafkaListener 註解,帶來了一個簡單而典型的 Spring 模板編程模型,它還帶有一個 KafkaTemplate 和消息驅動的 POJO 。git
要下載和安裝Kafka,請參考官方指南。而後還須要在 pom.xml
文件中添加 spring-kafka
:github
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.7.RELEASE</version> </dependency>
新建一個 Spring Boot 示例應用程序,以默認配置啓動。spring
之前咱們使用命令行工具在 Kafka
中建立 topic
,例如:編程
$ bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic
可是隨着 AdminClient 在Kafka中的引入,咱們如今能夠經過編程來建立 Topic
。bootstrap
以下代碼,添加 KafkAdmin
bean 到 Spring中,它將自動爲 NewTopic
類的全部 bean
添加 topic
:安全
@Configuration public class KafkaTopicConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return new NewTopic("developlee", 1, (short) 1); } }
要建立消息,首先須要配置 ProducerFactory ,並設置建立 Kafka Producer 實例的策略,而後使用 KafkaTemplate
。 KafkaTemplate
包裝了 Producer
實例,並提供向 Kafka Topic
發送消息的簡便方法。bash
在整個應用程序上下文中使用單個實例將提供更高的性能。所以推薦使用一個 Producer
實例。該實例是線程安全的,因此 KakfaTemplate
實例也是線程安全的,服務器
@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
咱們使用 KafkaTemplate
來發布消息:異步
@Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String msg) { kafkaTemplate.send(topicName, msg); }
send
API 返回 ListenableFuture
對象。若是咱們想阻塞發送線程並得到關於發送消息的結果,咱們能夠調用ListenableFuture
對象的 get
API。線程將會等待結果,但它會下降生產者的速度。
Kafka是一個快速流處理平臺。所以,最好異步處理結果,這樣後續消息就無需等待前一條消息的結果。咱們能夠經過回調來實現:
public void sendMessage(String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } }); }
對於消費消息,咱們須要配置一個 ConsumerFactory
和一個 KafkaListenerContainerFactory
。
一旦這些bean在Spring Bean工廠中可用,就可使用 @KafkaListener
註解配置基於POJO的消費者。
配置類上須要添加 @EnableKafka
註解,以便可以檢測Spring
管理的bean上的 @KafkaListener
註解:
@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
@KafkaListener(topics = "topicName", groupId = "foo") public void listenGroupFoo(String message) { System.out.println("Received Message in group foo: " + message); }
能夠爲一個 topic 實現多個 listener,每一個topic 都有不一樣的組Id。此外,一個消費者能夠監聽來自不一樣 topic 的消息:
@KafkaListener(topics = "topic1, topic2", groupId = "foo")
Spring 還支持使用 listener 中的 @Header 註解檢索一個或多個消息標題:
@KafkaListener(topics = "topicName") public void listenWithHeaders( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }
注意到,咱們只使用一個分區建立了 topic 「developlee」。可是,對於具備多個分區的主題,@KafkaListener 能夠顯式訂閱具備初始偏移量 topic 的特定分區:
@KafkaListener( topicPartitions = @TopicPartition(topic = "topicName", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "3", initialOffset = "0")}), containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }
因爲 initialOffset 已被髮送到該 listener 中的分區0,所以每次初始化該 listener
時,將從新使用之前從分區0和分區3消耗的全部消息。若是不須要設置偏移量,咱們可使用 @TopicPartition 註解的 partitions 屬性只設置沒有偏移量的分區:
@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
經過添加自定義過濾器,能夠將 listener
配置爲使用特定類型的消息。這能夠經過將 RecordFilterStrategy
設置爲 KafkaListenerContainerFactory
來完成:
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains("World")); return factory; }
而後能夠將 listener
配置爲使用此容器工廠:
@KafkaListener( topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter(String message) { System.out.println("Received Message in filtered listener: " + message); }
在這個 listener
中,全部與過濾器匹配的消息都將被丟棄。
到目前爲止,咱們只討論了字符串做爲消息發送和接收的對象。可是,咱們也能夠發送和接收定製的Java對象。這須要在 ProducerFactory
中配置適當的序列化器,並在 ConsumerFactory
中配置反序列化器。
讓咱們看一個簡單的bean,並將以消息的形式發送它:
public class Greeting { private String msg; private String name; // standard getters, setters and constructor }
在本例中,咱們將使用 JsonSerializer
。咱們看看 ProducerFactory
和 KafkaTemplate
的代碼:
@Bean public ProducerFactory<String, Greeting> greetingProducerFactory() { // ... configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Greeting> greetingKafkaTemplate() { return new KafkaTemplate<>(greetingProducerFactory()); }
新的 KafkaTemplate
可用於發送 Greeting 消息:
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
一樣,咱們修改 ConsumerFactory
和 KafkaListenerContainerFactory
來正確反序列化 Greeting 消息:
@Bean public ConsumerFactory<String, Greeting> greetingConsumerFactory() { // ... return new DefaultKafkaConsumerFactory<>( props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; }
spring-kafka
JSON序列化器和反序列化器使用 Jackson 庫,該庫是 spring-kafka
項目的可選maven依賴項。咱們也把它加到 pom.xml 文件:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.7</version> </dependency>
建議不要使用 Jackson 的最新版本,而是使用 pom.xml 文件 中 spring-kafka
的版本。
最後,咱們須要編寫一個 listener 來 消費 Greeting 消息:
@KafkaListener( topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener(Greeting greeting) { // process greeting message }
在本文中,咱們介紹了Apache Kafka 和 Spring 集成的基礎知識,且簡要介紹了用於發送和接收消息的類。
本文的完整源代碼能夠在GitHub上找到. 在執行代碼以前,請確保服務器正在運行 Kafka。
若是你以爲文章還不錯,記得關注公衆號: 鍋外的大佬
劉一手的博客