接下來是《如何在您的Spring啓動應用程序中使用Apache Kafka》「Spring和Kafka」如何在您的Spring啓動應用程序中使用Kafka ,這展現瞭如何開始使用Spring啓動和Apache Kafka®,這裏咱們將更深刻地挖掘Apache Kafka項目的Spring提供的一些附加功能。spring
Apache Kafka的Spring爲Kafka帶來了熟悉的Spring編程模型。它提供了用於發佈記錄的KafkaTemplate和用於異步執行POJO偵聽器的偵聽器容器。Spring引導自動配置鏈接了許多基礎設施,所以您能夠將精力集中在業務邏輯上。編程
考慮一下這個簡單的POJO監聽器方法:json
@KafkaListener(id = "fooGroup", topics = "topic1")app
public void listen(String in) {curl
logger.info("Received: " + in);異步
if (in.startsWith("foo")) {ide
throw new RuntimeException("failed");this
}url
}spa
默認狀況下,失敗的記錄會被簡單地記錄下來,而後咱們繼續下一個。可是,咱們能夠在偵聽器容器中配置一個錯誤處理程序來執行一些其餘操做。爲此,咱們用咱們本身的來覆蓋Spring Boot的自動配置容器工廠:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<<
return factory;
}
注意,咱們仍然能夠利用大部分的自動配置。
SeekToCurrentErrorHandler丟棄輪詢()中的剩餘記錄,並在使用者上執行查找操做來重置偏移量,以便在下一次輪詢時再次獲取被丟棄的記錄。默認狀況下,錯誤處理程序跟蹤失敗的記錄,在10次提交嘗試後放棄,並記錄失敗的記錄。可是,咱們也能夠將失敗的消息發送到另外一個主題。咱們稱這是一個毫無心義的話題。
下面的例子把這一切放在一塊兒:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(template), 3));
return factory;
}
@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(String in) {
logger.info("Received: " + in);
if (in.startsWith("foo")) {
throw new RuntimeException("failed");
}
}
@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(String in) {
logger.info("Received from DLT: " + in);
}
可是,在Spring得到記錄以前發生的反序列化異常又如何呢?進入ErrorHandlingDeserializer。此反序列化器包裝委託反序列化器並捕獲任何異常。而後將它們轉發給偵聽器容器,後者將它們直接發送給錯誤處理程序。異常包含源數據,所以能夠診斷問題。
考慮下面的例子:
@Bean
public RecordMessageConverter converter() {
return new StringJsonMessageConverter();
}
@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(Foo2 foo) {
logger.info("Received: " + foo);
if (foo.getFoo().startsWith("fail")) {
throw new RuntimeException("failed");
}
}
@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(Foo2 in) {
logger.info("Received from DLT: " + in);
}
注意,咱們如今正在使用類型爲Foo2的對象。消息轉換器bean推斷要轉換爲方法簽名中的參數類型的類型。
轉換器自動「信任」類型。Spring Boot自動將轉換器配置到偵聽器容器中。
在生產者方面,發送的對象能夠是一個不一樣的類(只要它的類型兼容):
@RestController
public class Controller {
@Autowired
private KafkaTemplate<Object, Object> template;
@PostMapping(path = "/send/foo/{what}")
public void sendFoo(@PathVariable String what) {
this.template.send("topic1", new Foo1(what));
}
}
和:
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
$ curl -X POST http://localhost:8080/send/foo/fail
這裏,咱們在消費者端使用StringDeserializer和「智能」消息轉換器。
咱們還可使用單個偵聽器容器,並根據類型路由到特定的方法。此次咱們不能推斷類型,由於類型是用來選擇要調用的方法的。
相反,咱們依賴於在記錄頭中傳遞的類型信息來將源類型映射到目標類型。此外,因爲咱們沒有推斷類型,因此須要將消息轉換器配置爲「信任」映射類型的包。
在本例中,咱們將在兩端使用消息轉換器(以及StringSerializer和StringDeserializer)。下面是消費者端轉換器的例子:
@Bean
public RecordMessageConverter converter() {
StringJsonMessageConverter converter = new StringJsonMessageConverter();
DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
typeMapper.addTrustedPackages("com.common");
Map<String, Class<?>> mappings = new HashMap<>();
mappings.put("foo", Foo2.class);
mappings.put("bar", Bar2.class);
typeMapper.setIdClassMapping(mappings);
converter.setTypeMapper(typeMapper);
return converter;
}
在這裏,咱們從「foo」映射到類Foo2,從「bar」映射到類Bar2。注意,咱們必須告訴它使用TYPE_ID頭來肯定轉換的類型。一樣,Spring Boot會自動將消息轉換器配置到容器中。下面是應用程序片斷中的生產端類型映射。yml文件;格式是一個逗號分隔的令牌列表:FQCN:
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.type.mapping: foo:com.common.Foo1,bar:com.common.Bar1
這個配置將類Foo1映射到「foo」,將類Bar1映射到「bar」。
監聽器:
@Component
@KafkaListener(id = "multiGroup", topics = { "foos", "bars" })
public class MultiMethods {
@KafkaHandler
public void foo(Foo1 foo) {
System.out.println("Received: " + foo);
}
@KafkaHandler
public void bar(Bar bar) {
System.out.println("Received: " + bar);
}
@KafkaHandler(isDefault = true)
public void unknown(Object object) {
System.out.println("Received unknown: " + object);
}
}
生產者:
@RestController
public class Controller {
@Autowired
private KafkaTemplate<Object, Object> template;
@PostMapping(path = "/send/foo/{what}")
public void sendFoo(@PathVariable String what) {
this.template.send(new GenericMessage<>(new Foo1(what),
Collections.singletonMap(KafkaHeaders.TOPIC, "foos")));
}
@PostMapping(path = "/send/bar/{what}")
public void sendBar(@PathVariable String what) {
this.template.send(new GenericMessage<>(new Bar(what),
Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));
}
@PostMapping(path = "/send/unknown/{what}")
public void sendUnknown(@PathVariable String what) {
this.template.send(new GenericMessage<>(what,
Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));
}
}
經過在應用程序中設置transactional-id前綴來啓用事務。yml文件:
spring:
kafka:
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
transaction-id-prefix: tx.
consumer:
properties:
isolation.level: read_committed
當使用spring-kafka 1.3時。x或更高版本和支持事務的kafka-clients版本(0.11或更高版本),在@KafkaListener方法中執行的任何KafkaTemplate操做都將參與事務,而偵聽器容器將在提交事務以前向事務發送偏移量。請注意,咱們還爲使用者設置了隔離級別,使其沒法看到未提交的記錄。下面的例子暫停監聽器,這樣咱們能夠看到效果:
@KafkaListener(id = "fooGroup2", topics = "topic2")
public void listen(List foos) throws IOException {
logger.info("Received: " + foos);
foos.forEach(f -> kafkaTemplate.send("topic3", f.getFoo().toUpperCase()));
logger.info("Messages sent, hit enter to commit tx");
System.in.read();
}
@KafkaListener(id = "fooGroup3", topics = "topic3")
public void listen(String in) {
logger.info("Received: " + in);
}
本例中的生產者在一個事務中發送多條記錄:
@PostMapping(path = "/send/foos/{what}")
public void sendFoo(@PathVariable String what) {
this.template.executeInTransaction(kafkaTemplate -> {
StringUtils.commaDelimitedListToSet(what).stream()
.map(s -> new Foo1(s))
.forEach(foo -> kafkaTemplate.send("topic2", foo));
return null;
});
}
curl -X POST http://localhost:8080/send/foos/a,b,c,d,e
Received: [Foo2 [foo=a], Foo2 [foo=b], Foo2 [foo=c], Foo2 [foo=d], Foo2 [foo=e]]
Messages sent, hit Enter to commit tx
Received: [A, B, C, D, E]
在Apache Kafka中使用Spring能夠消除不少樣板代碼。它還增長了諸如錯誤處理、重試和記錄篩選等功能——而咱們只是觸及了表面。