kafka是一個消息隊列產品,基於Topic partitions的設計,能達到很是高的消息發送處理性能。Spring建立了一個項目Spring-kafka,封裝了Apache 的Kafka-client,用於在Spring項目裏快速集成kafka。除了簡單的收發消息外,Spring-kafka還提供了不少高級功能,下面咱們就來一一探祕這些用法。java
項目地址:https://github.com/spring-projects/spring-kafkagit
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.6.RELEASE</version> </dependency>
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092github
/** * @author: kl @kailing.pub * @date: 2019/5/30 */ @SpringBootApplication @RestController public class Application { private final Logger logger = LoggerFactory.getLogger(Application.class); public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Autowired private KafkaTemplate<Object, Object> template; @GetMapping("/send/{input}") public void sendFoo(@PathVariable String input) { this.template.send("topic_input", input); } @KafkaListener(id = "webGroup", topics = "topic_input") public void listen(String input) { logger.info("input value: {}" , input); } }
啓動應用後,在瀏覽器中輸入:http://localhost:8080/send/kl。就能夠在控制檯看到有日誌輸出了:input value: "kl"。基礎的使用就這麼簡單。發送消息時注入一個KafkaTemplate,接收消息時添加一個@KafkaListener註解便可。web
不過上面的代碼可以啓動成功,前提是你已經有了Kafka Server的服務環境,咱們知道Kafka是由Scala + Zookeeper構建的,能夠從官網下載部署包在本地部署。可是,我想告訴你,爲了簡化開發環節驗證Kafka相關功能,Spring-Kafka-Test已經封裝了Kafka-test提供了註解式的一鍵開啓Kafka Server的功能,使用起來也是超級簡單。本文後面的全部測試用例的Kafka都是使用這種嵌入式服務提供的。spring
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>2.2.6.RELEASE</version> <scope>test</scope> </dependency>
下面使用Junit測試用例,直接啓動一個Kafka Server服務,包含四個Broker節點。apache
@RunWith(SpringRunner.class) @SpringBootTest(classes = ApplicationTests.class) @EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095}) public class ApplicationTests { @Test public void contextLoads()throws IOException { System.in.read(); } }
如上:只須要一個註解@EmbeddedKafka便可,就能夠啓動一個功能完整的Kafka服務,是否是很酷。默認只寫註解不加參數的狀況下,是建立一個隨機端口的Broker,在啓動的日誌中會輸出具體的端口以及默認的一些配置項。不過這些咱們在Kafka安裝包配置文件中的配置項,在註解參數中均可以配置,下面詳解下@EmbeddedKafka註解中的可設置參數 :bootstrap
Kafka是多Broker架構的高可用服務,一個Topic對應多個partition,一個Partition能夠有多個副本Replication,這些Replication副本保存在多個Broker,用於高可用。可是,雖然存在多個分區副本集,當前工做副本集卻只有一個,默認就是首次分配的副本集【首選副本】爲Leader,負責寫入和讀取數據。當咱們升級Broker或者更新Broker配置時須要重啓服務,這個時候須要將partition轉移到可用的Broker。下面涉及到三種狀況api
@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})
功能同上面的brokerProperties,只是Kafka Broker的可設置參數達182個之多,都像上面這樣配置確定不是最優方案,因此提供了加載本地配置文件的功能,如:數組
@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")
默認狀況下,若是在使用KafkaTemplate發送消息時,Topic不存在,會建立一個新的Topic,默認的分區數和副本數爲以下Broker參數來設定瀏覽器
num.partitions = 1 #默認Topic分區數
num.replica.fetchers = 1 #默認副本數
/** * @author: kl @kailing.pub * @date: 2019/5/31 */ @Configuration public class KafkaConfig { @Bean public KafkaAdmin admin(KafkaProperties properties){ KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties()); admin.setFatalIfBrokerNotAvailable(true); return admin; } @Bean public NewTopic topic2() { return new NewTopic("topic-kl", 1, (short) 1); } }
若是Kafka Broker支持(1.0.0或更高版本),則若是發現現有Topic的Partition 數少於設置的Partition 數,則會新增新的Partition分區。關於KafkaAdmin有幾個經常使用的用法以下:
setFatalIfBrokerNotAvailable(true):默認這個值是False的,在Broker不可用時,不影響Spring 上下文的初始化。若是你以爲Broker不可用影響正常業務須要顯示的將這個值設置爲True
setAutoCreate(false) : 默認值爲True,也就是Kafka實例化後會自動建立已經實例化的NewTopic對象
initialize():當setAutoCreate爲false時,須要咱們程序顯示的調用admin的initialize()方法來初始化NewTopic對象
有時候咱們在程序啓動時並不知道某個Topic須要多少Partition數合適,可是又不能一股腦的直接使用Broker的默認設置,這個時候就須要使用Kafka-Client自帶的AdminClient來進行處理。上面的Spring封裝的KafkaAdmin也是使用的AdminClient來處理的。如:
@Autowired private KafkaProperties properties; @Test public void testCreateToipc(){ AdminClient client = AdminClient.create(properties.buildAdminProperties()); if(client !=null){ try { Collection<NewTopic> newTopics = new ArrayList<>(1); newTopics.add(new NewTopic("topic-kl",1,(short) 1)); client.createTopics(newTopics); }catch (Throwable e){ e.printStackTrace(); }finally { client.close(); } } }
上面的這些建立Topic方式前提是你的spring boot版本到2.x以上了,由於spring-kafka2.x版本只支持spring boot2.x的版本。在1.x的版本中尚未這些api。下面補充一種在程序中經過Kafka_2.10建立Topic的方式
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.2</version> </dependency>
@Test public void testCreateTopic()throws Exception{ ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$) String topicName = "topic-kl"; int partitions = 1; int replication = 1; AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties()); }
注意下ZkClient最後一個構造入參,是一個序列化反序列化的接口實現,博主測試若是不填的話,建立的Topic在ZK上的數據是有問題的,默認的Kafka實現也很簡單,就是作了字符串UTF-8編碼處理。ZKStringSerializer$是Kafka中已經實現好的一個接口實例,是一個Scala的伴生對象,在Java中直接調用點MODULE$就能夠獲得一個實例
@Test public void testCreateTopic(){ String [] options= new String[]{ "--create", "--zookeeper","127.0.0.1:2181", "--replication-factor", "3", "--partitions", "3", "--topic", "topic-kl" }; TopicCommand.main(options); }
template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() { @Override public void onFailure(Throwable throwable) { ...... } @Override public void onSuccess(SendResult<Object, Object> objectObjectSendResult) { .... } });
ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl"); try { SendResult<Object,Object> result = future.get(); }catch (Throwable e){ e.printStackTrace(); }
默認狀況下,Spring-kafka自動生成的KafkaTemplate實例,是不具備事務消息發送能力的。須要使用以下配置激活事務特性。事務激活後,全部的消息發送只能在發生事務的方法內執行了,否則就會拋一個沒有事務交易的異常
spring.kafka.producer.transaction-id-prefix=kafka_tx.
當發送消息有事務要求時,好比,當全部消息發送成功纔算成功,以下面的例子:假設第一條消費發送後,在發第二條消息前出現了異常,那麼第一條已經發送的消息也會回滾。並且正常狀況下,假設在消息一發送後休眠一段時間,在發送第二條消息,消費端也只有在事務方法執行完成後纔會接收到消息
@GetMapping("/send/{input}") public void sendFoo(@PathVariable String input) { template.executeInTransaction(t ->{ t.send("topic_input","kl"); if("error".equals(input)){ throw new RuntimeException("failed"); } t.send("topic_input","ckl"); return true; }); }
當事務特性激活時,一樣,在方法上面加@Transactional註解也會生效
@GetMapping("/send/{input}") @Transactional(rollbackFor = RuntimeException.class) public void sendFoo(@PathVariable String input) { template.send("topic_input", "kl"); if ("error".equals(input)) { throw new RuntimeException("failed"); } template.send("topic_input", "ckl"); }
Spring-Kafka的事務消息是基於Kafka提供的事務消息功能的。而Kafka Broker默認的配置針對的三個或以上Broker高可用服務而設置的。這邊在測試的時候爲了簡單方便,使用了嵌入式服務新建了一個單Broker的Kafka服務,出現了一些問題:如
一、事務日誌副本集大於Broker數量,會拋以下異常:
Number of alive brokers '1' does not meet the required replication factor '3' for the transactions state topic (configured via 'transaction.state.log.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.
默認Broker的配置transaction.state.log.replication.factor=3,單節點只能調整爲1
二、副本數小於副本同步隊列數目,會拋以下異常
Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]
默認Broker的配置transaction.state.log.min.isr=2,單節點只能調整爲1
ReplyingKafkaTemplate是KafkaTemplate的一個子類,除了繼承父類的方法,新增了一個方法sendAndReceive,實現了消息發送\回覆語義
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
也就是我發送一條消息,可以拿到消費者給我返回的結果。就像傳統的RPC交互那樣。當消息的發送者須要知道消息消費者的具體的消費狀況,很是適合這個api。如,一條消息中發送一批數據,須要知道消費者成功處理了哪些數據。下面代碼演示了怎麼集成以及使用ReplyingKafkaTemplate
/** * @author: kl @kailing.pub * @date: 2019/5/30 */ @SpringBootApplication @RestController public class Application { private final Logger logger = LoggerFactory.getLogger(Application.class); public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) { ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies"); repliesContainer.getContainerProperties().setGroupId("repliesGroup"); repliesContainer.setAutoStartup(false); return repliesContainer; } @Bean public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) { return new ReplyingKafkaTemplate(pf, repliesContainer); } @Bean public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate(pf); } @Autowired private ReplyingKafkaTemplate template; @GetMapping("/send/{input}") @Transactional(rollbackFor = RuntimeException.class) public void sendFoo(@PathVariable String input) throws Exception { ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input); RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record); ConsumerRecord<String, String> consumerRecord = replyFuture.get(); System.err.println("Return value: " + consumerRecord.value()); } @KafkaListener(id = "webGroup", topics = "topic-kl") @SendTo public String listen(String input) { logger.info("input value: {}", input); return "successful"; } }
前面在簡單集成中已經演示過了@KafkaListener接收消息的能力,可是@KafkaListener的功能不止如此,其餘的比較常見的,使用場景比較多的功能點以下:
@KafkaListener(id = "webGroup", topicPartitions = { @TopicPartition(topic = "topic1", partitions = {"0", "1"}), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) },concurrency = "6",errorHandler = "myErrorHandler") public String listen(String input) { logger.info("input value: {}", input); return "successful"; }
其餘的註解參數都很好理解,errorHandler須要說明下,設置這個參數須要實現一個接口KafkaListenerErrorHandler。並且註解裏的配置,是你自定義實現實例在spring上下文中的Name。好比,上面配置爲errorHandler = "myErrorHandler"。則在spring上線中應該存在這樣一個實例:
/** * @author: kl @kailing.pub * @date: 2019/5/31 */ @Service("myErrorHandler") public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler { Logger logger =LoggerFactory.getLogger(getClass()); @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception) { logger.info(message.getPayload().toString()); return null; } @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { logger.info(message.getPayload().toString()); return null; } }
手動ACK模式,由業務邏輯控制提交偏移量。好比程序在消費時,有這種語義,特別異常狀況下不確認ack,也就是不提交偏移量,那麼你只能使用手動Ack模式來作了。開啓手動首先須要關閉自動提交,而後設置下consumer的消費模式
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual
上面的設置好後,在消費時,只須要在@KafkaListener監聽方法的入參加入Acknowledgment 便可,執行到ack.acknowledge()表明提交了偏移量
@KafkaListener(id = "webGroup", topics = "topic-kl") public String listen(String input, Acknowledgment ack) { logger.info("input value: {}", input); if ("kl".equals(input)) { ack.acknowledge(); } return "successful"; }
@KafkaListener註解的監聽器的生命週期是能夠控制的,默認狀況下,@KafkaListener的參數autoStartup = "true"。也就是自動啓動消費,可是也能夠同過KafkaListenerEndpointRegistry來干預他的生命週期。KafkaListenerEndpointRegistry有三個動做方法分別如:start(),pause(),resume()/啓動,中止,繼續。以下代碼詳細演示了這種功能。
/** * @author: kl @kailing.pub * @date: 2019/5/30 */ @SpringBootApplication @RestController public class Application { private final Logger logger = LoggerFactory.getLogger(Application.class); public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Autowired private KafkaTemplate template; @GetMapping("/send/{input}") @Transactional(rollbackFor = RuntimeException.class) public void sendFoo(@PathVariable String input) throws Exception { ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input); template.send(record); } @Autowired private KafkaListenerEndpointRegistry registry; @GetMapping("/stop/{listenerID}") public void stop(@PathVariable String listenerID){ registry.getListenerContainer(listenerID).pause(); } @GetMapping("/resume/{listenerID}") public void resume(@PathVariable String listenerID){ registry.getListenerContainer(listenerID).resume(); } @GetMapping("/start/{listenerID}") public void start(@PathVariable String listenerID){ registry.getListenerContainer(listenerID).start(); } @KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false") public String listen(String input) { logger.info("input value: {}", input); return "successful"; } }
在上面的代碼中,listenerID就是@KafkaListener中的id值「webGroup」。項目啓動好後,分別執行以下url,就能夠看到效果了。
先發送一條消息:http://localhost:8081/send/ckl。由於autoStartup = "false",因此並不會看到有消息進入監聽器。
接着啓動監聽器:http://localhost:8081/start/webGroup。能夠看到有一條消息進來了。
暫停和繼續消費的效果使用相似方法就能夠測試出來了。
前面的消息發送響應應用裏面已經見過@SendTo,其實除了作發送響應語義外,@SendTo註解還能夠帶一個參數,指定轉發的Topic隊列。常見的場景如,一個消息須要作多重加工,不一樣的加工耗費的cup等資源不一致,那麼就能夠經過跨不一樣Topic和部署在不一樣主機上的consumer來解決了。如:
@KafkaListener(id = "webGroup", topics = "topic-kl") @SendTo("topic-ckl") public String listen(String input) { logger.info("input value: {}", input); return input + "hello!"; } @KafkaListener(id = "webGroup1", topics = "topic-ckl") public void listen2(String input) { logger.info("input value: {}", input); }
除了上面談到的經過手動Ack模式來控制消息偏移量外,其實Spring-kafka內部還封裝了可重試消費消息的語義,也就是能夠設置爲當消費數據出現異常時,重試這個消息。並且能夠設置重試達到多少次後,讓消息進入預約好的Topic。也就是死信隊列裏。下面代碼演示了這種效果:
@Autowired private KafkaTemplate template; @Bean public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); //最大重試三次 factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3)); return factory; } @GetMapping("/send/{input}") public void sendFoo(@PathVariable String input) { template.send("topic-kl", input); } @KafkaListener(id = "webGroup", topics = "topic-kl") public String listen(String input) { logger.info("input value: {}", input); throw new RuntimeException("dlt"); } @KafkaListener(id = "dltGroup", topics = "topic-kl.DLT") public void dltListen(String input) { logger.info("Received from DLT: " + input); }
上面應用,在topic-kl監聽到消息會,會觸發運行時異常,而後監聽器會嘗試三次調用,當到達最大的重試次數後。消息就會被丟掉重試死信隊列裏面去。死信隊列的Topic的規則是,業務Topic名字+「.DLT」。如上面業務Topic的name爲「topic-kl」,那麼對應的死信隊列的Topic就是「topic-kl.DLT」
最近業務上使用了kafka用到了Spring-kafka,因此係統性的探索了下Spring-kafka的各類用法,發現了不少好玩很酷的特性,好比,一個註解開啓嵌入式的Kafka服務、像RPC調用同樣的發送\響應語義調用、事務消息等功能。但願此博文可以幫助那些正在使用Spring-kafka或即將使用的人少走一些彎路少踩一點坑。