「Spring和Kafka」Kafka整合Spring 深刻挖掘第2部分:Kafka和Spring

在這個博客系列的第1部分以後,Apache Kafka的Spring——第1部分:錯誤處理、消息轉換和事務支持「Spring和Kafka」Kafka整合Spring 深刻挖掘 -第1部分,在這裏的第2部分中,咱們將關注另外一個加強開發者在Kafka上構建流應用程序時體驗的項目:Spring Cloud Stream。spring

咱們將在這篇文章中討論如下內容:編程

  • Spring雲流及其編程模型概述數組

  • Apache Kafka®集成在Spring雲流安全

  • Spring Cloud Stream如何讓Kafka開發人員更輕鬆地開發應用程序服務器

  • 使用Kafka流和Spring雲流進行流處理app

讓咱們首先看看什麼是Spring Cloud Stream,以及它如何與Apache Kafka一塊兒工做。框架

什麼是Spring Cloud Stream?

Spring Cloud Stream是一個框架,它容許應用程序開發人員編寫消息驅動的微服務。這是經過使用Spring Boot提供的基礎來實現的,同時還支持其餘Spring組合項目(如Spring Integration、Spring Cloud函數和Project Reactor)公開的編程模型和範例。它支持使用描述輸入和輸出組件的類型安全編程模型編寫應用程序。應用程序的常見示例包括源(生產者)、接收(消費者)和處理器(生產者和消費者)。ide

典型的Spring cloud stream 應用程序包括用於通訊的輸入和輸出組件。這些輸入和輸出被映射到Kafka主題。Spring cloud stream應用程序能夠接收來自Kafka主題的輸入數據,它能夠選擇生成另外一個Kafka主題的輸出。這些與Kafka鏈接接收器和源不一樣。有關各類Spring Cloud流開箱即用應用程序的更多信息,請訪問項目頁面。函數

消息傳遞系統和Spring cloud stream之間的橋樑是經過綁定器抽象實現的。綁定器適用於多個消息傳遞系統,但最經常使用的綁定器之一適用於Apache Kafka。微服務

Kafka綁定器擴展了Spring Boot、Apache Kafka的Spring和Spring集成的堅實基礎。因爲綁定器是一個抽象,因此其餘消息傳遞系統也有可用的實現。

Spring Cloud Stream支持發佈/訂閱語義、消費者組和本機分區,並儘量將這些職責委派給消息傳遞系統。對於Kafka綁定器,這些概念在內部映射並委託給Kafka,由於Kafka自己就支持它們。當消息傳遞系統自己不支持這些概念時,Spring Cloud Stream將它們做爲核心特性提供。

如下是綁定器抽象如何與輸入和輸出工做的圖示:

圖片

使用Spring Cloud Stream建立Kafka應用程序

Spring Initializr是使用Spring Cloud Stream建立新應用程序的最佳場所。這篇博文介紹瞭如何在Spring啓動應用程序中使用Apache Kafka,涵蓋了從Spring Initializr建立應用程序所需的全部步驟。對於Spring Cloud Stream,唯一的區別是您須要「Cloud Stream」和「Kafka」做爲組件。如下是你須要選擇的一個例子:

initializr包含開發流應用程序所需的全部依賴項。經過使用Initializr,您還能夠選擇構建工具(如Maven或Gradle)和目標JVM語言(如Java或Kotlin)。

該構建將生成一個可以做爲獨立應用程序(例如,從命令行)運行的uber JAR。

Apache Kafka的Spring cloud stream編程模型

Spring Cloud Stream提供了一個編程模型,支持與Apache Kafka的即時鏈接。應用程序須要在其類路徑中包含Kafka綁定,並添加一個名爲@EnableBinding的註釋,該註釋將Kafka主題綁定到它的輸入或輸出(或二者)。

Spring Cloud Stream提供了三個與@EnableBinding綁定的方便接口:Source(單個輸出)、Sink(單個輸入)和Processor(單個輸入和輸出)。它還能夠擴展到具備多個輸入和輸出的自定義接口。

下面的代碼片斷展現了Spring Cloud Stream的基本編程模型:

@SpringBootApplication
@EnableBinding(Processor.class)
public class UppercaseProcessor {

  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public String process(String s) {
     return s.toUpperCase();
  }
}

在這個應用程序中,注意這個方法是用@StreamListener註釋的,它是由Spring Cloud Stream提供的,用於接收來自Kafka主題的消息。一樣的方法也使用SendTo進行註釋,SendTo是將消息發送到輸出目的地的方便註釋。這是一個Spring雲流處理器應用程序,它使用來自輸入的消息並將消息生成到輸出。

