Spring Boot Kafka概覽、配置及優雅地實現發佈訂閱

本文屬於原創,轉載註明出處,歡迎關注微信小程序小白AI博客 微信公衆號小白AI或者網站 https://xiaobaiai.nethtml

[TOC]前端

1 前言

本篇文章內容很全,很長,很細!不要心急,慢慢看!我都寫完了,相信你看完確定能夠的,有任何問題能夠隨時交流!java

本篇文章內容很全,很長,很細!不要心急,慢慢看!我都寫完了,相信你看完確定能夠的,有任何問題能夠隨時交流!git

本篇文章內容很全,很長,很細!不要心急,慢慢看!我都寫完了,相信你看完確定能夠的,有任何問題能夠隨時交流!github

本篇文章主要介紹Spring Kafka的經常使用配置、主題自動建立、發佈消息到集羣、訂閱消息(羣組)、流處理配置以及嵌入式Kafka作測試配置相關內容,最後經過兩種方式去實現消息的發佈和訂閱功能,其中一種是基於Spring Integration方式。本文內容基於Spring Kafka2.3.3文檔Spring Boot Kafka相關文檔,Spring建立了一個名爲Spring kafka的項目,它封裝了Apache的kafka客戶端部分(生產者/消費者/流處理等),以便在Spring項目中快速集成kafka,Spring-Kafka項目提供了Apache Kafka自動化配置,經過Spring Boot的簡化配置(以spring.kafka.*做爲前綴的配置參數),在Spring Boot中使用Kafka特別簡單。而且Spring Boot還提供了一個嵌入式Kafka代理方便作測試。spring

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup

實現下面的所涉及到的功能實現,須要有以下環境:apache

  • Java運行或開發環境(JRE/JDK)
  • Kafka安裝成功

更多的配置能夠參考《Kafka,ZK集羣開發或部署環境搭建及實驗》這一篇文章。編程

本文儘可能作到闡述邏輯清晰,主要路線就是全局介紹Spring Kafka的主要功能及重點配置,而Spring Boot對Spring Kafka進一步簡化配置,經過Spring Boot中的Kafka幾大註解實現發佈訂閱功能,同時經過Spring Integration + 自定義Kafka配置方式實現一個較爲複雜的Kafka發佈訂閱功能,本文經過本身實驗和整理了較久的時間,涵蓋了Spring Kafka大部份內容,但願你們耐心讀下來,有什麼問題隨時反饋,一塊兒學習。

2 Spring Kafka功能概覽

Spring Kafka、Spring Integration和Kafka客戶端版本聯繫或者兼容性以下(截至2019年12月9日):json

Spring for Apache Kafka Spring Integration for Apache Kafka Version kafka-clients
2.3.x 3.2.x 2.3.1
2.2.x 3.1.x 2.0.1, 2.1.x, 2.2.x
2.1.x 3.0.x 1.0.x, 1.1.x, 2.0.0
1.3.x 2.3.x 0.11.0.x, 1.0.x
具體更多版本特色能夠看官網,spring kafka當前最新爲2.3.4版本。

Spring Kafka相關的註解有以下幾個:bootstrap

註解類型 描述
EnableKafka 啓用由AbstractListenerContainerFactory在封面(covers)下建立的Kafka監聽器註解端點,用於配置類;
EnableKafkaStreams 啓用默認的Kafka流組件
KafkaHandler 在用KafkaListener註解的類中,將方法標記爲Kafka消息監聽器的目標的註解
KafkaListener 將方法標記爲指定主題上Kafka消息監聽器的目標的註解
KafkaListeners 聚合多個KafkaListener註解的容器註解
PartitionOffset 用於向KafkaListener添加分區/初始偏移信息
TopicPartition 用於向KafkaListener添加主題/分區信息

如使用@EnableKafka能夠監聽AbstractListenerContainerFactory子類目標端點,如ConcurrentKafkaListenerContainerFactoryAbstractKafkaListenerContainerFactory的子類。

public class ConcurrentKafkaListenerContainerFactory<K,V>
extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration
 @EnableKafka
 public class AppConfig {
        @Bean
        public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() {
                ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
                factory.setConsumerFactory(consumerFactory());
                factory.setConcurrency(4);
                return factory;
        }
        // other @Bean definitions
 }
@EnableKafka並非在Spring Boot中啓用Kafka必須的,Spring Boot附帶了Spring Kafka的自動配置,所以不須要使用顯式的 @EnableKafka。若是想要本身實現Kafka配置類,則須要加上 @EnableKafka,若是你不想要Kafka自動配置,好比測試中,須要作的只是移除 KafkaAutoConfiguration
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")

2.1 自動建立主題

💡 要在應用啓動時就建立主題,能夠添加NewTopic類型的Bean。若是該主題已經存在,則忽略Bean。

2.2 發送消息

Spring的KafkaTemplate是自動配置的,你能夠直接在本身的Bean中自動鏈接它,以下例所示:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}

KafkaTemplate包裝了一個生產者,並提供了向kafka主題發送數據的方便方法。提供異步和同步(發送阻塞)方法,異步(發送非阻塞)方法返回ListenableFuture,以此監聽異步發送狀態,成功仍是失敗,KafkaTemplate提供以下接口:

ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
    T doInKafka(Producer<K, V> producer);
}

sendDefault API 要求已向模板提供默認主題。部分API接受一個時間戳做爲參數,並將該時間戳存儲在記錄中,如何存儲用戶提供的時間戳取決於Kafka主題上配置的時間戳類型,若是主題配置爲使用CREATE_TIME,則記錄用戶指定的時間戳(若是未指定則生成)。若是將主題配置爲使用LOG_APPEND_TIME,則忽略用戶指定的時間戳,而且代理將添加本地代理時間。metricspartitionsFor方法委託給底層Producer上的相同方法。execute方法提供對底層生產者的直接訪問

要使用模板,能夠配置一個生產者工廠並在模板的構造函數中提供它。下面的示例演示瞭如何執行此操做:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    // KafkaTemplate構造函數中輸入生產者工廠配置
    return new KafkaTemplate<Integer, String>(producerFactory());
}

而後,要使用模板,能夠調用其方法之一發送消息。

當你使用包含Message<?>參數的方法時,主題、分區和鍵信息在消息頭中提供,有以下子項:

KafkaHeaders.TOPIC
KafkaHeaders.PARTITION_ID
KafkaHeaders.MESSAGE_KEY
KafkaHeaders.TIMESTAMP

如訪問頭部信息中某一項信息:

public void handleMessage(Message<?> message) throws MessagingException {
    LOGGER.debug("===Received Msg Topic: {}",  message.getHeaders().get(KafkaHeaders.TOPIC));
}

可選的功能是,可使用ProducerListener配置KafkaTemplate,以得到帶有發送結果(成功或失敗)的異步回調,而不是等待未來完成。如下列表顯示了ProducerListener接口的定義:

