Spring brings a familiar Spring programming model to Kafka. It provides the KafkaTemplate for publishing records and listener containers for asynchronous execution of POJO listeners. Spring Boot auto-configuration wires up much of the infrastructure, so you can concentrate on your business logic.
Consider this simple POJO listener method:
```java
@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(String in) {
    logger.info("Received: " + in);
    if (in.startsWith("foo")) {
        throw new RuntimeException("failed");
    }
}
```
By default, a failed record is simply logged and we move on to the next one. We can, however, configure an error handler in the listener container to perform some other action. To do so, we override Spring Boot's auto-configured container factory with our own:
```java
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<<
    return factory;
}
```
Notice that we can still leverage much of the auto-configuration.
The SeekToCurrentErrorHandler discards the remaining records from the poll() and performs seek operations on the consumer to reset the offsets, so that the discarded records are fetched again on the next poll. By default, the error handler tracks the failed record, gives up after 10 delivery attempts, and logs the failed record. However, we can also send the failed message to another topic. We call this a dead-letter topic.
Here is the code that puts it all together:
```java
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}

@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(String in) {
    logger.info("Received: " + in);
    if (in.startsWith("foo")) {
        throw new RuntimeException("failed");
    }
}

@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(String in) {
    logger.info("Received from DLT: " + in);
}
```
But what about deserialization exceptions, which occur before Spring gets the record? Use the ErrorHandlingDeserializer. This deserializer wraps a delegate deserializer and catches any exceptions. They are then forwarded to the listener container, which sends them directly to the error handler. The exception contains the source data, so you can diagnose the problem.
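As a sketch, the ErrorHandlingDeserializer can be wired up entirely in application.yml; the `spring.deserializer.value.delegate.class` property names the real deserializer it wraps (class names here assume a recent spring-kafka version — in older versions the class was named ErrorHandlingDeserializer2):

```yaml
spring:
  kafka:
    consumer:
      # the wrapper catches exceptions thrown by the delegate
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        # the delegate that does the actual deserialization
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
```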
Consider the following example:
```java
@Bean
public RecordMessageConverter converter() {
    return new StringJsonMessageConverter();
}

@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(Foo2 foo) {
    logger.info("Received: " + foo);
    if (foo.getFoo().startsWith("fail")) {
        throw new RuntimeException("failed");
    }
}

@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(Foo2 in) {
    logger.info("Received from DLT: " + in);
}
```
Notice that we are now consuming objects of type Foo2. The message converter bean infers the type to convert to from the parameter type in the method signature. The converter automatically "trusts" the type. Spring Boot automatically configures the converter into the listener container.
On the producer side, the object sent can be a different class (as long as it is type-compatible):
```java
@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @PostMapping(path = "/send/foo/{what}")
    public void sendFoo(@PathVariable String what) {
        this.template.send("topic1", new Foo1(what));
    }
}
```
The configuration:
```yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
```

```
$ curl -X POST http://localhost:8080/send/foo/fail
```
Here, we use the StringDeserializer together with a "smart" message converter on the consumer side.
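Spelled out explicitly in application.yml (this is Spring Boot's default, shown here only for illustration), the consumer side looks like this:

```yaml
spring:
  kafka:
    consumer:
      # raw strings come out of Kafka; the message converter turns them into POJOs
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```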
We can also use a single listener container and route to a specific method based on the payload type. Since there are multiple methods, the type is needed to select which method to call, so we can no longer infer the type here.
Instead, we rely on type information passed in the record headers to map from the source type to the target type. Also, since we do not infer the type, we need to configure the message converter to "trust" the package of the mapped types.
In this case, we use the message converter on both sides (together with a StringSerializer and a StringDeserializer). The following consumer-side converter example puts them together:
```java
@Bean
public RecordMessageConverter converter() {
    StringJsonMessageConverter converter = new StringJsonMessageConverter();
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
    typeMapper.addTrustedPackages("com.common");
    Map<String, Class<?>> mappings = new HashMap<>();
    mappings.put("foo", Foo2.class);
    mappings.put("bar", Bar2.class);
    typeMapper.setIdClassMapping(mappings);
    converter.setTypeMapper(typeMapper);
    return converter;
}
```
Here, we map "foo" to class Foo2 and "bar" to class Bar2. Notice that we have to tell it to use the TYPE_ID header to determine the type for the conversion. Again, Spring Boot automatically configures the message converter into the container. Below is the producer-side type mapping, in a snippet of the application.yml file; the format is a comma-delimited list of token:FQCN pairs:
```yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.type.mapping: foo:com.common.Foo1,bar:com.common.Bar1
```
This configuration maps class Foo1 to "foo" and class Bar1 to "bar".
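The token:FQCN format itself can be illustrated with a few lines of plain Java (a hypothetical parser, purely to show the shape of the property value; the real parsing is done inside the JsonSerializer/JsonDeserializer):

```java
import java.util.HashMap;
import java.util.Map;

public class TypeMappingFormat {

    // Parses a comma-delimited list of token:FQCN pairs, e.g.
    // "foo:com.common.Foo1,bar:com.common.Bar1"
    static Map<String, String> parse(String mapping) {
        Map<String, String> result = new HashMap<>();
        for (String pair : mapping.split(",")) {
            String[] parts = pair.trim().split(":");
            result.put(parts[0].trim(), parts[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> m = parse("foo:com.common.Foo1,bar:com.common.Bar1");
        System.out.println(m.get("foo")); // com.common.Foo1
        System.out.println(m.get("bar")); // com.common.Bar1
    }
}
```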
The listener:
```java
@Component
@KafkaListener(id = "multiGroup", topics = { "foos", "bars" })
public class MultiMethods {

    @KafkaHandler
    public void foo(Foo1 foo) {
        System.out.println("Received: " + foo);
    }

    @KafkaHandler
    public void bar(Bar bar) {
        System.out.println("Received: " + bar);
    }

    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Received unknown: " + object);
    }
}
```
The producer:
```java
@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @PostMapping(path = "/send/foo/{what}")
    public void sendFoo(@PathVariable String what) {
        this.template.send(new GenericMessage<>(new Foo1(what),
                Collections.singletonMap(KafkaHeaders.TOPIC, "foos")));
    }

    @PostMapping(path = "/send/bar/{what}")
    public void sendBar(@PathVariable String what) {
        this.template.send(new GenericMessage<>(new Bar(what),
                Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));
    }

    @PostMapping(path = "/send/unknown/{what}")
    public void sendUnknown(@PathVariable String what) {
        this.template.send(new GenericMessage<>(what,
                Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));
    }
}
```
Transactions
Enable transactions by setting the transaction-id-prefix in the application.yml file:
```yaml
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx.
    consumer:
      properties:
        isolation.level: read_committed
```
When using spring-kafka 1.3.x or later together with a kafka-clients version that supports transactions (0.11 or later), any KafkaTemplate operations performed in a @KafkaListener method participate in a transaction, and the listener container sends the offsets to the transaction before committing it. Note that we also set the isolation level for the consumer so that it cannot see uncommitted records. The following example pauses the listener so that we can see this effect:
```java
@KafkaListener(id = "fooGroup2", topics = "topic2")
public void listen(List<Foo2> foos) throws IOException {
    logger.info("Received: " + foos);
    foos.forEach(f -> kafkaTemplate.send("topic3", f.getFoo().toUpperCase()));
    logger.info("Messages sent, hit enter to commit tx");
    System.in.read();
}

@KafkaListener(id = "fooGroup3", topics = "topic3")
public void listen(String in) {
    logger.info("Received: " + in);
}
```
The producer for this example sends multiple records in a single transaction:
```java
@PostMapping(path = "/send/foos/{what}")
public void sendFoo(@PathVariable String what) {
    this.template.executeInTransaction(kafkaTemplate -> {
        StringUtils.commaDelimitedListToSet(what).stream()
                .map(s -> new Foo1(s))
                .forEach(foo -> kafkaTemplate.send("topic2", foo));
        return null;
    });
}
```

```
curl -X POST http://localhost:8080/send/foos/a,b,c,d,e

Received: [Foo2 [foo=a], Foo2 [foo=b], Foo2 [foo=c], Foo2 [foo=d], Foo2 [foo=e]]
Messages sent, hit Enter to commit tx
Received: [A, B, C, D, E]
```