在前面的代碼中沒有提到Kafka主題。此時可能出現的一個天然問題是,「這個應用程序如何與Kafka通訊?」答案是:入站和出站主題是經過使用Spring Boot支持的許多配置選項之一來配置的。在本例中,咱們使用一個名爲application的YAML配置文件。yml,它是默認搜索的。下面是輸入和輸出目的地的配置:

spring.cloud.stream.bindings:
  input:
    destination: topic1
  output:
    destination: topic2

 

Spring Cloud Stream將輸入映射到topic1,將輸出映射到topic2。這是一組很是少的配置,可是可使用更多的選項來進一步定製應用程序。默認狀況下,主題是用單個分區建立的,可是能夠由應用程序覆蓋。更多信息請參考這些文檔。

最重要的是,開發人員能夠簡單地專一於編寫核心業務邏輯,讓Spring Cloud Stream和Spring Boot來處理基礎設施問題(好比鏈接到Kafka、配置和調優應用程序等等)。

下面的例子展現了另外一個簡單的應用程序(消費者):

@SpringBootApplication
@EnableBinding(Sink.class)
public class LoggingConsumerApplication {

  @StreamListener(Sink.INPUT)
  public void handle(Person person) {
     System.out.println("Received: " + person);
  }

  public static class Person {
     private String name;
     public String getName() {
        return name;
     }
     public void setName(String name) {
        this.name = name;
     }
     public String toString() {
        return this.name;
     }
  }
}

 

注意,@EnableBinding提供了一個接收器,這代表這是一個消費者。與前一個應用程序的一個主要區別是,使用@StreamListener註釋的方法將一個名爲Person的POJO做爲參數,而不是字符串。來自Kafka主題的消息是如何轉換成這個POJO的?Spring Cloud Stream提供了自動的內容類型轉換。默認狀況下,它使用application/JSON做爲內容類型,但也支持其餘內容類型。您能夠經過使用屬性spring.cloud.stream.binding .input來提供內容類型。而後將其設置爲適當的內容類型,如application/Avro。

適當的消息轉換器由Spring Cloud Stream根據這個配置來選擇。若是應用程序但願使用Kafka提供的本地序列化和反序列化,而不是使用Spring Cloud Stream提供的消息轉換器,那麼能夠設置如下屬性。

序列化:

spring.cloud.stream.bindings.output.useNativeEncoding=true

反序列化:

spring.cloud.stream.bindings.input.useNativeDecoding=true

Auto-provisioning of topic

Apache Kafka綁定器提供了一個在啓動時配置主題的配置程序。若是在代理上啓用了主題建立,Spring Cloud Stream應用程序能夠在應用程序啓動時建立和配置Kafka主題。

例如,能夠向供應者提供分區和其餘主題級配置。這些定製能夠在綁定器級別進行,綁定器級別將應用於應用程序中使用的全部主題,也能夠在單獨的生產者和消費者級別進行。這很是方便,特別是在應用程序的開發和測試期間。有許多關於如何爲多個分區配置主題的示例。

支持使用者組和分區

可使用Spring Cloud Stream配置衆所周知的屬性,如用戶組和分區。消費者組能夠經過屬性設置:

spring.cloud.stream.bindings.input.group =組名稱

如前所述,在內部,這個組將被翻譯成Kafka的消費者組。

在編寫生產者應用程序時,Spring Cloud Stream提供了將數據發送到特定分區的選項。一樣,在內部,框架將這些職責委託給Kafka。

對於使用者,若是禁用自動再平衡(這是一個須要覆蓋的簡單配置屬性),則特定的應用程序實例能夠限制爲使用來自一組特定分區的消息。有關詳細信息,請參閱這些配置選項。

綁定可視化和控制

經過使用Spring Boot的致動器機制,咱們如今可以控制Spring cloud stream中的各個綁定。

在運行時,可使用執行器端點來中止、暫停、恢復等,執行器端點是Spring Boot的機制,用於在將應用程序推向生產環境時監視和管理應用程序。該特性使用戶可以對應用程序處理來自Kafka的數據的方式有更多的控制。若是應用程序因綁定而暫停,那麼來自該特定主題的處理記錄將暫停,直到恢復。

Spring Cloud Stream還集成了Micrometer,以啓用更豐富的指標、發出混亂的速率並提供其餘與監視相關的功能。這些系統能夠與許多其餘監測系統進一步集成。Kafka綁定器提供了擴展的度量功能,爲主題的消費者滯後提供了額外的看法。