public interface ProducerListener<K, V> {
    void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata);
    void onError(String topic, Integer partition, K key, V value, Exception exception);
    boolean isInterestedInSuccess();
}

默認狀況下,模板配置有LoggingProducerListener,它只記錄錯誤,在發送成功時不執行任何操做。只有當isInterestedInSuccess返回true時才調用onSuccess
爲了方便起見,若是你只想實現其中一個方法,那麼將提供抽象ProducerListenerAdapter。對於isInterestedInSuccess,它返回false。下面演示了異步結果回調:

public void sendMessage(String msg) {
    LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
    ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
        @Override
        public void onSuccess(SendResult<String, String> result) {
            LOGGER.info("===Producing message success");  
        }

        @Override
        public void onFailure(Throwable ex) {
            LOGGER.info("===Producing message failed");  
        }

    });
}

若是但願阻止式發送線程等待結果,能夠調用futureget()方法。你可能但願在等待以前調用flush(),或者爲了方便起見,模板有一個帶有autoFlush參數的構造函數,該構造函數在每次發送時都會致使模板flush()。不過,請注意,刷新可能會顯著下降性能:

public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

使用DefaultKafkaProducerFactory:

如上面使用KafkaTemplate中所示,ProducerFactory用於建立生產者。默認狀況下,當不使用事務時,DefaultKafkaProducerFactory會建立一個供全部客戶機使用的單例生產者,如KafkaProducer javadocs中所建議的那樣。可是,若是對模板調用flush(),這可能會致使使用同一個生產者的其餘線程延遲。從2.3版開始,DefaultKafkaProducerFactory有一個新屬性producerPerThread。當設置爲true時,工廠將爲每一個線程建立(和緩存)一個單獨的生產者,以免此問題。

producerPerThread爲true時,當再也不須要生產者時,用戶代碼必須在工廠上調用 closeThreadBoundProducer()。這將實際關閉生產者並將其從 ThreadLocal中移除。調用reset()或destroy()不會清理這些生產者。

建立DefaultKafkaProducerFactory時,能夠經過調用只接受屬性映射的構造函數(請參閱使用KafkaTemplate中的示例)從配置中獲取鍵和/或值序列化器類,或者序列化程序實例能夠傳遞給DefaultKafkaProducerFactory構造函數(在這種狀況下,全部生產者共享相同的實例)。或者,能夠提供Supplier<Serializer> s(從版本2.3開始),用於爲每一個生產者獲取單獨的Serializer實例:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

使用ReplyingKafkaTemplate:

版本2.1.3引入了KafkaTemplate的一個子類來提供請求/應答語義。這個類名爲ReplyingKafkaTemplate,而且有一個方法(除了超類中的那些方法以外)。下面的列表顯示了方法簽名:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

結果是一個ListenableFuture,它被結果異步填充(或者超時時出現異常)。結果還有一個sendFuture屬性,這是調用KafkaTemplate.send()的結果。你可使用此Future肯定發送操做的結果。這裏就不展開了。

2.3 接收消息

能夠經過配置MessageListenerContainer並提供消息監聽器或使用@KafkaListener註解來接收消息。

2.3.1 消息監聽器

使用消息監聽器容器(message listener container)時,必須提供監聽器才能接收數據。目前有八個消息監聽器支持的接口。下面的列表顯示了這些接口:

// 使用自動提交或容器管理的提交方法之一時,使用此接口處理從Kafka 消費者 poll() 做接收的單個ConsumerRecord實例
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

// 使用手動提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的單個ConsumerRecord實例
public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

// 使用自動提交或容器管理的提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的單個ConsumerRecord實例。提供對消費者對象的訪問。
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

// 使用手動提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的單個ConsumerRecord實例。提供對消費者對象的訪問。
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

// 使用自動提交或容器管理的提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的全部ConsumerRecord實例。使用此接口時不支持AckMode.RECORD,由於監聽器已得到完整的批處理。
public interface BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data);
}

// 使用手動提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的全部ConsumerRecord實例。
public interface BatchAcknowledgingMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

// 使用自動提交或容器管理的提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的全部ConsumerRecord實例。使用此接口時不支持AckMode.RECORD,由於監聽器已得到完整的批處理。提供對使用者對象的訪問。
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

// 使用手動提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的全部ConsumerRecord實例。提供對使用者對象的訪問。
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
上述消費者對象不是線程安全的。只能在調用偵聽器的線程上調用其方法。
2.3.1.1 消息監聽器容器

提供了兩個MessageListenerContainer的實現:

  • KafkaMessageListenerContainer
  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer從單個線程上的全部主題或分區接收全部消息(即一個分區只能分配到一個消費者,一個消費者能夠被分配多個分區)。ConcurrentMessageListenerContainer委託給一個或多個KafkaMessageListenerContainer實例,以提供多線程使用,從多線程上去處理主題或分區的全部消息。

從Spring Kafka2.2.7版開始,你能夠將RecordInterceptor添加到偵聽器容器中;在調用偵聽器以容許檢查或修改記錄以前,將調用它。若是攔截器返回null,則不調用偵聽器。偵聽器是批處理偵聽器時不調用偵聽器。從2.3版開始,CompositeRecordInterceptor可用於調用多個攔截器。

默認狀況下,使用事務時,偵聽器在事務啓動後調用。從2.3.4版開始,你能夠設置偵聽器容器的interceptBeforeTx屬性,以便在事務啓動以前調用偵聽器。沒有爲批處理偵聽器提供偵聽器,由於Kafka已經提供了ConsumerInterceptor

2.3.1.2 使用KafkaMessageListenerContainer

有以下構造函數可用:

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties,
                    TopicPartitionOffset... topicPartitions)

每一個都獲取一個ConsumerFactory以及有關主題和分區的信息,以及ContainerProperties對象中的其餘配置。ConcurrentMessageListenerContainer(稍後介紹)使用第二個構造函數跨使用者實例分發TopicPartitionOffsetContainerProperties具備如下構造函數:

public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)

第一個構造函數接受一個TopicPartitionOffset參數數組來顯式地指示容器要使用哪些分區(使用消費者的 assign()方法)和可選的初始偏移量。默認狀況下,正值是絕對偏移量。默認狀況下,負值是相對於分區內的當前最後偏移量。提供了TopicPartitionOffset的構造函數,該構造函數接受一個附加的布爾參數。若是是true,則初始偏移(正偏移或負偏移)相對於該消耗器的當前位置。容器啓動時應用偏移量。第二個是主題數組,Kafka基於group.id屬性:在組中分佈分區來分配分區。第三個使用regex表達式來選擇主題。

要將MessageListener分配給容器,能夠在建立容器時使用ContainerProps.setMessageListener方法。下面的示例演示瞭如何執行此操做:

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

