你們提到Kafka時第一印象就是它是一個快速的異步消息處理系統,不一樣於一般tomcat之類應用服務器和前端之間的請求/響應方式請求,客戶端發出一個請求,必然會等到一個響應,這種方式對Kafka來講並不天然,Kafka是一種事件驅動方式,事件激活而後響應,這種方式對不少人接受起來不方便,爲了實現請求 - 響應模型,開發人員必須在消息的生產者記錄中構建相關ID系統,並將其與消息的消費者記錄中的ID進行匹配,找到那個請求ID再使用Kafka的一個隊列進行回覆。前端
下圖是本案例的演示架構圖,這個案例是以同步行爲返回兩個數字總和的結果。spring
客戶端 --->請求---> RESTcontroll ---> Spring replying kafka 模板 -->Kafka的請求主題 -->Spring Kafka監聽器 | | |<----- 響應 <----RESTcontroll <-- Spring replying kafka 模板 <-- Kafka的響應主題<---------|
下面咱們開始看看開發這個演示步驟:sql
設置Springboot啓動類bootstrap
首先須要在pom.xml引入Spring kafka模板:tomcat
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
代碼以下:服務器
@SpringBootApplication public class RequestReplyKafkaApplication { public static void main(String[] args) { SpringApplication.run(RequestReplyKafkaApplication.class, args); } }
設置Spring ReplyingKafkaTemplate架構
咱們須要在Springboot配置類的KafkaConfig對Spring kafka模板進行配置:併發
@Configuration public class KafkaConfig {
在這個配置類中,咱們須要配置核心的ReplyingKafkaTemplate類,這個類繼承了 KafkaTemplate 提供請求/響應的的行爲;還有一個生產者工廠(參見 ProducerFactory 下面的代碼)和 KafkaMessageListenerContainer。這是最基本的設置,由於請求響應模型須要對應到消息生產者和消費者的行爲。app
// 這是核心的ReplyingKafkaTemplate @Bean public ReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf, KafkaMessageListenerContainer<String, Model> container) { return new ReplyingKafkaTemplate<>(pf, container); } // 配件:監聽器容器Listener Container to be set up in ReplyingKafkaTemplate @Bean public KafkaMessageListenerContainer<String, Model> replyContainer(ConsumerFactory<String, Model> cf) { ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic); return new KafkaMessageListenerContainer<>(cf, containerProperties); } // 配件:生產者工廠Default Producer Factory to be used in ReplyingKafkaTemplate @Bean public ProducerFactory<String,Model> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } // 配件:kafka生產者的Kafka配置Standard KafkaProducer settings - specifying brokerand serializer @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return props; }
設置spring-Kafka的監聽器異步
這與一般建立的Kafka消費者相同。惟一的變化是額外是在工廠中設置ReplyTemplate,這是必須的,由於消費者須要將計算結果放入到Kafka的響應主題。
//消費者工廠 Default Consumer Factory @Bean public ConsumerFactory<String, Model> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(Model.class)); } // 併發監聽器容器Concurrent Listner container factory @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Model>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); // NOTE - set up of reply template 設置響應模板 factory.setReplyTemplate(kafkaTemplate()); return factory; } // Standard KafkaTemplate @Bean public KafkaTemplate<String, Model> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }
編寫咱們的kafka消費者
這是過去建立的Kafka消費者同樣。惟一的變化是附加了@SendTo註釋,此註釋用於在響應主題上返回業務結果。
@KafkaListener(topics = "${kafka.topic.request-topic}") @SendTo public Model listen(Model request) throws InterruptedException { int sum = request.getFirstNumber() + request.getSecondNumber(); request.setAdditionalProperty("sum", sum); return request; }
這個消費者用於業務計算,把客戶端經過請求傳入的兩個數字進行相加,而後返回這個請求,經過@SendTo發送到Kafka的響應主題。
總結服務
如今,讓咱們將全部這些都結合在一塊兒放在RESTcontroller,步驟分爲幾步,先建立生產者記錄,並在記錄頭部中設置接受響應的Kafka主題,這樣
把請求和響應在Kafka那裏對應起來,而後經過模板發佈消息到Kafka,再經過future.get()堵塞等待Kafka的響應主題發送響應結果過來。這時再
打印結果記錄中的頭部信息,會看到Spring自動生成相關ID。
@ResponseBody @PostMapping(value="/sum",produces=MediaType.APPLICATION_JSON_VALUE,consumes=MediaType.APPLICATION_JSON_VALUE) public Model sum(@RequestBody Model request)throws InterruptedException,ExecutionException { //建立生產者記錄 ProducerRecord<String,Model> record = new ProducerRecord<String,Model>(requestTopic,request); //在記錄頭部中設置響應主題 record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes())); //發佈到kafka主題中 RequestReplyFuture<String, Model, Model> sendAndReceive = kafkaTemplate.sendAndReceive(record); //確認生產者是否成功生產 SendResult<String, Model> sendResult = sendAndReceive.getSendFuture().get(); //打印結果記錄中全部頭部信息 會看到Spring自動生成的相關ID,這個ID是由消費端@SendTo 註釋返回的值。 sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString())); //獲取消費者記錄 ConsumerRecord<String, Model> consumerRecord = sendAndReceive.get(); //返回消費者結果 return consumerRecord.value(); }
併發消費者
即便你要建立請求主題在三個分區中,三個併發的消費者的響應仍然合併到一個Kafka響應主題,這樣,Spring偵聽器的容器可以完成匹配相關ID的繁重工做。
整個請求/響應的模型是一致的。
如今咱們能夠再修改啓動類以下:
@ComponentScan(basePackages = { "com.gauravg.config", "com.gauravg.consumer", "com.gauravg.controller", "com.gauravg.model" }) @SpringBootApplication public class RequestReplyKafkaApplication { public static void main(String[] args) { SpringApplication.run(RequestReplyKafkaApplication.class, args); } }
post數據:
{ "firstNumber": "111", "secondNumber": "2222" }
返回結果是:
{ "firstNumber": 111, "secondNumber": 2222, "sum": 2333 }
在控制檯輸出記錄頭部信息:
kafka_replyTopic:[B@1f59b198 kafka_correlationId:[B@356a7326 __TypeId__:[B@1a9111f
可見,Spring自動生成聚合ID(correlationId),無需咱們本身手工比對了。
歡迎工做一到五年的Java工程師朋友們加入Java架構開發: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!