Spring Boot經過一個特殊的健康情況端點提供應用程序健康情況檢查。Kafka綁定器提供了一個健康指示器的特殊實現,它考慮到代理的鏈接性,並檢查全部的分區是否都是健康的。若是發現任何分區沒有leader,或者代理沒法鏈接,那麼health check將報告相應的狀態。

Kafka流在Spring cloud stream中的支持概述

在編寫流處理應用程序時,Spring Cloud stream提供了另外一個專門用於Kafka流的綁定器。與常規的Kafka綁定器同樣,Kafka Streams綁定器也關注開發人員的生產力,所以開發人員能夠專一於爲KStream、KTable、GlobalKTable等編寫業務邏輯,而不是編寫基礎結構代碼。綁定器負責鏈接到Kafka,以及建立、配置和維護流和主題。例如,若是應用程序方法具備KStream簽名,則綁定器將鏈接到目標主題,並在後臺從該主題生成流。應用程序開發人員沒必要顯式地這樣作,由於綁定器已經爲應用程序提供了綁定。

其餘類型(如KTable和GlobalKTable)也是如此。底層的KafkaStreams對象由綁定器提供,用於依賴注入,所以,應用程序不直接維護它。更確切地說,它是由春天的雲流爲你作的。

要使用Spring Cloud Stream開始Kafka流,請轉到Spring Initializr並選擇以下圖所示的選項,以生成一個應用程序,該應用程序帶有使用Spring Cloud Stream編寫Kafka流應用程序的依賴項:

上面的例子展現了一個用Spring Cloud Stream編寫的Kafka Streams應用程序:

@SpringBootApplication
public class KafkaStreamsTableJoin {

  @EnableBinding(StreamTableProcessor.class)
  public static class KStreamToTableJoinApplication {

     @StreamListener
     @SendTo("output")
     public KStream<String, Long> process(@Input("input1") KStream<String, Long> userClicksStream,
                                 @Input("input2") KTable<String, String> userRegionsTable) {

        return userClicksStream
              .leftJoin(userRegionsTable,
                    (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
              .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
              .groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
              .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks)
              .toStream();
     }
  }

  interface StreamTableProcessor {

     @Input("input1")
     KStream inputStream();

     @Output("output")
     KStreamoutputStream();

     @Input("input2")
     KTable inputTable();
  }
}

在前面的代碼中有幾件事情須要注意。在@StreamListener方法中,沒有用於設置Kafka流組件的代碼。應用程序不須要構建流拓撲,以便將KStream或KTable與Kafka主題關聯起來,啓動和中止流,等等。全部這些機制都是由Kafka流的Spring Cloud Stream binder處理的。在調用該方法時,已經建立了一個KStream和一個KTable供應用程序使用。

應用程序建立一個名爲StreamTableProcessor的自定義接口,該接口指定用於輸入和輸出綁定的Kafka流類型。此接口與@EnableBinding一塊兒使用。此接口的使用方式與咱們在前面的處理器和接收器接口示例中使用的方式相同。與常規的Kafka綁定器相似,Kafka上的目的地也是經過使用Spring雲流屬性指定的。您能夠爲前面的應用程序提供這些配置選項來建立必要的流和表:

spring.cloud.stream.bindings.input1.destination=userClicksTopic spring.cloud.stream.bindings.input2.destination=userRegionsTopic spring.cloud-stream.bindings.output.destination=userClickRegionsTopic

咱們使用兩個Kafka主題來建立傳入流:一個用於將消息消費爲KStream,另外一個用於消費爲KTable。框架根據自定義接口StreamTableProcessor中提供的綁定適當地使用所需的類型。而後,這些類型將與方法簽名配對,以便在應用程序代碼中使用。在出站時,出站的KStream被髮送到輸出Kafka主題。

Kafka流中可查詢的狀態存儲支持

Kafka流爲編寫有狀態應用程序提供了第一類原語。當使用Spring Cloud Stream和Kafka流構建有狀態應用程序時,就有可能使用RESTful應用程序從RocksDB的持久狀態存儲中提取信息。下面是一個Spring REST應用程序的例子,它依賴於Kafka流中的狀態存儲:

@RestController
public class FooController {

  private final Log logger = LogFactory.getLog(getClass());