注意當建立一個Defaultkafkafkaconsumerfactory時,使用構造器,該構造器僅以其特性爲基礎,就意味着從配置中獲取了key/value的Deserializer類別。或者,反序列化程序實例能夠傳遞給key/value的DefaultKafkaConsumerFactory構造函數,在這種狀況下,全部消費者共享相同的實例。另外一個選項是提供Supplier<Deserializer>s(從版本2.3開始),用於爲每一個使用者獲取單獨的反序列化程序實例:

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有關能夠設置的各類屬性的更多信息,請參閱Javadoc 中ContainerProperties

從版本Spring Kafka 2.1.1開始,一個名爲logContainerConfig的新屬性就可用了。當啓用true和INFO日誌記錄時,每一個偵聽器容器都會寫入一條日誌消息,總結其配置屬性。

例如,要將日誌級別更改成INFO,可使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)

從版本Spring Kafka 2.2開始,添加了名爲missingtopicsfailal的新容器屬性(默認值:true)。若是代理上不存在任何客戶端發佈或訂閱涉及到的主題,這將阻止容器啓動。若是容器配置爲偵聽主題模式(regex),則不適用。之前,容器線程在consumer.poll()方法中循環,等待在記錄許多消息時出現主題。除了日誌,沒有跡象代表有問題。要恢復之前的行爲,能夠將屬性設置爲false,這個時候,Broker設置項allow.auto.create.topics=true,且這個容器屬性爲false,則會自動建立不存在的topic。

2.3.1.3 使用 ConcurrentMessageListenerContainer

單個構造函數相似於第一個KafkaListenerContainer構造函數。下面的列表顯示了構造函數的簽名:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它還有一個併發屬性。例如,container.setConcurrency(3)即表示建立三個KafkaMessageListenerContainer實例。對於第一個構造函數,Kafka使用它的組管理功能將分區分佈到消費者之間。

