Spring 對Apache Kafka的支持與集成

1. 引言

Apache Kafka 是一個分佈式的、容錯的流處理系統。在本文中,咱們將介紹Spring對Apache Kafka的支持,以及原生Kafka Java客戶端Api 所提供的抽象級別。java

Spring Kafka 經過 @KafkaListener 註解,帶來了一個簡單而典型的 Spring 模板編程模型,它還帶有一個 KafkaTemplate 和消息驅動的 POJO 。git

2. 安裝和設置

要下載和安裝Kafka,請參考官方指南。而後還須要在 pom.xml 文件中添加 spring-kafka:github

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.7.RELEASE</version>
</dependency>

新建一個 Spring Boot 示例應用程序,以默認配置啓動。spring

3. 配置 Topics

之前咱們使用命令行工具在 Kafka 中建立 topic,例如:編程

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

可是隨着 AdminClient 在Kafka中的引入,咱們如今能夠經過編程來建立 Topicbootstrap

以下代碼,添加 KafkAdmin bean 到 Spring中,它將自動爲 NewTopic 類的全部 bean 添加 topic安全

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
 
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("developlee", 1, (short) 1);
    }
}

4. 消息生成

要建立消息,首先須要配置 ProducerFactory ,並設置建立 Kafka Producer 實例的策略,而後使用 KafkaTemplateKafkaTemplate 包裝了 Producer 實例,並提供向 Kafka Topic 發送消息的簡便方法。bash

在整個應用程序上下文中使用單個實例將提供更高的性能。所以推薦使用一個 Producer 實例。該實例是線程安全的,因此 KakfaTemplate 實例也是線程安全的,服務器

4.1. Producer 配置

@Configuration
public class KafkaProducerConfig {
 
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4.2. 消息發佈

咱們使用 KafkaTemplate 來發布消息:異步

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
 
public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

send API 返回 ListenableFuture 對象。若是咱們想阻塞發送線程並得到關於發送消息的結果,咱們能夠調用ListenableFuture 對象的 get API。線程將會等待結果,但它會下降生產者的速度。

Kafka是一個快速流處理平臺。所以,最好異步處理結果,這樣後續消息就無需等待前一條消息的結果。咱們能夠經過回調來實現:

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = 
      kafkaTemplate.send(topicName, message);
	
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
 
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" 
              + message + "] due to : " + ex.getMessage());
        }
    });
}

5. 消息消費

5.1. 消費者配置

對於消費消息,咱們須要配置一個 ConsumerFactory 和一個 KafkaListenerContainerFactory

一旦這些bean在Spring Bean工廠中可用,就可使用 @KafkaListener 註解配置基於POJO的消費者。

配置類上須要添加 @EnableKafka 註解,以便可以檢測Spring管理的bean上的 @KafkaListener 註解:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
 
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

5.2. 消息消費

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

能夠爲一個 topic 實現多個 listener,每一個topic 都有不一樣的組Id。此外,一個消費者能夠監聽來自不一樣 topic 的消息:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring 還支持使用 listener 中的 @Header 註解檢索一個或多個消息標題:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

5.3. 消費來自特定分區的消息

注意到,咱們只使用一個分區建立了 topic 「developlee」。可是,對於具備多個分區的主題,@KafkaListener 能夠顯式訂閱具備初始偏移量 topic 的特定分區:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

因爲 initialOffset 已被髮送到該 listener 中的分區0,所以每次初始化該 listener 時,將從新使用之前從分區0和分區3消耗的全部消息。若是不須要設置偏移量,咱們可使用 @TopicPartition 註解的 partitions 屬性只設置沒有偏移量的分區:

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. 爲Listener添加消息過濾器

經過添加自定義過濾器,能夠將 listener 配置爲使用特定類型的消息。這能夠經過將 RecordFilterStrategy 設置爲 KafkaListenerContainerFactory 來完成:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {
 
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

而後能夠將 listener 配置爲使用此容器工廠:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

在這個 listener 中,全部與過濾器匹配的消息都將被丟棄

6. 自定義消息轉換器

到目前爲止,咱們只討論了字符串做爲消息發送和接收的對象。可是,咱們也能夠發送和接收定製的Java對象。這須要在 ProducerFactory 中配置適當的序列化器,並在 ConsumerFactory 中配置反序列化器。

讓咱們看一個簡單的bean,並將以消息的形式發送它:

public class Greeting {
 
    private String msg;
    private String name;
 
    // standard getters, setters and constructor
}

6.1. 生產自定義消息

在本例中,咱們將使用 JsonSerializer。咱們看看 ProducerFactoryKafkaTemplate 的代碼:

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}
 
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

新的 KafkaTemplate 可用於發送 Greeting 消息:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. 消費自定義消息

一樣,咱們修改 ConsumerFactoryKafkaListenerContainerFactory 來正確反序列化 Greeting 消息:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {
 
    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

spring-kafka JSON序列化器和反序列化器使用 Jackson 庫,該庫是 spring-kafka 項目的可選maven依賴項。咱們也把它加到 pom.xml 文件:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

建議不要使用 Jackson 的最新版本,而是使用 pom.xml 文件 中 spring-kafka 的版本。
最後,咱們須要編寫一個 listener 來 消費 Greeting 消息:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

7. 結語

在本文中,咱們介紹了Apache Kafka 和 Spring 集成的基礎知識,且簡要介紹了用於發送和接收消息的類。
本文的完整源代碼能夠在GitHub上找到. 在執行代碼以前,請確保服務器正在運行 Kafka。
若是你以爲文章還不錯,記得關注公衆號: 鍋外的大佬
劉一手的博客

相關文章
相關標籤/搜索