  @Autowired
  private InteractiveQueryService interactiveQueryService;

@RequestMapping("/song/id")
public SongBean song(@RequestParam(value="id") Long id) {

     final ReadOnlyKeyValueStore<Long, Song> songStore =
           interactiveQueryService.getQueryableStore(「STORE-NAME」,
 QueryableStoreTypes.<Long, Song>keyValueStore());

     final Song song = songStore.get(id);
     if (song == null) {
        throw new IllegalArgumentException("Song not found.");
     }
     return new SongBean(song.getArtist(), song.getAlbum(), song.getName());
  }
}

 

InteractiveQueryService是Apache Kafka Streams綁定器提供的一個API,應用程序可使用它從狀態存儲中檢索數據。應用程序可使用此服務按名稱查詢狀態存儲,而不是直接經過底層流基礎設施訪問狀態存儲。當Kafka Streams應用程序的多個實例運行時,該服務還提供了用戶友好的方式來訪問服務器主機信息,這些實例之間有分區。

一般在這種狀況下,應用程序必須經過直接訪問Kafka Streams API來找到密鑰所在的分區所在的主機。InteractiveQueryService提供了這些API方法的包裝器。一旦應用程序得到了對狀態存儲的訪問權,它就能夠經過查詢來造成進一步的看法。最終,能夠經過上面所示的REST端點來提供這些看法。您能夠在GitHub上找到一個使用Spring Cloud Stream編寫的Kafka Streams應用程序的示例,在這個示例中,它使用本節中提到的特性來適應Kafka音樂示例。

Branching in Kafka Streams

經過使用SendTo註釋,能夠在Spring Cloud流中原生地使用Kafka流的分支特性。

@StreamListener("input")
@SendTo({「englishTopic」, 「frenchTopic」, 「spanishTopic」})
public KStream<?, WordCount>[] process(KStream<Object, String> input) {

  Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
  Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
  Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

  return input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .groupBy((key, value) -> value)
        .windowedBy(timeWindows)
        .count(Materialized.as("WordCounts-1"))
        .toStream()
        .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
        .branch(isEnglish, isFrench, isSpanish);
}

注意,SendTo註釋有三個不一樣輸出的綁定,方法自己返回一個KStream[]。Spring Cloud Stream在內部將分支發送到輸出綁定到的Kafka主題。觀察SendTo註釋中指定的輸出順序。這些輸出綁定將與輸出的KStream[]按其在數組中的順序配對。

數組的第一個索引中的第一個KStream能夠映射到englishTopic,而後將下一個映射到frenchTopic,以此類推。這裏的想法是,應用程序能夠專一於功能方面的事情,並使用Spring Cloud Stream設置全部這些輸出流,不然開發人員將不得不爲每一個流單獨作這些工做。

Spring cloud stream中的錯誤處理

Spring Cloud Stream提供了錯誤處理機制來處理失敗的消息。它們能夠被髮送到死信隊列(DLQ),這是Spring Cloud Stream建立的一個特殊的Kafka主題。當失敗的記錄被髮送到DLQ時,頭信息被添加到記錄中,其中包含關於失敗的更多信息,如異常堆棧跟蹤、消息等。

發送到DLQ是可選的,框架提供各類配置選項來定製它。

對於Spring Cloud Stream中的Kafka Streams應用程序,錯誤處理主要集中在反序列化錯誤上。Apache Kafka Streams綁定器提供了使用Kafka Streams提供的反序列化處理程序的能力。它還提供了在主流繼續處理時將失敗的記錄發送到DLQ的能力。當應用程序須要返回來訪問錯誤記錄時,這是很是有用的。

模式演化和Confluent 模式註冊

Spring Cloud Stream支持模式演化,它提供了與Confluent模式註冊中心以及Spring Cloud Stream提供的本地模式註冊中心服務器一塊兒工做的功能。應用程序經過在應用程序級別上包含@EnableSchemaRegistryClient註釋來啓用模式註冊表。Spring Cloud Stream提供了各類基於Avro的消息轉換器,能夠方便地與模式演化一塊兒使用。在使用Confluent模式註冊表時,Spring Cloud Stream提供了一個應用程序須要做爲SchemaRegistryClient bean提供的特殊客戶端實現(ConfluentSchemaRegistryClient)。

結論

Spring Cloud Stream經過自動處理其餘同等重要的非功能需求(如供應、自動內容轉換、錯誤處理、配置管理、用戶組、分區、監視、健康檢查等),使應用程序開發人員更容易關注業務邏輯,從而提升了使用Apache Kafka的生產率。

相關文章
相關標籤/搜索