當監聽多個主題時,默認的分區分佈可能不是你指望的那樣。例如,若是你有三個主題,每一個主題有五個分區,而且但願使用 concurrency=15,那麼你只看到五個活動的消費者,每一個消費者從每一個主題中分配一個分區,其餘十個消費者處於空閒狀態。這是由於默認的Kafka PartitionAssignorRangeAssignor(參見其Javadoc)。對於這種狀況,你可能須要考慮改用 RoundRobinAssignor,它將分區分佈到全部使用者。而後,爲每一個使用者分配一個主題或分區。若要更改 PartitionAssignor,你能夠在提供給 DefaultKafkaConsumerFactory的屬性中設置 partition.assignment.strategy消費者配置參數( ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

使用Spring Boot時,能夠按以下方式分配設置策略:

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

對於第二個構造函數,ConcurrentMessageListenerContainerTopicPartition實例分佈在委託KafkaMessageListenerContainer實例上。

例如,若是提供了六個TopicPartition實例,併發性爲3;每一個容器獲得兩個分區。對於五個TopicPartition實例,兩個容器獲得兩個分區,第三個容器獲得一個分區。若是併發性大於TopicPartitions的數量,則會向下調整併發性,以便每一個容器得到一個分區。調整分區的方式可使用命令行工具kafka-topics.sh查詢和調整主題上的分區數。還能夠添加一個NewTopic Bean,若是NewTopic設定的數目大於當前數目,spring boot的自動配置的KafkaAdmin將向上調整分區。

client.id屬性(若是已設置)將附加 -n,其中n是對應於併發的消費者實例。當啓用JMX時,這是爲MBeans提供惟一名稱所必需的。

從版本Spring Kafka 1.3開始,MessageListenerContainer提供了對底層KafkaConsumer的度量的訪問。對於ConcurrentMessageListenerContainermetrics()方法返回全部目標KafkaMessageListenerContainer實例的度量(metrics)。根據爲底層KafkaConsumer提供的client-id度量被分組到Map<MetricName, ?extends Metric>

從2.3版開始,ContainerProperties提供了一個idleBetweenPolls選項,容許偵聽器容器中的主循環在KafkaConsumer.poll()調用之間睡眠。從提供的選項中選擇實際睡眠間隔做爲最小值,而且選擇max.poll.interval.ms 消費者配置和當前記錄批處理時間之間的差別。

2.3.1.4 提交偏移量

提供了幾個提交偏移量的選項。若是enable.auto.commit使用者屬性爲true,則Kafka將根據其配置自動提交偏移量。若是爲false,則容器支持多個AckMode設置(在下一個列表中描述)。默認的確認模式是批處理。從2.3版開始,框架將enable.auto.commit設置爲false,除非在配置中顯式設置。之前,若是未設置屬性,則使用Kafka默認值(true)。消費者 poll()方法返回一個或多個ConsumerRecords。爲每一個記錄調用MessageListener。如下列表描述了容器對每一個AckMode採起的操做:

  • RECORD: 當偵聽器在處理記錄後返回時提交偏移量。
  • BATCH: 處理完poll()返回的全部記錄後提交偏移量。
  • TIME: 在處理完poll()返回的全部記錄後提交偏移量,只要超過上次提交後的ackTime
  • COUNT: 在處理完poll()返回的全部記錄後提交偏移量,只要上次提交後收到ackCount記錄。
  • COUNT_TIME: 相似於TIMECOUNT,但若是兩個條件都爲true,則執行提交。
  • MANUAL: 消息偵聽器負責acknowledge()Acknowledgment。以後,應用與BATCH相同的語義。
  • MANUAL_IMMEDIATE: 偵聽器調用Acknowledgement.acknowledge()方法時當即提交偏移量。
MANUAL和MANUAL_IMMEDIATE 要求偵聽器是 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。請參見消息偵聽器。

根據syncCommits容器屬性,使用消費者上的commitSync()commitAsync()方法。默認狀況下,syncCommits爲true;另請參閱setSyncCommitTimeout。請參閱setCommitCallback以獲取異步提交的結果;默認回調是LoggingCommitCallback,它記錄錯誤(以及調試級別的成功)。

由於偵聽器容器有本身的提交偏移的機制,因此它但願Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG爲false。從2.3版開始,除非在使用者工廠或容器的使用者屬性重寫中特別設置,不然它將無條件地將其設置爲false。

Acknowledgment有如下方法:

public interface Acknowledgment {
    void acknowledge();
}

此方法使偵聽器能夠控制什麼時候提交偏移。

從版本2.3開始,確認接口有兩個附加方法nack(long sleep)nack(int index, long sleep)。第一個用於記錄偵聽器,第二個用於批處理偵聽器。爲偵聽器類型調用錯誤的方法將引起IllegalStateException

nack()只能在調用偵聽器的消費者線程上調用。

使用批處理偵聽器時,能夠在發生故障的批內指定索引。調用nack()時,將在對失敗和丟棄的記錄的分區執行索引和查找以前提交記錄的偏移量,以便在下次poll()時從新傳遞這些偏移量。這是對SeekToCurrentBatchErrorHandler的改進,SeekToCurrentBatchErrorHandler只能查找整個批次以便從新交付。

注意:經過組管理使用分區分配時,確保sleep參數(加上處理上一次輪詢記錄所花費的時間)小於 consumer max.poll.interval.ms屬性很是重要。
2.3.1.5 偵聽器容器自動啓動和手動啓動

偵聽器容器實現了SmartLifecycle(經過SmartLifecycle在Spring加載和初始化全部bean後,接着執行一些任務或者啓動須要的異步服務),默認狀況下autoStartuptrue。容器在後期啓動(Integer.MAX-VALUE - 100)。實現SmartLifecycle以處理來自偵聽器的數據的其餘組件應該在較早的階段啓動。-100爲之後的階段留出了空間,使組件可以在容器以後自動啓動。好比咱們經過@Bean將監聽器容器交給Spring管理,這個時候經過SmartLifecycle自動執行了初始化的任務,可是當咱們手動經過new監聽器容器實例,則後初始化則不會執行,好比KafkaMessageListenerContainer實例須要手動執行start()

autoStartup在手動執行start中設置true與false沒有做用,能夠參見@KafkaListener聲明週期管理這一小節。

2.3.2 @KafkaListener註解

2.3.2.1 Record Listeners

@KafkaListener註解用於將bean方法指定爲偵聽器容器的偵聽器。bean包裝在一個MessagingMessageListenerAdapter中,該適配器配置有各類功能,如轉換器,用於轉換數據(若有必要)以匹配方法參數。經過使用屬性佔位符(${…}),或者可使用SpEL(#{…​})配置註釋上的大多數屬性。有關更多信息,請參閱Javadoc。

@KafkaListener:

  • id:listener惟一id,當GroupId沒有被配置的時候,默認id爲自動產生,此值指定後會覆蓋group id。
  • containerFactory:上面提到了@KafkaListener區分單數據仍是多數據消費只須要配置一下註解的containerFactory屬性就能夠了,這裏面配置的是監聽容器工廠,也就是ConcurrentKafkaListenerContainerFactory,配置Bean名稱
  • topics:須要監聽的Topic,可監聽多個,能夠是表達式或者佔位符關鍵字或者直接是主題名稱,如多個主題監聽:{"topic1" , "topic2"}
  • topicPattern: 此偵聽器的主題模式。條目能夠是「主題模式」、「屬性佔位符鍵」或「表達式」。框架將建立一個容器,該容器訂閱與指定模式匹配的全部主題,以獲取動態分配的分區。模式匹配將針對檢查時存在的主題週期性地執行。表達式必須解析爲主題模式(支持字符串或模式結果類型)。這使用組管理,Kafka將爲組成員分配分區。
  • topicPartitions:用於使用手動主題/分區分配時
  • errorHandler:監聽異常處理器,配置Bean名稱,默認爲空
  • groupId:消費組ID
  • idIsGroup:id是否爲GroupId
  • clientIdPrefix:消費者Id前綴
  • beanRef:真實監聽容器的Bean名稱,須要在 Bean名稱前加 "__"

@KafkaListener註解爲簡單的POJO偵聽器提供了一種機制。下面的示例演示如何使用它:

public class Listener {
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}

此機制生效須要@Configuration類之一上的@EnableKafka註解和用於配置基礎ConcurrentMessageListenerContainer的偵聽器容器工廠。默認狀況下,須要名爲kafkaListenerContainerFactory的bean。如下示例演示如何使用ConcurrentMessageListenerContain:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

注意,要設置容器屬性,必須在工廠上使用getContainerProperties()方法。它用做注入容器的實際屬性的模板。

從版本2.1.1開始,如今能夠爲註解建立的消費者設置client.id屬性。clientdprefix的後綴是-n,其中n是一個整數,表示使用併發時的容器號。

從2.2版開始,如今能夠經過使用批註自己的屬性來重寫容器工廠的併發性和自動啓動屬性。屬性能夠是簡單值、屬性佔位符或SpEL表達式。下面的示例演示瞭如何執行此操做:

@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

你還可使用顯式主題和分區(以及可選的初始偏移量)配置POJO偵聽器。下面的示例演示瞭如何執行此操做:

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

你能夠在partitionspartitionOffsets屬性中指定每一個分區,但不能同時指定二者。

使用手動AckMode時,還能夠向偵聽器提供Acknowledgment。下面的示例還演示瞭如何使用不一樣的容器工廠:

@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}

最後,能夠從消息頭得到有關消息的元數據。你可使用如下頭名稱來檢索消息頭內容:

KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE

示例:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
2.3.2.2 批處理偵聽器

從版本1.1開始,能夠配置@KafkaListener方法來接收從消費者接收的整批消費者記錄。要將偵聽器容器工廠配置爲建立批處理偵聽器,能夠設置batchListener屬性。下面的示例演示瞭如何執行此操做:

@Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    return factory;
}

如下示例顯示如何接收有效載荷列表:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

主題、分區、偏移量等在與有效負載並行的頭中可用。下面的示例演示如何使用標題:

@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}

或者,您能夠接收消息列表Message<?>對象,其中包含每一個偏移量和每一個消息中的其餘詳細信息,但它必須是惟一的參數(除了使用手動提交時的Acknowledgment和/Consumer<?, ?>參數)。下面的示例演示如何執行此操做:

@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
    ...
}

@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
    ...
}

@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}

在這種狀況下,不會對有效載荷執行轉換。若是BatchMessagingMessageConverter配置了RecordMessageConverter,則還能夠向消息參數添加泛型類型,並轉換有效負載。有關詳細信息,請參閱使用批處理偵聽器的負載轉換。

你還能夠收到一個ConsumerRecord<?, ?>對象,但它必須是惟一的參數(當使用手動提交或Consumer<?, ?>參數時,除了可選的Acknowledgment)。下面的示例演示瞭如何執行此操做:

@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}

@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}

從版本2.2開始,偵聽器能夠接收poll()方法返回的完整的ConsumerRecords<?, ?>對象,容許偵聽器訪問其餘方法,例如partitions()(返回列表中的TopicPartition實例)和records(TopicPartition)(獲取選擇性記錄)。一樣,這必須是惟一的參數(當使用手動提交或Consumer<?, ?>參數時,除了可選的Acknowledgment)。下面的示例演示瞭如何執行此操做:

@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}

2.3.3 @KafkaListener@Payload驗證

從2.2版開始,如今更容易添加驗證程序來驗證@KafkaListener`@Payload參數。之前,你必須配置一個自定義的DefaultMessageHandlerMethodFactory`並將其添加到註冊器中。如今,你能夠將驗證器添加到註冊器自己。如下代碼說明了如何執行此操做:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(new MyValidator());
    }
}

