本文屬於原創,轉載註明出處,歡迎關注微信小程序小白AI博客
微信公衆號小白AI
或者網站 https://xiaobaiai.nethtml
[TOC]前端
本篇文章內容很全,很長,很細!不要心急,慢慢看!我都寫完了,相信你看完確定能夠的,有任何問題能夠隨時交流!java
本篇文章內容很全,很長,很細!不要心急,慢慢看!我都寫完了,相信你看完確定能夠的,有任何問題能夠隨時交流!git
本篇文章內容很全,很長,很細!不要心急,慢慢看!我都寫完了,相信你看完確定能夠的,有任何問題能夠隨時交流!github
本篇文章主要介紹Spring Kafka的經常使用配置、主題自動建立、發佈消息到集羣、訂閱消息(羣組)、流處理配置以及嵌入式Kafka作測試配置相關內容,最後經過兩種方式去實現消息的發佈和訂閱功能,其中一種是基於Spring Integration
方式。本文內容基於Spring Kafka2.3.3文檔及Spring Boot Kafka相關文檔,Spring建立了一個名爲Spring kafka
的項目,它封裝了Apache的kafka客戶端部分(生產者/消費者/流處理等),以便在Spring項目中快速集成kafka,Spring-Kafka項目提供了Apache Kafka自動化配置,經過Spring Boot的簡化配置(以spring.kafka.*
做爲前綴的配置參數),在Spring Boot中使用Kafka特別簡單。而且Spring Boot還提供了一個嵌入式Kafka代理方便作測試。spring
spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.consumer.group-id=myGroup
實現下面的所涉及到的功能實現,須要有以下環境:apache
更多的配置能夠參考《Kafka,ZK集羣開發或部署環境搭建及實驗》
這一篇文章。編程
本文儘可能作到闡述邏輯清晰,主要路線就是全局介紹Spring Kafka的主要功能及重點配置,而Spring Boot對Spring Kafka進一步簡化配置,經過Spring Boot中的Kafka幾大註解實現發佈訂閱功能,同時經過Spring Integration + 自定義Kafka配置方式實現一個較爲複雜的Kafka發佈訂閱功能,本文經過本身實驗和整理了較久的時間,涵蓋了Spring Kafka大部份內容,但願你們耐心讀下來,有什麼問題隨時反饋,一塊兒學習。
Spring Kafka、Spring Integration和Kafka客戶端版本聯繫或者兼容性以下(截至2019年12月9日):json
Spring for Apache Kafka | Spring Integration for Apache Kafka Version | kafka-clients |
---|---|---|
2.3.x | 3.2.x | 2.3.1 |
2.2.x | 3.1.x | 2.0.1, 2.1.x, 2.2.x |
2.1.x | 3.0.x | 1.0.x, 1.1.x, 2.0.0 |
1.3.x | 2.3.x | 0.11.0.x, 1.0.x |
具體更多版本特色能夠看官網,spring kafka當前最新爲2.3.4版本。
Spring Kafka相關的註解有以下幾個:bootstrap
註解類型 | 描述 |
---|---|
EnableKafka | 啓用由AbstractListenerContainerFactory 在封面(covers)下建立的Kafka監聽器註解端點,用於配置類; |
EnableKafkaStreams | 啓用默認的Kafka流組件 |
KafkaHandler | 在用KafkaListener註解的類中,將方法標記爲Kafka消息監聽器的目標的註解 |
KafkaListener | 將方法標記爲指定主題上Kafka消息監聽器的目標的註解 |
KafkaListeners | 聚合多個KafkaListener註解的容器註解 |
PartitionOffset | 用於向KafkaListener添加分區/初始偏移信息 |
TopicPartition | 用於向KafkaListener添加主題/分區信息 |
如使用@EnableKafka
能夠監聽AbstractListenerContainerFactory
子類目標端點,如ConcurrentKafkaListenerContainerFactory
是AbstractKafkaListenerContainerFactory
的子類。
public class ConcurrentKafkaListenerContainerFactory<K,V> extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K,V>,K,V>
@Configuration @EnableKafka public class AppConfig { @Bean public ConcurrentKafkaListenerContainerFactory myKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(4); return factory; } // other @Bean definitions }
@EnableKafka
並非在Spring Boot中啓用Kafka必須的,Spring Boot附帶了Spring Kafka的自動配置,所以不須要使用顯式的@EnableKafka
。若是想要本身實現Kafka配置類,則須要加上@EnableKafka
,若是你不想要Kafka自動配置,好比測試中,須要作的只是移除KafkaAutoConfiguration
:
@SpringBootTest("spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration")
💡 要在應用啓動時就建立主題,能夠添加NewTopic
類型的Bean。若是該主題已經存在,則忽略Bean。
Spring的KafkaTemplate
是自動配置的,你能夠直接在本身的Bean中自動鏈接它,以下例所示:
@Component public class MyBean { private final KafkaTemplate kafkaTemplate; @Autowired public MyBean(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } // ... }
KafkaTemplate
包裝了一個生產者,並提供了向kafka主題發送數據的方便方法。提供異步和同步(發送阻塞)方法,異步(發送非阻塞)方法返回ListenableFuture
,以此監聽異步發送狀態,成功仍是失敗,KafkaTemplate提供以下接口:
ListenableFuture<SendResult<K, V>> sendDefault(V data); ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message); Map<MetricName, ? extends Metric> metrics(); List<PartitionInfo> partitionsFor(String topic); <T> T execute(ProducerCallback<K, V, T> callback); // Flush the producer. void flush(); interface ProducerCallback<K, V, T> { T doInKafka(Producer<K, V> producer); }
sendDefault
API 要求已向模板提供默認主題。部分API接受一個時間戳做爲參數,並將該時間戳存儲在記錄中,如何存儲用戶提供的時間戳取決於Kafka主題上配置的時間戳類型,若是主題配置爲使用CREATE_TIME
,則記錄用戶指定的時間戳(若是未指定則生成)。若是將主題配置爲使用LOG_APPEND_TIME
,則忽略用戶指定的時間戳,而且代理將添加本地代理時間。metrics
和 partitionsFor
方法委託給底層Producer上的相同方法。execute方法提供對底層生產者的直接訪問
要使用模板,能夠配置一個生產者工廠並在模板的構造函數中提供它。下面的示例演示瞭如何執行此操做:
@Bean public ProducerFactory<Integer, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // See https://kafka.apache.org/documentation/#producerconfigs for more properties return props; } @Bean public KafkaTemplate<Integer, String> kafkaTemplate() { // KafkaTemplate構造函數中輸入生產者工廠配置 return new KafkaTemplate<Integer, String>(producerFactory()); }
而後,要使用模板,能夠調用其方法之一發送消息。
當你使用包含Message<?>
參數的方法時,主題、分區和鍵信息在消息頭中提供,有以下子項:
KafkaHeaders.TOPIC KafkaHeaders.PARTITION_ID KafkaHeaders.MESSAGE_KEY KafkaHeaders.TIMESTAMP
如訪問頭部信息中某一項信息:
public void handleMessage(Message<?> message) throws MessagingException { LOGGER.debug("===Received Msg Topic: {}", message.getHeaders().get(KafkaHeaders.TOPIC)); }
可選的功能是,可使用ProducerListener
配置KafkaTemplate
,以得到帶有發送結果(成功或失敗)的異步回調,而不是等待未來完成。如下列表顯示了ProducerListener
接口的定義:
public interface ProducerListener<K, V> { void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata); void onError(String topic, Integer partition, K key, V value, Exception exception); boolean isInterestedInSuccess(); }
默認狀況下,模板配置有LoggingProducerListener
,它只記錄錯誤,在發送成功時不執行任何操做。只有當isInterestedInSuccess
返回true時才調用onSuccess
。
爲了方便起見,若是你只想實現其中一個方法,那麼將提供抽象ProducerListenerAdapter
。對於isInterestedInSuccess
,它返回false。下面演示了異步結果回調:
public void sendMessage(String msg) { LOGGER.info("===Producing message[{}]: {}", mTopic, msg); ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { LOGGER.info("===Producing message success"); } @Override public void onFailure(Throwable ex) { LOGGER.info("===Producing message failed"); } }); }
若是但願阻止式發送線程等待結果,能夠調用future
的get()
方法。你可能但願在等待以前調用flush()
,或者爲了方便起見,模板有一個帶有autoFlush
參數的構造函數,該構造函數在每次發送時都會致使模板flush()
。不過,請注意,刷新可能會顯著下降性能:
public void sendToKafka(final MyOutputData data) { final ProducerRecord<String, String> record = createRecord(data); try { template.send(record).get(10, TimeUnit.SECONDS); handleSuccess(data); } catch (ExecutionException e) { handleFailure(data, record, e.getCause()); } catch (TimeoutException | InterruptedException e) { handleFailure(data, record, e); } }
使用DefaultKafkaProducerFactory:
如上面使用KafkaTemplate
中所示,ProducerFactory
用於建立生產者。默認狀況下,當不使用事務時,DefaultKafkaProducerFactory
會建立一個供全部客戶機使用的單例生產者,如KafkaProducer
javadocs中所建議的那樣。可是,若是對模板調用flush(),這可能會致使使用同一個生產者的其餘線程延遲。從2.3版開始,DefaultKafkaProducerFactory
有一個新屬性producerPerThread
。當設置爲true
時,工廠將爲每一個線程建立(和緩存)一個單獨的生產者,以免此問題。
當producerPerThread
爲true時,當再也不須要生產者時,用戶代碼必須在工廠上調用closeThreadBoundProducer()
。這將實際關閉生產者並將其從ThreadLocal
中移除。調用reset()或destroy()不會清理這些生產者。
建立DefaultKafkaProducerFactory
時,能夠經過調用只接受屬性映射的構造函數(請參閱使用KafkaTemplate中的示例)從配置中獲取鍵和/或值序列化器類,或者序列化程序實例能夠傳遞給DefaultKafkaProducerFactory
構造函數(在這種狀況下,全部生產者共享相同的實例)。或者,能夠提供Supplier<Serializer> s
(從版本2.3開始),用於爲每一個生產者獲取單獨的Serializer
實例:
@Bean public ProducerFactory<Integer, CustomValue> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer()); } @Bean public KafkaTemplate<Integer, CustomValue> kafkaTemplate() { return new KafkaTemplate<Integer, CustomValue>(producerFactory()); }
使用ReplyingKafkaTemplate:
版本2.1.3
引入了KafkaTemplate
的一個子類來提供請求/應答語義。這個類名爲ReplyingKafkaTemplate
,而且有一個方法(除了超類中的那些方法以外)。下面的列表顯示了方法簽名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record); RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record, Duration replyTimeout);
結果是一個ListenableFuture
,它被結果異步填充(或者超時時出現異常)。結果還有一個sendFuture
屬性,這是調用KafkaTemplate.send()
的結果。你可使用此Future肯定發送操做的結果。這裏就不展開了。
能夠經過配置MessageListenerContainer
並提供消息監聽器或使用@KafkaListener
註解來接收消息。
使用消息監聽器容器(message listener container)時,必須提供監聽器才能接收數據。目前有八個消息監聽器支持的接口。下面的列表顯示了這些接口:
// 使用自動提交或容器管理的提交方法之一時,使用此接口處理從Kafka 消費者 poll() 做接收的單個ConsumerRecord實例 public interface MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data); } // 使用手動提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的單個ConsumerRecord實例 public interface AcknowledgingMessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment); } // 使用自動提交或容器管理的提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的單個ConsumerRecord實例。提供對消費者對象的訪問。 public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer); } // 使用手動提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的單個ConsumerRecord實例。提供對消費者對象的訪問。 public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); } // 使用自動提交或容器管理的提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的全部ConsumerRecord實例。使用此接口時不支持AckMode.RECORD,由於監聽器已得到完整的批處理。 public interface BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data); } // 使用手動提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的全部ConsumerRecord實例。 public interface BatchAcknowledgingMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment); } // 使用自動提交或容器管理的提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的全部ConsumerRecord實例。使用此接口時不支持AckMode.RECORD,由於監聽器已得到完整的批處理。提供對使用者對象的訪問。 public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer); } // 使用手動提交方法之一時,使用此接口處理從Kafka 消費者 poll() 操做接收的全部ConsumerRecord實例。提供對使用者對象的訪問。 public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer); }
上述消費者對象不是線程安全的。只能在調用偵聽器的線程上調用其方法。
提供了兩個MessageListenerContainer
的實現:
KafkaMessageListenerContainer
從單個線程上的全部主題或分區接收全部消息(即一個分區只能分配到一個消費者,一個消費者能夠被分配多個分區)。ConcurrentMessageListenerContainer
委託給一個或多個KafkaMessageListenerContainer
實例,以提供多線程使用,從多線程上去處理主題或分區的全部消息。
從Spring Kafka2.2.7版開始,你能夠將RecordInterceptor
添加到偵聽器容器中;在調用偵聽器以容許檢查或修改記錄以前,將調用它。若是攔截器返回null,則不調用偵聽器。偵聽器是批處理偵聽器時不調用偵聽器。從2.3版開始,CompositeRecordInterceptor
可用於調用多個攔截器。
默認狀況下,使用事務時,偵聽器在事務啓動後調用。從2.3.4版開始,你能夠設置偵聽器容器的interceptBeforeTx
屬性,以便在事務啓動以前調用偵聽器。沒有爲批處理偵聽器提供偵聽器,由於Kafka已經提供了ConsumerInterceptor
。
有以下構造函數可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties) public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties, TopicPartitionOffset... topicPartitions)
每一個都獲取一個ConsumerFactory
以及有關主題和分區的信息,以及ContainerProperties
對象中的其餘配置。ConcurrentMessageListenerContainer
(稍後介紹)使用第二個構造函數跨使用者實例分發TopicPartitionOffset
。ContainerProperties
具備如下構造函數:
public ContainerProperties(TopicPartitionOffset... topicPartitions) public ContainerProperties(String... topics) public ContainerProperties(Pattern topicPattern)
第一個構造函數接受一個TopicPartitionOffset
參數數組來顯式地指示容器要使用哪些分區(使用消費者的 assign()方法)和可選的初始偏移量。默認狀況下,正值是絕對偏移量。默認狀況下,負值是相對於分區內的當前最後偏移量。提供了TopicPartitionOffset
的構造函數,該構造函數接受一個附加的布爾參數。若是是true,則初始偏移(正偏移或負偏移)相對於該消耗器的當前位置。容器啓動時應用偏移量。第二個是主題數組,Kafka基於group.id
屬性:在組中分佈分區來分配分區。第三個使用regex表達式來選擇主題。
要將MessageListener
分配給容器,能夠在建立容器時使用ContainerProps.setMessageListener
方法。下面的示例演示瞭如何執行此操做:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2"); containerProps.setMessageListener(new MessageListener<Integer, String>() { ... }); DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container;
注意當建立一個Defaultkafkafkaconsumerfactory
時,使用構造器,該構造器僅以其特性爲基礎,就意味着從配置中獲取了key/value的Deserializer類別。或者,反序列化程序實例能夠傳遞給key/value的DefaultKafkaConsumerFactory
構造函數,在這種狀況下,全部消費者共享相同的實例。另外一個選項是提供Supplier<Deserializer>s
(從版本2.3開始),用於爲每一個使用者獲取單獨的反序列化程序實例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf = new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer()); KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps); return container;
有關能夠設置的各類屬性的更多信息,請參閱Javadoc 中ContainerProperties
。
從版本Spring Kafka 2.1.1開始,一個名爲logContainerConfig
的新屬性就可用了。當啓用true和INFO日誌記錄時,每一個偵聽器容器都會寫入一條日誌消息,總結其配置屬性。
例如,要將日誌級別更改成INFO,可使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.level.INFO)
。
從版本Spring Kafka 2.2開始,添加了名爲missingtopicsfailal
的新容器屬性(默認值:true)。若是代理上不存在任何客戶端發佈或訂閱涉及到的主題,這將阻止容器啓動。若是容器配置爲偵聽主題模式(regex),則不適用。之前,容器線程在consumer.poll()
方法中循環,等待在記錄許多消息時出現主題。除了日誌,沒有跡象代表有問題。要恢復之前的行爲,能夠將屬性設置爲false,這個時候,Broker設置項allow.auto.create.topics=true,且這個容器屬性爲false,則會自動建立不存在的topic。
單個構造函數相似於第一個KafkaListenerContainer
構造函數。下面的列表顯示了構造函數的簽名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
它還有一個併發屬性。例如,container.setConcurrency(3)
即表示建立三個KafkaMessageListenerContainer
實例。對於第一個構造函數,Kafka使用它的組管理功能將分區分佈到消費者之間。
當監聽多個主題時,默認的分區分佈可能不是你指望的那樣。例如,若是你有三個主題,每一個主題有五個分區,而且但願使用concurrency=15
,那麼你只看到五個活動的消費者,每一個消費者從每一個主題中分配一個分區,其餘十個消費者處於空閒狀態。這是由於默認的KafkaPartitionAssignor
是RangeAssignor
(參見其Javadoc)。對於這種狀況,你可能須要考慮改用RoundRobinAssignor
,它將分區分佈到全部使用者。而後,爲每一個使用者分配一個主題或分區。若要更改PartitionAssignor
,你能夠在提供給DefaultKafkaConsumerFactory
的屬性中設置partition.assignment.strategy
消費者配置參數(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
)。使用Spring Boot時,能夠按以下方式分配設置策略:
spring.kafka.consumer.properties.partition.assignment.strategy=\ org.apache.kafka.clients.consumer.RoundRobinAssignor
對於第二個構造函數,ConcurrentMessageListenerContainer
將TopicPartition
實例分佈在委託KafkaMessageListenerContainer
實例上。
例如,若是提供了六個TopicPartition
實例,併發性爲3;每一個容器獲得兩個分區。對於五個TopicPartition
實例,兩個容器獲得兩個分區,第三個容器獲得一個分區。若是併發性大於TopicPartitions
的數量,則會向下調整併發性,以便每一個容器得到一個分區。調整分區的方式可使用命令行工具kafka-topics.sh
查詢和調整主題上的分區數。還能夠添加一個NewTopic
Bean,若是NewTopic設定的數目大於當前數目,spring boot的自動配置的KafkaAdmin
將向上調整分區。
client.id屬性(若是已設置)將附加
-n
,其中n是對應於併發的消費者實例。當啓用JMX時,這是爲MBeans提供惟一名稱所必需的。
從版本Spring Kafka 1.3開始,MessageListenerContainer
提供了對底層KafkaConsumer
的度量的訪問。對於ConcurrentMessageListenerContainer
,metrics()
方法返回全部目標KafkaMessageListenerContainer
實例的度量(metrics)。根據爲底層KafkaConsumer
提供的client-id
度量被分組到Map<MetricName, ?extends Metric>
。
從2.3版開始,ContainerProperties
提供了一個idleBetweenPolls
選項,容許偵聽器容器中的主循環在KafkaConsumer.poll()
調用之間睡眠。從提供的選項中選擇實際睡眠間隔做爲最小值,而且選擇max.poll.interval.ms
消費者配置和當前記錄批處理時間之間的差別。
提供了幾個提交偏移量的選項。若是enable.auto.commit
使用者屬性爲true
,則Kafka將根據其配置自動提交偏移量。若是爲false
,則容器支持多個AckMode
設置(在下一個列表中描述)。默認的確認模式是批處理。從2.3版開始,框架將enable.auto.commit
設置爲false
,除非在配置中顯式設置。之前,若是未設置屬性,則使用Kafka默認值(true)。消費者 poll()
方法返回一個或多個ConsumerRecords
。爲每一個記錄調用MessageListener
。如下列表描述了容器對每一個AckMode
採起的操做:
poll()
返回的全部記錄後提交偏移量。poll()
返回的全部記錄後提交偏移量,只要超過上次提交後的ackTime
poll()
返回的全部記錄後提交偏移量,只要上次提交後收到ackCount
記錄。TIME
和COUNT
,但若是兩個條件都爲true,則執行提交。acknowledge()
和Acknowledgment
。以後,應用與BATCH相同的語義。Acknowledgement.acknowledge()
方法時當即提交偏移量。MANUAL和MANUAL_IMMEDIATE 要求偵聽器是AcknowledgingMessageListener
或BatchAcknowledgingMessageListener
。請參見消息偵聽器。
根據syncCommits
容器屬性,使用消費者上的commitSync()
或commitAsync()
方法。默認狀況下,syncCommits
爲true;另請參閱setSyncCommitTimeout
。請參閱setCommitCallback
以獲取異步提交的結果;默認回調是LoggingCommitCallback
,它記錄錯誤(以及調試級別的成功)。
由於偵聽器容器有本身的提交偏移的機制,因此它但願Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
爲false。從2.3版開始,除非在使用者工廠或容器的使用者屬性重寫中特別設置,不然它將無條件地將其設置爲false。
Acknowledgment
有如下方法:
public interface Acknowledgment { void acknowledge(); }
此方法使偵聽器能夠控制什麼時候提交偏移。
從版本2.3開始,確認接口有兩個附加方法nack(long sleep)
和nack(int index, long sleep)
。第一個用於記錄偵聽器,第二個用於批處理偵聽器。爲偵聽器類型調用錯誤的方法將引起IllegalStateException
。
nack()只能在調用偵聽器的消費者線程上調用。
使用批處理偵聽器時,能夠在發生故障的批內指定索引。調用nack()
時,將在對失敗和丟棄的記錄的分區執行索引和查找以前提交記錄的偏移量,以便在下次poll()
時從新傳遞這些偏移量。這是對SeekToCurrentBatchErrorHandler
的改進,SeekToCurrentBatchErrorHandler
只能查找整個批次以便從新交付。
注意:經過組管理使用分區分配時,確保sleep參數(加上處理上一次輪詢記錄所花費的時間)小於
consumer max.poll.interval.ms
屬性很是重要。
偵聽器容器實現了SmartLifecycle
(經過SmartLifecycle
在Spring加載和初始化全部bean後,接着執行一些任務或者啓動須要的異步服務),默認狀況下autoStartup
爲true
。容器在後期啓動(Integer.MAX-VALUE - 100
)。實現SmartLifecycle
以處理來自偵聽器的數據的其餘組件應該在較早的階段啓動。-100
爲之後的階段留出了空間,使組件可以在容器以後自動啓動。好比咱們經過@Bean
將監聽器容器交給Spring管理,這個時候經過SmartLifecycle
自動執行了初始化的任務,可是當咱們手動經過new監聽器容器實例,則後初始化則不會執行,好比KafkaMessageListenerContainer
實例須要手動執行start()
。
autoStartup
在手動執行start中設置true與false沒有做用,能夠參見@KafkaListener
聲明週期管理這一小節。
@KafkaListener
註解用於將bean方法指定爲偵聽器容器的偵聽器。bean包裝在一個MessagingMessageListenerAdapter
中,該適配器配置有各類功能,如轉換器,用於轉換數據(若有必要)以匹配方法參數。經過使用屬性佔位符(${…}
),或者可使用SpEL(#{…}
)配置註釋上的大多數屬性。有關更多信息,請參閱Javadoc。
@KafkaListener
:
id
:listener惟一id,當GroupId沒有被配置的時候,默認id爲自動產生,此值指定後會覆蓋group id。containerFactory
:上面提到了@KafkaListener區分單數據仍是多數據消費只須要配置一下註解的containerFactory屬性就能夠了,這裏面配置的是監聽容器工廠,也就是ConcurrentKafkaListenerContainerFactory
,配置Bean名稱topics
:須要監聽的Topic,可監聽多個,能夠是表達式或者佔位符關鍵字或者直接是主題名稱,如多個主題監聽:{"topic1" , "topic2"}
topicPattern
: 此偵聽器的主題模式。條目能夠是「主題模式」、「屬性佔位符鍵」或「表達式」。框架將建立一個容器,該容器訂閱與指定模式匹配的全部主題,以獲取動態分配的分區。模式匹配將針對檢查時存在的主題週期性地執行。表達式必須解析爲主題模式(支持字符串或模式結果類型)。這使用組管理,Kafka將爲組成員分配分區。topicPartitions
:用於使用手動主題/分區分配時errorHandler
:監聽異常處理器,配置Bean名稱,默認爲空groupId
:消費組IDidIsGroup
:id是否爲GroupIdclientIdPrefix
:消費者Id前綴beanRef
:真實監聽容器的Bean名稱,須要在 Bean名稱前加 "__"@KafkaListener
註解爲簡單的POJO偵聽器提供了一種機制。下面的示例演示如何使用它:
public class Listener { @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId") public void listen(String data) { ... } }
此機制生效須要@Configuration
類之一上的@EnableKafka
註解和用於配置基礎ConcurrentMessageListenerContainer
的偵聽器容器工廠。默認狀況下,須要名爲kafkaListenerContainerFactory
的bean。如下示例演示如何使用ConcurrentMessageListenerContain
:
@Configuration @EnableKafka public class KafkaConfig { @Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<Integer, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString()); ... return props; } }
注意,要設置容器屬性,必須在工廠上使用getContainerProperties()
方法。它用做注入容器的實際屬性的模板。
從版本2.1.1開始,如今能夠爲註解建立的消費者設置client.id
屬性。clientdprefix
的後綴是-n
,其中n是一個整數,表示使用併發時的容器號。
從2.2版開始,如今能夠經過使用批註自己的屬性來重寫容器工廠的併發性和自動啓動屬性。屬性能夠是簡單值、屬性佔位符或SpEL表達式。下面的示例演示瞭如何執行此操做:
@KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}") public void listen(String data) { ... }
你還可使用顯式主題和分區(以及可選的初始偏移量)配置POJO偵聽器。下面的示例演示瞭如何執行此操做:
@KafkaListener(id = "thing2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... }
你能夠在partitions
或partitionOffsets
屬性中指定每一個分區,但不能同時指定二者。
使用手動AckMode
時,還能夠向偵聽器提供Acknowledgment
。下面的示例還演示瞭如何使用不一樣的容器工廠:
@KafkaListener(id = "cat", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) { ... ack.acknowledge(); }
最後,能夠從消息頭得到有關消息的元數據。你可使用如下頭名稱來檢索消息頭內容:
KafkaHeaders.OFFSET KafkaHeaders.RECEIVED_MESSAGE_KEY KafkaHeaders.RECEIVED_TOPIC KafkaHeaders.RECEIVED_PARTITION_ID KafkaHeaders.RECEIVED_TIMESTAMP KafkaHeaders.TIMESTAMP_TYPE
示例:
@KafkaListener(id = "qux", topicPattern = "myTopic1") public void listen(@Payload String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts ) { ... }
從版本1.1開始,能夠配置@KafkaListener
方法來接收從消費者接收的整批消費者記錄。要將偵聽器容器工廠配置爲建立批處理偵聽器,能夠設置batchListener
屬性。下面的示例演示瞭如何執行此操做:
@Bean public KafkaListenerContainerFactory<?, ?> batchFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setBatchListener(true); return factory; }
如下示例顯示如何接收有效載荷列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list) { ... }
主題、分區、偏移量等在與有效負載並行的頭中可用。下面的示例演示如何使用標題:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { ... }
或者,您能夠接收消息列表Message<?>
對象,其中包含每一個偏移量和每一個消息中的其餘詳細信息,但它必須是惟一的參數(除了使用手動提交時的Acknowledgment和/
或Consumer<?, ?>
參數)。下面的示例演示如何執行此操做:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory") public void listen14(List<Message<?>> list) { ... } @KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory") public void listen15(List<Message<?>> list, Acknowledgment ack) { ... } @KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory") public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) { ... }
在這種狀況下,不會對有效載荷執行轉換。若是BatchMessagingMessageConverter
配置了RecordMessageConverter
,則還能夠向消息參數添加泛型類型,並轉換有效負載。有關詳細信息,請參閱使用批處理偵聽器的負載轉換。
你還能夠收到一個ConsumerRecord<?, ?>
對象,但它必須是惟一的參數(當使用手動提交或Consumer<?, ?>
參數時,除了可選的Acknowledgment)。下面的示例演示瞭如何執行此操做:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list) { ... } @KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) { ... }
從版本2.2開始,偵聽器能夠接收poll()
方法返回的完整的ConsumerRecords<?, ?>
對象,容許偵聽器訪問其餘方法,例如partitions()
(返回列表中的TopicPartition
實例)和records
(TopicPartition)(獲取選擇性記錄)。一樣,這必須是惟一的參數(當使用手動提交或Consumer<?, ?>
參數時,除了可選的Acknowledgment)。下面的示例演示瞭如何執行此操做:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory") public void pollResults(ConsumerRecords<?, ?> records) { ... }
從2.2版開始,如今更容易添加驗證程序來驗證@KafkaListener
`@Payload參數。之前,你必須配置一個自定義的
DefaultMessageHandlerMethodFactory`並將其添加到註冊器中。如今,你能夠將驗證器添加到註冊器自己。如下代碼說明了如何執行此操做:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(new MyValidator()); } }
當你在Spring Boot使用validation starter
,會自動配置LocalValidatorFactoryBean
,以下例所示:
@Configuration @EnableKafka public class Config implements KafkaListenerConfigurer { @Autowired private LocalValidatorFactoryBean validator; ... @Override public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) { registrar.setValidator(this.validator); } }
如下示例演示如何驗證:
public static class ValidatedClass { @Max(10) private int bar; public int getBar() { return this.bar; } public void setBar(int bar) { this.bar = bar; } }
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", containerFactory = "kafkaJsonListenerContainerFactory") public void validatedListener(@Payload @Valid ValidatedClass val) { ... } @Bean public KafkaListenerErrorHandler validationErrorHandler() { return (m, e) -> { ... }; }
ContainerProperties
有一個名爲consumerRebalanceListener
的屬性,該屬性接受Kafka客戶端的consumerRebalanceListene
r接口的實現。若是未提供此屬性,則容器將配置日誌偵聽器,該偵聽器將在信息級別記錄從新平衡事件。該框架還添加了一個子接口ConsumerRawareRebalanceListener
。如下列表顯示了ConsumerRawareRebalanceListener
接口定義:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener { void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions); }
從2.0版開始,若是還使用@SendTo
註解註釋@KafkaListener
,而且方法調用返回結果,則結果將轉發到@SendTo
指定的主題。如:
@KafkaListener(topics = "annotated21") @SendTo("!{request.value()}") // runtime SpEL public String replyingListener(String in) { ... } @KafkaListener(topics = "${some.property:annotated22}") @SendTo("#{myBean.replyTopic}") // config time SpEL public Collection<String> replyingBatchListener(List<String> in) { ... } @KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler") @SendTo("annotated23reply") // static reply topic definition public String replyingListenerWithErrorHandler(String in) { ... } ... @KafkaListener(topics = "annotated25") @SendTo("annotated25reply1") public class MultiListenerSendTo { @KafkaHandler public String foo(String in) { ... } @KafkaHandler @SendTo("!{'annotated25reply2'}") public String bar(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { ... } }
爲@KafkaListener
註解建立的偵聽器容器不是應用程序上下文中的bean。相反,它們是用KafkaListenerEndpointRegistry
類型的基礎設施bean註冊的。這個bean由框架自動聲明並管理容器的生命週期;它將自動啓動任何autoStartup
設置爲true
的容器。全部容器工廠建立的全部容器必須處於同一phase
。有關詳細信息,請參閱偵聽器容器自動啓動。你可使用註冊表以編程方式管理生命週期。啓動或中止註冊表將啓動或中止全部已註冊的容器。或者,能夠經過使用單個容器的id屬性來獲取對該容器的引用。能夠在批註上設置autoStartup
,這將覆蓋容器工廠中配置的默認設置(setAutoStartup(true)
)。你能夠從應用程序上下文中獲取對bean的引用,例如自動鏈接,以管理其註冊的容器。如下示例說明了如何執行此操做:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false") public void listen(...) { ... }
@Autowired private KafkaListenerEndpointRegistry registry; ... this.registry.getListenerContainer("myContainer").start(); ...
註冊表只維護其管理的容器的生命週期;聲明爲bean的容器不受註冊表管理,能夠從應用程序上下文中獲取。能夠經過調用註冊表的getListenerContainers()
方法來獲取託管容器的集合。Spring Kafka版本2.2.5添加了一個方便方法getAllListenerContainers()
,它返回全部容器的集合,包括由註冊表管理的容器和聲明爲bean的容器。返回的集合將包括任何已初始化的原型bean,但它不會初始化任何延遲bean聲明。
Spring for Apache Kafka
提供了一個工廠bean來建立StreamsBuilder
對象並管理其流的生命週期。只要kafka流在classpath上而且kafka流經過@EnableKafkaStreams
註解開啓,Spring Boot就會自動配置所需的KafkaStreamsConfiguration
bean。
啓用Kafka流意味着必須設置應用程序id和引導服務器(bootstrap servers)。前者可使用spring.kafka.streams.application-id
配置,若是未設置,則默認爲spring.application.name
。後者能夠全局設置,也能夠專門爲流覆寫。
使用專用屬性可使用其餘幾個屬性;可使用spring.Kafka.streams.properties
命名空間設置其餘任意Kafka屬性。有關詳細信息,Additional Kafka Properties 。
默認狀況下,由它建立的StreamBuilder
對象管理的流將自動啓動。可使用spring.kafka.streams.auto-startup
屬性自定義此行爲。
要使用工廠bean,只需將StreamsBuilder
鏈接到@bean
,以下例所示:
@Configuration(proxyBeanMethods = false) @EnableKafkaStreams public static class KafkaStreamsExampleConfiguration { @Bean public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) { KStream<Integer, String> stream = streamsBuilder.stream("ks1In"); stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase())).to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>())); return stream; } }
默認狀況下,由它建立的StreamBuilder
對象管理的流將自動啓動。可使用spring.kafka.streams.auto-startup
屬性自定義此行爲。
自動配置支持的屬性顯示在公用應用程序屬性中。注意,在大多數狀況下,這些屬性(連字符或駝峯樣式)直接映射到Apache Kafka點式屬性。有關詳細信息,請參閱Apache Kafka
文檔。
前面提到的幾個屬性應用於全部組件(生產者、消費者、管理員和流),但若是但願使用不一樣的值,則能夠在組件級別指定。Apache Kafka指定重要性爲HIGH
、MEDIUM
或LOW
的屬性。Spring Boot自動配置支持全部高重要性屬性、某些選定的中、低屬性以及任何沒有默認值的屬性。
只有Kafka支持的屬性的一個子集能夠經過KafkaProperties
類直接使用,若是要使用不直接支持的其餘屬性配置生產者或消費者,請使用如下屬性:
spring.kafka.properties.prop.one=first spring.kafka.admin.properties.prop.two=second spring.kafka.consumer.properties.prop.three=third spring.kafka.producer.properties.prop.four=fourth spring.kafka.streams.properties.prop.five=fifth
上面的參數設置示例將公共prop.one
Kafka屬性設置爲first
(適用於生產者、消費者和管理員),prop.two
admin屬性設置爲second
,prop.three
consumer屬性設置爲third
,prop.four
producer屬性設置爲fourth
,prop.five
streams屬性設置爲fifth
。
你還能夠配置Spring Kafka JsonDeserializer
,以下所示:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme
相似地,能夠禁用JsonSerializer
在頭中發送類型信息的默認行爲:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.producer.properties.spring.json.add.type.headers=false
注意: 以這種方式設置的屬性將覆蓋Spring Boot顯式支持的任何配置項。
Spring for Apache Kafka提供了一種使用嵌入式Apache Kafka代理測試項目的便捷方法。要使用此功能,請使用Spring Kafka測試模塊中的@EmbeddedKafka
註解測試類。有關更多信息,請參閱Spring For Apache Kafka參考手冊。
要使Spring Boot自動配置與前面提到的嵌入式Apache Kafka代理一塊兒工做,須要將嵌入式代理地址(由EmbeddedKafkaBroker
填充)的系統屬性從新映射到Apache Kafka的Spring Boot配置屬性中。有幾種方法能夠作到這一點:
spring.kafka.bootstrap-servers
:static { System.setProperty(EmbeddedKafkaBroker.BROKER_LIST_PROPERTY, "spring.kafka.bootstrap-servers"); }
@EmbeddedKafka
註解上配置屬性名:@EmbeddedKafka(topics = "someTopic", bootstrapServersProperty = "spring.kafka.bootstrap-servers")
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
Spring Integration也有Kafka的適配器,所以咱們能夠很方便的採用Spring Integration去實現發佈訂閱,固然你也能夠不使用Spring Integration。
Spring Integration是什麼,具體有什麼做用,能夠參考另外一篇文章《Spring Integration最詳解》。
這裏對全部配置作個說明的是,spring kafka配置分全局配置和子模塊配置,子模塊配置會複寫全局配置,好比SSL認證能夠全局配置,可是也能夠在每一個子模塊,如消費者、生產者、流式處理中均可以單獨配置SSL(多是微服務部署,消費者和生產者不在同一個應用中)。這裏重點介紹生產者和消費者配置吧,其餘就不展開了,用到的時候再去查找和補充。
# 用逗號分隔的主機:端口對列表,用於創建到Kafka羣集的初始鏈接。覆蓋全局鏈接設置屬性 spring.kafka.bootstrap-servers # 在發出請求時傳遞給服務器的ID。用於服務器端日誌記錄 spring.kafka.client-id,默認無 # 用於配置客戶端的其餘屬性,生產者和消費者共有的屬性 spring.kafka.properties.* # 消息發送的默認主題,默認無 spring.kafka.template.default-topic
Spring Boot中,Kafka 生產者
相關配置(全部配置前綴爲spring.kafka.producer.
):
# 生產者要求Leader在考慮請求完成以前收到的確認數 spring.kafka.producer.acks # 默認批量大小。較小的批處理大小將使批處理不太常見,並可能下降吞吐量(批處理大小爲零將徹底禁用批處理) spring.kafka.producer.batch-size spring.kafka.producer.bootstrap-servers # 生產者可用於緩衝等待發送到服務器的記錄的總內存大小。 spring.kafka.producer.buffer-memory # 在發出請求時傳遞給服務器的ID。用於服務器端日誌記錄。 spring.kafka.producer.client-id # 生產者生成的全部數據的壓縮類型 spring.kafka.producer.compression-type # 鍵的序列化程序類 spring.kafka.producer.key-serializer spring.kafka.producer.properties.* # 大於零時,啓用失敗發送的重試次數 spring.kafka.producer.retries spring.kafka.producer.ssl.key-password spring.kafka.producer.ssl.key-store-location spring.kafka.producer.ssl.key-store-password spring.kafka.producer.ssl.key-store-type spring.kafka.producer.ssl.protocol spring.kafka.producer.ssl.trust-store-location spring.kafka.producer.ssl.trust-store-password spring.kafka.producer.ssl.trust-store-type # 非空時,啓用對生產者的事務支持 spring.kafka.producer.transaction-id-prefix spring.kafka.producer.value-serializer
Spring Boot中,Kafka 消費者相關配置(全部配置前綴爲spring.kafka.consumer.
):
# 若是「enable.auto.commit」設置爲true,設置消費者偏移自動提交到Kafka的頻率,默認值無,單位毫秒(ms) spring.kafka.consumer.auto-commit-interval # 當Kafka中沒有初始偏移或服務器上再也不存在當前偏移時策略設置,默認值無,latest/earliest/none三個值設置 # earliest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 # latest 當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 # none topic各分區都存在已提交的offset時,從offset後開始消費;只要有一個分區不存在已提交的offset,則拋出異常 spring.kafka.consumer.auto-offset-reset # 用逗號分隔的主機:端口對列表,用於創建到Kafka羣集的初始鏈接。覆蓋全局鏈接設置屬性 spring.kafka.consumer.bootstrap-servers # 在發出請求時傳遞給服務器的ID,用於服務器端日誌記錄 spring.kafka.consumer.client-id # 消費者的偏移量是否在後臺按期提交 spring.kafka.consumer.enable-auto-commit # 若是沒有足夠的數據來當即知足「fetch-min-size」的要求,則服務器在取回請求以前阻塞的最大時間量 spring.kafka.consumer.fetch-max-wait # 服務器應爲獲取請求返回的最小數據量。 spring.kafka.consumer.fetch-min-size # 標識此消費者所屬的默認消費者組的惟一字符串 spring.kafka.consumer.group-id # 消費者協調員的預期心跳間隔時間。 spring.kafka.consumer.heartbeat-interval # 用於讀取以事務方式寫入的消息的隔離級別。 spring.kafka.consumer.isolation-level # 密鑰的反序列化程序類 spring.kafka.consumer.key-deserializer # 在對poll()的單個調用中返回的最大記錄數。 spring.kafka.consumer.max-poll-records # 用於配置客戶端的其餘特定於消費者的屬性。 spring.kafka.consumer.properties.* # 密鑰存儲文件中私鑰的密碼。 spring.kafka.consumer.ssl.key-password # 密鑰存儲文件的位置。 spring.kafka.consumer.ssl.key-store-location # 密鑰存儲文件的存儲密碼。 spring.kafka.consumer.ssl.key-store-password # 密鑰存儲的類型,如JKS spring.kafka.consumer.ssl.key-store-type # 要使用的SSL協議,如TLSv1.2, TLSv1.1, TLSv1 spring.kafka.consumer.ssl.protocol # 信任存儲文件的位置。 spring.kafka.consumer.ssl.trust-store-location # 信任存儲文件的存儲密碼。 spring.kafka.consumer.ssl.trust-store-password # 信任存儲區的類型。 spring.kafka.consumer.ssl.trust-store-type # 值的反序列化程序類。 spring.kafka.consumer.value-deserializer
Spring Boot中,Kafka Listener相關配置(全部配置前綴爲spring.kafka.listener.
):
# ackMode爲「COUNT」或「COUNT_TIME」時偏移提交之間的記錄數 spring.kafka.listener.ack-count= spring.kafka.listener.ack-mode spring.kafka.listener.ack-time spring.kafka.listener.client-id spring.kafka.listener.concurrency spring.kafka.listener.idle-event-interval spring.kafka.listener.log-container-config # 若是Broker上不存在至少一個配置的主題(topic),則容器是否沒法啓動, # 該設置項結合Broker設置項allow.auto.create.topics=true,若是爲false,則會自動建立不存在的topic spring.kafka.listener.missing-topics-fatal=true # 非響應消費者的檢查間隔時間。若是未指定持續時間後綴,則將使用秒做爲單位 spring.kafka.listener.monitor-interval spring.kafka.listener.no-poll-threshold spring.kafka.listener.poll-timeout spring.kafka.listener.type
spring.kafka.admin.client-id # 若是啓動時代理不可用,是否快速失敗 spring.kafka.admin.fail-fast=false spring.kafka.admin.properties.* spring.kafka.admin.ssl.key-password spring.kafka.admin.ssl.key-store-location spring.kafka.admin.ssl.key-store-password spring.kafka.admin.ssl.key-store-type spring.kafka.admin.ssl.protocol spring.kafka.admin.ssl.trust-store-location spring.kafka.admin.ssl.trust-store-password spring.kafka.admin.ssl.trust-store-type
spring.kafka.jaas.control-flag=required spring.kafka.jaas.enabled=false spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule spring.kafka.jaas.options.*
spring.kafka.ssl.key-password spring.kafka.ssl.key-store-location spring.kafka.ssl.key-store-password spring.kafka.ssl.key-store-type spring.kafka.ssl.protocol spring.kafka.ssl.trust-store-location spring.kafka.ssl.trust-store-password spring.kafka.ssl.trust-store-type
spring.kafka.streams.application-id spring.kafka.streams.auto-startup spring.kafka.streams.bootstrap-servers spring.kafka.streams.cache-max-size-buffering spring.kafka.streams.client-id spring.kafka.streams.properties.* spring.kafka.streams.replication-factor spring.kafka.streams.ssl.key-password spring.kafka.streams.ssl.key-store-location spring.kafka.streams.ssl.key-store-password spring.kafka.streams.ssl.key-store-type spring.kafka.streams.ssl.protocol spring.kafka.streams.ssl.trust-store-location spring.kafka.streams.ssl.trust-store-password spring.kafka.streams.ssl.trust-store-type spring.kafka.streams.state-dir
同一消費組下全部消費者協同消費訂閱主題的全部分區
消費者offset管理機制
分區和消費者個數如何設置
具體怎麼調優副本、分區、消費者等這裏就不展開了,後面專門來研究這個問題。
實現下面的示例須要的環境:
Spring-kafka-test
embedded Kafka Server
Spring Boot開發環境(2.2.1)
咱們知道Kafka是Scala+Zookeeper
構建的,能夠從官方網站下載部署包並在本地部署。不過,Spring Kafka Test已經封裝了Kafka測試的帶註解的一鍵式功能,以打開Kafka服務器,從而簡化了驗證Kafka相關功能的開發過程,使用起來也很是簡單。
添加依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency>
啓動服務,下面使用Junit測試用例直接啓動Kafka服務器服務,包括四個代理節點,Run as JUnit Test
。:
@RunWith(SpringRunner.class) @SpringBootTest(classes = ApplicationTests.class) @EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095}) public class ApplicationTests { @Test public void contextLoads()throws IOException { System.in.read(); } }
@EmbeddedKafka
中能夠設置相關參數:
注意:EmbeddedKafka這樣默認是沒有建立主題的。會提示
Topic(s) [test] is/are not present and missingTopicsFatal is true
錯誤。@EmbeddedKafka默認狀況是建立一個代理,該代理具備一個不帶任何參數的隨機端口,它將在啓動日誌中輸出特定端口和默認配置項。
下面實現一個簡單發佈訂閱功能,經過前端WEB調用一個API,而後在該API控制器中獲得請求後生產者開始發送消息,消費者後臺監聽消息,若是收到消費者消息,則打印出來。
添加Kafka依賴:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
配置Kafka,這裏消費者和生產者在同一應用中,咱們只須要配置Kafka Brokers的服務地址+端口:
server: port: 9000 spring: kafka: bootstrap-servers: 10.151.113.57:9092,10.151.113.57:9093,10.151.113.57:9094 listener: # 設置不監聽主題錯誤,false時,若是broker設置了llow.auto.create.topics = true,生產者發送到未建立主題時,會默認自動建立主題 # 且默認建立的主題是單副本單分區的 missing-topics-fatal: false consumer: # 配置消費者消息offset是否自動重置(消費者重連會可以接收最開始的消息) auto-offset-reset: earliest
@Service public class Producer { private static final Logger LOGGER = LogManager.getLogger(Producer.class); private static final String TOPIC = "users"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String message) { LOGGER.info(String.format("===Producing message: {}", message)); this.kafkaTemplate.send(TOPIC, message); } }
@Service public class Consumer { private static final Logger LOGGER = LogManager.getLogger(Consumer.class); @KafkaListener(topics = "test", groupId = "group_test") public void consume(String message) throws IOException { LOGGER.info(String.format("#### -> Consumed message -> %s", message)); } }
@RestController @RequestMapping(value = "/kafka") public class KafkaController { private final Producer producer; @Autowired KafkaController(Producer producer) { this.producer = producer; } @GetMapping(value = "/publish") public void sendMessageToKafkaTopic(@RequestParam("message") String message) { this.producer.sendMessage(message); } }
添加Spring Boot Application:
@SpringBootApplication public class TestKafkaApplication { public static void main(String[] args) { SpringApplication.run(TestKafkaApplication.class, args); } }
啓動Kafka Brokers後,須要手動建立主題(若是想自動建立,則須要藉助KafkaAdmin,或者是Kafka Broker設置了allow.auto.create.topics=true
且應用設置了listener.missing-topics-fatal=false
):
# 若是對kafka-topics.sh這裏不熟悉,能夠去翻看前面寫的關於Kafka的相關文章(環境搭建和測試那一篇) # 建立test主題 $ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --replication-factor 1 --partitions 2 --topic test
打開瀏覽器測試:
http://localhost:9000/kafka/publish?message=hello
則應用控制檯會打印hello
。整個發佈訂閱的實現只使用了跟Kafka相關的@KafkaListener
註解接收消息和KafkaTemplate
模板發送消息,非常簡單。
上面是簡單的經過Spring Boot依賴的Spring Kafka配置便可快速實現發佈訂閱功能,這個時候咱們是沒法在程序中操做這些配置的,所以這一小節就是利用咱們以前《Spring Boot從零入門7_最新配置文件配置及優先級詳細介紹》文章中講述的自定義配置文件方式去實現發佈訂閱功能。
實現內容有:
@KafkaListener
實現消息監聽)源碼不會直接貼,只給出主體部分。
配置文件:
@Configuration @ConfigurationProperties(prefix = "m2kc") @PropertySource("classpath:kafka.properties") @Validated public class M2KCKafkaConfig { @Value("${m2kc.kafka.bootstrap.servers}") private String kafkaBootStrapServers; @Value("${m2kc.kafka.key.serializer.class}") private String kafkaKeySerializerClass; ...... ...... }
生產者:
@Service @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class KafkaProducer { private static final Logger LOGGER = LogManager.getLogger(KafkaProducer.class); private String mTopic = "test"; private M2KCKafkaConfig mM2KCKafkaConfig; private KafkaTemplate<String, String> mKafkaTemplate; @Autowired public KafkaProducer(M2KCKafkaConfig kafkaConfig) { mTopic = kafkaConfig.getKafkaSourceTopic(); mM2KCKafkaConfig = kafkaConfig; mKafkaTemplate = getKafkaTemplate(); } public KafkaTemplate<String, String> getKafkaTemplate() { KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<String, String>(producerFactory()); return kafkaTemplate; } public ProducerFactory<String, String> producerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeySerializerClass()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueSerializerClass()); if (mM2KCKafkaConfig.isKafkaSslEnable()) { // TODO : to test properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol()); properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword()); properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword()); properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword()); } return new DefaultKafkaProducerFactory<String, String>(properties); } public void sendMessage(String msg) { LOGGER.info("===Producing message[{}]: {}", mTopic, msg); ListenableFuture<SendResult<String, String>> future = mKafkaTemplate.send(mTopic, msg); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { LOGGER.info("===Producing message success"); } @Override public void onFailure(Throwable ex) { LOGGER.info("===Producing message failed"); } }); } }
消費者:
@Service @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public class KafkaConsumer implements InitializingBean { private static final Logger LOGGER = LogManager.getLogger(KafkaConsumer.class); private String mTopic; private M2KCKafkaConfig mM2KCKafkaConfig; private KafkaMessageListenerContainer<String, String> mKafkaMessageListenerContainer; @Autowired public KafkaConsumer(M2KCKafkaConfig kafkaConfig) { LOGGER.info("===KafkaConsumer construct"); mTopic = kafkaConfig.getKafkaSourceTopic(); mM2KCKafkaConfig = kafkaConfig; } @PostConstruct public void start(){ LOGGER.info("===KafkaConsumer start"); } @Override public void afterPropertiesSet() throws Exception { LOGGER.info("===afterPropertiesSet is called"); createContainer(); } private void createContainer() { mKafkaMessageListenerContainer = createKafkaMessageListenerContainer(); mKafkaMessageListenerContainer.setAutoStartup(false);; mKafkaMessageListenerContainer.start(); LOGGER.info("===", mKafkaMessageListenerContainer); } private KafkaMessageListenerContainer<String, String> createKafkaMessageListenerContainer() { KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory(), createContainerProperties()); LOGGER.info("===createKafkaMessageListenerContainer"); return container; } private ContainerProperties createContainerProperties() { ContainerProperties containerProps = new ContainerProperties(mTopic); containerProps.setMessageListener(createMessageListener()); return containerProps; } private ConsumerFactory<String, String> consumerFactory() { Map<String, Object> properties = new HashMap<String, Object>(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, mM2KCKafkaConfig.getKafkaBootStrapServers()); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaKeyDeserializerClass()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, mM2KCKafkaConfig.getKafkaValueDeserializerClass()); properties.put(ConsumerConfig.GROUP_ID_CONFIG, mM2KCKafkaConfig.getKafkaConsumerGroupID()); if (mM2KCKafkaConfig.isKafkaSslEnable()) { // TODO : to test properties.put("security.protocol", mM2KCKafkaConfig.getKafkaSslProtocol()); properties.put("ssl.truststore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.truststore.password", mM2KCKafkaConfig.getKafkaSslTrustStorePassword()); properties.put("ssl.keystore.location", mM2KCKafkaConfig.getKafkaSslStoreLocation()); properties.put("ssl.keystore.password", mM2KCKafkaConfig.getKafkaSslKeyStorePassword()); properties.put("ssl.key.password", mM2KCKafkaConfig.getKafkaSslKeyPassword()); } return new DefaultKafkaConsumerFactory<String, String>(properties); } private MessageListener<String, String> createMessageListener() { return new MessageListener<String, String>() { @Override public void onMessage(ConsumerRecord<String, String> data) { // TODO Auto-generated method stub LOGGER.info("===Consuming msg: {}", data.value()); } }; } }
繼承InitializingBean
只是爲了初始化,也能夠去掉,將初始化寫入了構造函數中。這裏的消費者和生產者都使用@Scope
,因此須要手動獲取實例,經過context去調用getBean()。另外配置文件沒有寫全,這裏須要注意。
Spring Integration也有對Kafka支持的適配器,採用Spring Integration,咱們也可以快速的實現發佈訂閱功能,且實現羣組多消費者批量消費功能:
咱們能夠先看看總體的Kafka消息傳遞通道:
具體的Demo能夠參考Github中的一個sample :
本篇文章詳細介紹了Spring Kafka的發送消息和接收消息功能,其餘包括Spring Kafka Stream的簡單介紹、Spring Kafka參數配置,以及在Spring Boot中如何經過三種方式去實現Kafka的發佈訂閱功能,涉及了Kafka的多消費者多訂閱者,SSL安全傳輸,Spring Integration Kafka等。文章很長,把握整體,結合實際,差很少基本內容都有所涉及了。
Spring Expression Language(簡稱SpEL),在Spring中,不一樣於屬性佔位符${...}
,而SpEL
表達式則要放到#{...}
中(除代碼塊中用Expression外)。如配置文件中有topics參數spring.kafka.topics
,則能夠將配置文件中參數傳入註解@KafkaListener(id = "foo", topics = "#{'${topicOne:annotated1,foo}'.split(',')}")
。
SpEL
表達式經常使用示例:
// 字面量 #{3.1415926} // 浮點數 #{9.87E4} // 科學計數法表示98700 #{'Hello'} // String 類型 #{false} // Boolean 類型 // 引用Bean、屬性和方法 #{sgtPeppers} // 使用這個bean #{sgtPeppers.artist} // 引用bean中的屬性 #{sgtPeppers.selectArtist()} // 引用bean中的方法 #{sgtPeppers.selectArtist().toUpperCase()} // 方法返回值的操做 #{sgtPeppers.selectArtist()?.toUpperCase()} // 防止selectArtist()方法返回null,?表示非null則執行toUpperCase() // 訪問類做用域的方法和常量的話,使用T()這個關鍵的運算符 #{T(java.lang.Math)} #{T(java.lang.Math).PI} // 引用PI的值 #{T(java.lang.Math).random()} // 獲取0-1的隨機數 #{T(System).currentTimeMillis()} // 獲取時間到當前的毫秒數 // 替代屬性佔位符獲取配置文件屬性值 @Value("#{表達式}" private String variable;