當你在Spring Boot使用validation starter,會自動配置LocalValidatorFactoryBean,以下例所示:

@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {

    @Autowired
    private LocalValidatorFactoryBean validator;
    ...

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
      registrar.setValidator(this.validator);
    }
}

如下示例演示如何驗證:

public static class ValidatedClass {

  @Max(10)
  private int bar;

  public int getBar() {
    return this.bar;
  }

  public void setBar(int bar) {
    this.bar = bar;
  }

}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
      containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
    ...
}

@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
    return (m, e) -> {
        ...
    };
}

2.3.4 從新平衡監聽者

ContainerProperties有一個名爲consumerRebalanceListener的屬性,該屬性接受Kafka客戶端的consumerRebalanceListener接口的實現。若是未提供此屬性,則容器將配置日誌偵聽器,該偵聽器將在信息級別記錄從新平衡事件。該框架還添加了一個子接口ConsumerRawareRebalanceListener。如下列表顯示了ConsumerRawareRebalanceListener接口定義:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}

2.3.5 轉發監聽者消息

從2.0版開始,若是還使用@SendTo註解註釋@KafkaListener,而且方法調用返回結果,則結果將轉發到@SendTo指定的主題。如:

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
        ...
    }

}

2.3.6 @KafkaListener生命週期管理

@KafkaListener註解建立的偵聽器容器不是應用程序上下文中的bean。相反,它們是用KafkaListenerEndpointRegistry類型的基礎設施bean註冊的。這個bean由框架自動聲明並管理容器的生命週期;它將自動啓動任何autoStartup設置爲true的容器。全部容器工廠建立的全部容器必須處於同一phase。有關詳細信息,請參閱偵聽器容器自動啓動。你可使用註冊表以編程方式管理生命週期。啓動或中止註冊表將啓動或中止全部已註冊的容器。或者,能夠經過使用單個容器的id屬性來獲取對該容器的引用。能夠在批註上設置autoStartup ,這將覆蓋容器工廠中配置的默認設置(setAutoStartup(true))。你能夠從應用程序上下文中獲取對bean的引用,例如自動鏈接,以管理其註冊的容器。如下示例說明了如何執行此操做:

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

註冊表只維護其管理的容器的生命週期;聲明爲bean的容器不受註冊表管理,能夠從應用程序上下文中獲取。能夠經過調用註冊表的getListenerContainers()方法來獲取託管容器的集合。Spring Kafka版本2.2.5添加了一個方便方法getAllListenerContainers(),它返回全部容器的集合,包括由註冊表管理的容器和聲明爲bean的容器。返回的集合將包括任何已初始化的原型bean,但它不會初始化任何延遲bean聲明。

2.4 流處理

Spring for Apache Kafka提供了一個工廠bean來建立StreamsBuilder對象並管理其流的生命週期。只要kafka流在classpath上而且kafka流經過@EnableKafkaStreams註解開啓,Spring Boot就會自動配置所需的KafkaStreamsConfiguration bean。

啓用Kafka流意味着必須設置應用程序id和引導服務器(bootstrap servers)。前者可使用spring.kafka.streams.application-id配置,若是未設置,則默認爲spring.application.name。後者能夠全局設置,也能夠專門爲流覆寫。

使用專用屬性可使用其餘幾個屬性;可使用spring.Kafka.streams.properties命名空間設置其餘任意Kafka屬性。有關詳細信息,Additional Kafka Properties

默認狀況下,由它建立的StreamBuilder對象管理的流將自動啓動。可使用spring.kafka.streams.auto-startup屬性自定義此行爲。

要使用工廠bean,只需將StreamsBuilder鏈接到@bean,以下例所示:

@Configuration(proxyBeanMethods = false)
@EnableKafkaStreams
public static class KafkaStreamsExampleConfiguration {

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
        KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
        stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out",
                Produced.with(Serdes.Integer(), new JsonSerde<>()));
        return stream;
    }

}

默認狀況下,由它建立的StreamBuilder對象管理的流將自動啓動。可使用spring.kafka.streams.auto-startup屬性自定義此行爲。

2.5 附加配置

自動配置支持的屬性顯示在公用應用程序屬性中。注意,在大多數狀況下,這些屬性(連字符或駝峯樣式)直接映射到Apache Kafka點式屬性。有關詳細信息,請參閱Apache Kafka文檔。

前面提到的幾個屬性應用於全部組件(生產者、消費者、管理員和流),但若是但願使用不一樣的值,則能夠在組件級別指定。Apache Kafka指定重要性爲HIGHMEDIUMLOW的屬性。Spring Boot自動配置支持全部高重要性屬性、某些選定的中、低屬性以及任何沒有默認值的屬性。

只有Kafka支持的屬性的一個子集能夠經過KafkaProperties類直接使用,若是要使用不直接支持的其餘屬性配置生產者或消費者,請使用如下屬性:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth

上面的參數設置示例將公共prop.oneKafka屬性設置爲first(適用於生產者、消費者和管理員),prop.two admin屬性設置爲secondprop.three consumer屬性設置爲thirdprop.four producer屬性設置爲fourthprop.five streams屬性設置爲fifth

你還能夠配置Spring Kafka JsonDeserializer,以下所示:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

相似地,能夠禁用JsonSerializer在頭中發送類型信息的默認行爲:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
注意: 以這種方式設置的屬性將覆蓋Spring Boot顯式支持的任何配置項。

2.6 使用Embdded Kafka作測試

Spring for Apache Kafka提供了一種使用嵌入式Apache Kafka代理測試項目的便捷方法。要使用此功能,請使用Spring Kafka測試模塊中的@EmbeddedKafka註解測試類。有關更多信息,請參閱Spring For Apache Kafka參考手冊。

要使Spring Boot自動配置與前面提到的嵌入式Apache Kafka代理一塊兒工做,須要將嵌入式代理地址(由EmbeddedKafkaBroker填充)的系統屬性從新映射到Apache Kafka的Spring Boot配置屬性中。有幾種方法能夠作到這一點:

  • 提供系統屬性以將嵌入的代理地址映射到測試類中的spring.kafka.bootstrap-servers:
static {
    System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers");
}
  • @EmbeddedKafka註解上配置屬性名:
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
  • 在配置屬性中使用佔位符:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

2.7 Spring Integration支持

Spring Integration也有Kafka的適配器,所以咱們能夠很方便的採用Spring Integration去實現發佈訂閱,固然你也能夠不使用Spring Integration。

Spring Integration是什麼,具體有什麼做用,能夠參考另外一篇文章《Spring Integration最詳解》。

3 Spring Kafka配置參數

這裏對全部配置作個說明的是,spring kafka配置分全局配置和子模塊配置,子模塊配置會複寫全局配置,好比SSL認證能夠全局配置,可是也能夠在每一個子模塊,如消費者、生產者、流式處理中均可以單獨配置SSL(多是微服務部署,消費者和生產者不在同一個應用中)。這裏重點介紹生產者和消費者配置吧,其餘就不展開了,用到的時候再去查找和補充。

3.1 全局配置

# 用逗號分隔的主機:端口對列表,用於創建到Kafka羣集的初始鏈接。覆蓋全局鏈接設置屬性
spring.kafka.bootstrap-servers
# 在發出請求時傳遞給服務器的ID。用於服務器端日誌記錄
spring.kafka.client-id,默認無
# 用於配置客戶端的其餘屬性,生產者和消費者共有的屬性
spring.kafka.properties.*
# 消息發送的默認主題,默認無
spring.kafka.template.default-topic

3.2 生產者

Spring Boot中,Kafka 生產者相關配置(全部配置前綴爲spring.kafka.producer.):

# 生產者要求Leader在考慮請求完成以前收到的確認數
spring.kafka.producer.acks
# 默認批量大小。較小的批處理大小將使批處理不太常見,並可能下降吞吐量(批處理大小爲零將徹底禁用批處理)
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
# 生產者可用於緩衝等待發送到服務器的記錄的總內存大小。
spring.kafka.producer.buffer-memory
# 在發出請求時傳遞給服務器的ID。用於服務器端日誌記錄。
spring.kafka.producer.client-id
# 生產者生成的全部數據的壓縮類型
spring.kafka.producer.compression-type
# 鍵的序列化程序類
spring.kafka.producer.key-serializer
spring.kafka.producer.properties.*
# 大於零時,啓用失敗發送的重試次數
spring.kafka.producer.retries
spring.kafka.producer.ssl.key-password
spring.kafka.producer.ssl.key-store-location
spring.kafka.producer.ssl.key-store-password
spring.kafka.producer.ssl.key-store-type
spring.kafka.producer.ssl.protocol
spring.kafka.producer.ssl.trust-store-location
spring.kafka.producer.ssl.trust-store-password
spring.kafka.producer.ssl.trust-store-type
# 非空時,啓用對生產者的事務支持
spring.kafka.producer.transaction-id-prefix
spring.kafka.producer.value-serializer

3.3 消費者

Spring Boot中,Kafka 消費者相關配置(全部配置前綴爲spring.kafka.consumer.):

# 若是「enable.auto.commit」設置爲true,設置消費者偏移自動提交到Kafka的頻率,默認值無,單位毫秒(ms)
spring.kafka.consumer.auto-commit-interval
# 當Kafka中沒有初始偏移或服務器上再也不存在當前偏移時策略設置,默認值無,latest/earliest/none三個值設置
# earliest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
# latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
# none topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常
spring.kafka.consumer.auto-offset-reset
# 用逗號分隔的主機:端口對列表,用於創建到Kafka羣集的初始鏈接。覆蓋全局鏈接設置屬性
spring.kafka.consumer.bootstrap-servers
# 在發出請求時傳遞給服務器的ID,用於服務器端日誌記錄
spring.kafka.consumer.client-id
# 消費者的偏移量是否在後臺按期提交
spring.kafka.consumer.enable-auto-commit
# 若是沒有足夠的數據來當即知足「fetch-min-size」的要求,則服務器在取回請求以前阻塞的最大時間量
spring.kafka.consumer.fetch-max-wait
# 服務器應爲獲取請求返回的最小數據量。
spring.kafka.consumer.fetch-min-size
# 標識此消費者所屬的默認消費者組的惟一字符串
spring.kafka.consumer.group-id
# 消費者協調員的預期心跳間隔時間。
spring.kafka.consumer.heartbeat-interval
# 用於讀取以事務方式寫入的消息的隔離級別。
spring.kafka.consumer.isolation-level
# 密鑰的反序列化程序類
spring.kafka.consumer.key-deserializer
# 在對poll()的單個調用中返回的最大記錄數。
spring.kafka.consumer.max-poll-records
# 用於配置客戶端的其餘特定於消費者的屬性。
spring.kafka.consumer.properties.*
# 密鑰存儲文件中私鑰的密碼。
spring.kafka.consumer.ssl.key-password
# 密鑰存儲文件的位置。
spring.kafka.consumer.ssl.key-store-location
# 密鑰存儲文件的存儲密碼。
spring.kafka.consumer.ssl.key-store-password
# 密鑰存儲的類型,如JKS
spring.kafka.consumer.ssl.key-store-type
# 要使用的SSL協議,如TLSv1.2, TLSv1.1, TLSv1
spring.kafka.consumer.ssl.protocol
# 信任存儲文件的位置。
spring.kafka.consumer.ssl.trust-store-location
# 信任存儲文件的存儲密碼。
spring.kafka.consumer.ssl.trust-store-password
# 信任存儲區的類型。
spring.kafka.consumer.ssl.trust-store-type
# 值的反序列化程序類。
spring.kafka.consumer.value-deserializer

3.4 監聽器

Spring Boot中,Kafka Listener相關配置(全部配置前綴爲spring.kafka.listener.):

# ackMode爲「COUNT」或「COUNT_TIME」時偏移提交之間的記錄數
spring.kafka.listener.ack-count=
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
# 若是Broker上不存在至少一個配置的主題(topic),則容器是否沒法啓動,
# 該設置項結合Broker設置項allow.auto.create.topics=true,若是爲false,則會自動建立不存在的topic
spring.kafka.listener.missing-topics-fatal=true
# 非響應消費者的檢查間隔時間。若是未指定持續時間後綴,則將使用秒做爲單位
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type

3.5 管理

spring.kafka.admin.client-id
# 若是啓動時代理不可用,是否快速失敗
spring.kafka.admin.fail-fast=false
spring.kafka.admin.properties.*
spring.kafka.admin.ssl.key-password
spring.kafka.admin.ssl.key-store-location
spring.kafka.admin.ssl.key-store-password
spring.kafka.admin.ssl.key-store-type
spring.kafka.admin.ssl.protocol
spring.kafka.admin.ssl.trust-store-location
spring.kafka.admin.ssl.trust-store-password
spring.kafka.admin.ssl.trust-store-type

3.6 受權服務(JAAS)

spring.kafka.jaas.control-flag=required
spring.kafka.jaas.enabled=false
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule
spring.kafka.jaas.options.*

3.7 SSL認證

spring.kafka.ssl.key-password
spring.kafka.ssl.key-store-location
spring.kafka.ssl.key-store-password
spring.kafka.ssl.key-store-type
spring.kafka.ssl.protocol
spring.kafka.ssl.trust-store-location
spring.kafka.ssl.trust-store-password
spring.kafka.ssl.trust-store-type

3.8 Stream流處理

spring.kafka.streams.application-id
spring.kafka.streams.auto-startup
spring.kafka.streams.bootstrap-servers
spring.kafka.streams.cache-max-size-buffering
spring.kafka.streams.client-id
spring.kafka.streams.properties.*
spring.kafka.streams.replication-factor
spring.kafka.streams.ssl.key-password
spring.kafka.streams.ssl.key-store-location
spring.kafka.streams.ssl.key-store-password
spring.kafka.streams.ssl.key-store-type
spring.kafka.streams.ssl.protocol
spring.kafka.streams.ssl.trust-store-location
spring.kafka.streams.ssl.trust-store-password
spring.kafka.streams.ssl.trust-store-type
spring.kafka.streams.state-dir

4 Kafka訂閱發佈基本特性回顧

  • 同一消費組下全部消費者協同消費訂閱主題的全部分區

    • 同消費組,多消費者訂閱單主題單分區,則分區只會分配給其中一個消費者,除非這個消費者掛掉,纔會分配給其餘一個消費者消費消息,意思就是其餘消費者在旁邊看着吃東西
    • 同消費組,N個消費者訂閱單主題N個分區,則默認每一個消費者都會被分配一個分區
    • 同消費組,N個消費者訂閱單主題M個分區,當M > N時,則會有消費者多分配多於一個分區的狀況;當M < N時,則會有空閒消費者,相似第一條
    • 全部上面所說的消費者實例能夠是線程方式或者是進程方式存在,所說的分區分配機制叫作重平衡(rebalance)
    • 當消費者內成員個數發生變化會觸發重平衡;訂閱的主題個數發生變化會觸發重平衡;訂閱的主題分區個數發生變化會觸發重平衡;
    • 總之就是一個分區只能分配到一個消費者,一個消費者能夠被分配多個分區
  • 消費者offset管理機制

    • 每一個主題分區中的消息都有一個惟一偏移值,具備前後順序,與消費者具備對應關係,消費者每消費一條消息,偏移量加1,並記錄在消費者本地,並按期的將記錄同步到服務端(Broker),這裏的同步機制是能夠設置的
    • 消息是被持久化的,當組內全部消費者從新訂閱主題時,能夠設置是否從頭開始消費消息或者是從最後記錄的偏移值位置開始消費
  • 分區和消費者個數如何設置

    • 咱們知道主題分區是分佈在不一樣的Broker上的,每一個分區對應一個消費者,從而具備消息處理具備很高的吞吐量
    • 分區是調優Kafka並行度的最小單元,多線程消費者鏈接多分區消費消息,在實現上,經過socket鏈接,所以也會佔用文件句柄個數
    • 建立分區都是會佔用必定內存的,並非分區越多越好,固然如今kafka社區在優化這一部分,讓分區數達到更大,性能也不會有所影響

具體怎麼調優副本、分區、消費者等這裏就不展開了,後面專門來研究這個問題。

5 發佈訂閱示例

實現下面的示例須要的環境:

  • Kafka + Zookeeper單點服務器或集羣已配置好(若是環境搭建不熟悉,能夠去翻看前面寫的關於Kafka的環境搭建和測試那一篇),或者是使用Spring-kafka-test embedded Kafka Server
  • Spring Boot開發環境(2.2.1)

    • JDK(1.8或以上)
    • STS(4.4.RELEASE)
    • MARVEN構建方式

5.1 使用Embedded Kafka Server

咱們知道Kafka是Scala+Zookeeper構建的,能夠從官方網站下載部署包並在本地部署。不過,Spring Kafka Test已經封裝了Kafka測試的帶註解的一鍵式功能,以打開Kafka服務器,從而簡化了驗證Kafka相關功能的開發過程,使用起來也很是簡單。

添加依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

啓動服務,下面使用Junit測試用例直接啓動Kafka服務器服務,包括四個代理節點,Run as JUnit Test。:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095})
public class ApplicationTests {
    @Test
    public void contextLoads()throws IOException {
        System.in.read();
    }
}

@EmbeddedKafka中能夠設置相關參數:

  • value: 設置建立代理的個數
  • count: 同value
  • ports: 代理端口號列表
  • brokerPropertiesLocation:指定配置文件,如 "classpath:application.properties"
注意:EmbeddedKafka這樣默認是沒有建立主題的。會提示 Topic(s) [test] is/are not present and missingTopicsFatal is true錯誤。@EmbeddedKafka默認狀況是建立一個代理,該代理具備一個不帶任何參數的隨機端口,它將在啓動日誌中輸出特定端口和默認配置項。

5.2 簡單的發佈訂閱實現(無自定義配置)

下面實現一個簡單發佈訂閱功能,經過前端WEB調用一個API,而後在該API控制器中獲得請求後生產者開始發送消息,消費者後臺監聽消息,若是收到消費者消息,則打印出來。

5.2.1 添加依賴及配置Kafka

添加Kafka依賴:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>            
</dependency>

配置Kafka,這裏消費者和生產者在同一應用中,咱們只須要配置Kafka Brokers的服務地址+端口:

server: 
  port: 9000
spring:
  kafka:
    bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094
    listener:
        # 設置不監聽主題錯誤,false時,若是broker設置了llow.auto.create.topics = true,生產者發送到未建立主題時,會默認自動建立主題
        # 且默認建立的主題是單副本單分區的
        missing-topics-fatal: false
    consumer:
        # 配置消費者消息offset是否自動重置(消費者重連會可以接收最開始的消息)
        auto-offset-reset: earliest

5.2.2 添加生產者

@Service
public class Producer {

    private static final Logger LOGGER = LogManager.getLogger(Producer.class);
    private static final String TOPIC = "users";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        LOGGER.info(String.format("===Producing message: {}", message));
        this.kafkaTemplate.send(TOPIC, message);
    }
}

5.2.3 添加消費者

@Service
public class Consumer {

    private static final Logger LOGGER = LogManager.getLogger(Consumer.class);

    @KafkaListener(topics = "test", groupId = "group_test")
    public void consume(String message) throws IOException {
        LOGGER.info(String.format("#### -> Consumed message -> %s", message));
    }
   
}

5.2.4 添加WEB控制器

@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @GetMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}

5.2.5 測試

添加Spring Boot Application:

@SpringBootApplication
public class TestKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(TestKafkaApplication.class, args);
    }

}

啓動Kafka Brokers後,須要手動建立主題(若是想自動建立,則須要藉助KafkaAdmin,或者是Kafka Broker設置了allow.auto.create.topics=true且應用設置了listener.missing-topics-fatal=false):

# 若是對kafka-topics.sh這裏不熟悉,能夠去翻看前面寫的關於Kafka的相關文章(環境搭建和測試那一篇)
# 建立test主題
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test

打開瀏覽器測試:

http://localhost:9000/kafka/publish?message=hello

則應用控制檯會打印hello。整個發佈訂閱的實現只使用了跟Kafka相關的@KafkaListener註解接收消息和KafkaTemplate模板發送消息,非常簡單。

5.3 基於自定義配置發佈訂閱實現

上面是簡單的經過Spring Boot依賴的Spring Kafka配置便可快速實現發佈訂閱功能,這個時候咱們是沒法在程序中操做這些配置的,所以這一小節就是利用咱們以前《Spring Boot從零入門7_最新配置文件配置及優先級詳細介紹》文章中講述的自定義配置文件方式去實現發佈訂閱功能。

實現內容有:

  • 自定義Kafka配置參數文件(非application.properties/yml)
  • 可實現多生產者(每一個生產者爲單服務單線程),多消費者(非@KafkaListener實現消息監聽)
  • 支持SSL安全配置
  • 監聽生產者
源碼不會直接貼,只給出主體部分。

配置文件:

@Configuration
@ConfigurationProperties(prefix = "m2kc")
@PropertySource("classpath:kafka.properties")
@Validated
public class M2KCKafkaConfig {

    @Value("${m2kc.kafka.bootstrap.servers}")
    private String kafkaBootStrapServers;

    @Value("${m2kc.kafka.key.serializer.class}")
    private String kafkaKeySerializerClass;

    ......
    ......
}

生產者:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaProducer {
    private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class);
    private String mTopic = "test";
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaTemplate<String, String> mKafkaTemplate;
   
    @Autowired
    public KafkaProducer(M2KCKafkaConfig kafkaConfig) {
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;     
        mKafkaTemplate = getKafkaTemplate();
    }

    public KafkaTemplate<String, String> getKafkaTemplate() {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory());
        return kafkaTemplate;
    }

    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass());        
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());            
        }

        return new DefaultKafkaProducerFactory<String, String>(properties);
    }
    
    public void sendMessage(String msg) {
        LOGGER.info("===Producing message[{}]: {}", mTopic, msg);       
        ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.info("===Producing message success");  
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.info("===Producing message failed");  
            }

        });
    }
}

消費者:

@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class KafkaConsumer implements InitializingBean {
    private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class);

    private String mTopic;
    private M2KCKafkaConfig mM2KCKafkaConfig;
    private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; 

    @Autowired
    public KafkaConsumer(M2KCKafkaConfig kafkaConfig) {
        LOGGER.info("===KafkaConsumer construct");
        mTopic = kafkaConfig.getKafkaSourceTopic();
        mM2KCKafkaConfig = kafkaConfig;
    }
    
    @PostConstruct
    public void start(){
        LOGGER.info("===KafkaConsumer start");        
    }
    
    @Override  
    public void afterPropertiesSet() throws Exception {          
        LOGGER.info("===afterPropertiesSet is called");      
        createContainer();
    }  

    private void createContainer() {
        mKafkaMessageListenerContainer =  createKafkaMessageListenerContainer();
        mKafkaMessageListenerContainer.setAutoStartup(false);;
        mKafkaMessageListenerContainer.start();
        LOGGER.info("===", mKafkaMessageListenerContainer);
    }
    
    private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() {
        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(),
                createContainerProperties());
        LOGGER.info("===createKafkaMessageListenerContainer");
        return container;
    }
   
    private ContainerProperties createContainerProperties() {
        ContainerProperties containerProps = new ContainerProperties(mTopic);
        containerProps.setMessageListener(createMessageListener());
        return containerProps;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers());
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID());
        if (mM2KCKafkaConfig.isKafkaSslEnable()) {
            // TODO : to test
            properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol());
            properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword());

            properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation());
            properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword());
            properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword());
        }

        return new DefaultKafkaConsumerFactory<String, String>(properties);
    }

    private MessageListener<String, String> createMessageListener() {
        return new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> data) {
                // TODO Auto-generated method stub
                LOGGER.info("===Consuming msg: {}", data.value());
            }

        };
    }
}

繼承InitializingBean只是爲了初始化,也能夠去掉,將初始化寫入了構造函數中。這裏的消費者和生產者都使用@Scope,因此須要手動獲取實例,經過context去調用getBean()。另外配置文件沒有寫全,這裏須要注意。

5.3 基於Spring Integration發佈訂閱實現

Spring Integration也有對Kafka支持的適配器,採用Spring Integration,咱們也可以快速的實現發佈訂閱功能,且實現羣組多消費者批量消費功能:

  • 實現Kafka自定義配置類
  • 採用Spring Integration
  • 發佈訂閱
  • 羣組多消費者批量消費
  • 採用DSL特定領域語法去編寫
  • 生產者發佈成功與失敗異常處理

咱們能夠先看看總體的Kafka消息傳遞通道:

  • 出站通道中KafkaProducerMessageHandler用於將消息發送到主題
  • KafkaMessageDrivenChannelAdapter用於設置入站通道和消息處理

具體的Demo能夠參考Github中的一個sample :

6 總結

本篇文章詳細介紹了Spring Kafka的發送消息和接收消息功能,其餘包括Spring Kafka Stream的簡單介紹、Spring Kafka參數配置,以及在Spring Boot中如何經過三種方式去實現Kafka的發佈訂閱功能,涉及了Kafka的多消費者多訂閱者,SSL安全傳輸,Spring Integration Kafka等。文章很長,把握整體,結合實際,差很少基本內容都有所涉及了。

7 知識擴展

Spring Expression Language(簡稱SpEL),在Spring中,不一樣於屬性佔位符${...},而SpEL表達式則要放到#{...}中(除代碼塊中用Expression外)。如配置文件中有topics參數spring.kafka.topics,則能夠將配置文件中參數傳入註解@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")

SpEL表達式經常使用示例:

// 字面量
#{3.1415926}    // 浮點數
#{9.87E4}       // 科學計數法表示98700
#{'Hello'}      // String 類型
#{false}        // Boolean 類型
// 引用Bean、屬性和方法
#{sgtPeppers}                                   // 使用這個bean
#{sgtPeppers.artist}                            // 引用bean中的屬性
#{sgtPeppers.selectArtist()}                    // 引用bean中的方法
#{sgtPeppers.selectArtist().toUpperCase()}      // 方法返回值的操做
#{sgtPeppers.selectArtist()?.toUpperCase()}     // 防止selectArtist()方法返回null,?表示非null則執行toUpperCase()
// 訪問類做用域的方法和常量的話,使用T()這個關鍵的運算符
#{T(java.lang.Math)}   
#{T(java.lang.Math).PI}             // 引用PI的值
#{T(java.lang.Math).random()}       // 獲取0-1的隨機數
#{T(System).currentTimeMillis()}    // 獲取時間到當前的毫秒數
// 替代屬性佔位符獲取配置文件屬性值
@Value("#{表達式}" 
private String variable;

8 參考資料

相關文章
相關標籤/搜索