寫在前面:2020年面試必備的Java後端進階面試題總結了一份複習指南在Github上,內容詳細,圖文並茂,有須要學習的朋友能夠Star一下!
GitHub地址:https://github.com/abel-max/Java-Study-Note/tree/master前端
前言
在當今軟件開發中 Cloud Native 隨着企業功能的日益完善逐漸被愈來愈多的公司所接受。這一變更每每不只是技術選型的改變,而更可能是在開發、運維,項目管理理念上的變化。好比以前咱們在開發企業軟件的時候會將先後端寫在一個大型應用中,在團隊組織上分爲開發,運維,測試團隊,架構師,且用 Waterfall 的方式管理整個項目交付。但在 Cloud Native 文化中這些已經被一組新的方法和技術棧(或者說是一種新的文化)所替代,好比:DevOps、持續交付(CI/CD)、微服務(MicroServices)、全棧開發、敏捷開發,領域驅動開發 (Domain Driven Development), 測試驅動開發 (Test Driven Development), Event Sourcing 等等。再配合上人工智能在業務中的嵌入使得咱們的應用在商業上產生了更大的價值。java
今天介紹的 Micronaut 屬於微服務,後端技術。目前本人看好它之後大機率替代 Springboot 成爲 JVM 語言中最流行的服務端框架!mysql
本文將從基礎的概念開始介紹 Mircornaut,並集成 Kafka Producer, Kafka Streams 和 GraphQL 寫一個簡單的後端 Web 應用,來展現如何利用它們完成 Event Sourcing 系統中數據歷史存儲和更新當前 View 功能的。讀者能夠以這個爲小模板做爲基礎將其擴展成本身 Event Sourcing 系統中的一個 Micronaut MicroService 後端。react
1 爲何使用 Micronaut?
常常有人問我,爲何個人文章都是圍繞 NLP,Kafka,GraphQL, MicroService 展開的?個人理由很簡單,這些技術的組合是我我的認爲開發企業級微服務軟件目前最好的技術搭配,其強大致如今以下方面:git
惋惜以 Java 爲首的企業級框架在實際中一般都是小快靈的反例,好比 Springboot 應該說是 Java 生態圈最近幾年最流行的框架,且 Spring Cloud 在這個基礎上又對雲服務作了進一步的擴展。雖然對比以前的 Spring 下降了很大複雜度,但 Springboot 底層仍是多少延續了 Spring 框架過於冗餘的弱點,即使作成微服務,但因爲它內部反射機制的存在,使 build 時間、內存佔用等指標與 Go,Node.js 相比顯得笨重了不少。程序員
與 Springboot(還內置了 Tomcat)不一樣,Micronaut 不但只是選擇性提供了構建微服務應用程序所需的工具,同時針對啓動速度和內存開銷等方面作了進一步的優化。且在設計上延續了 Spring 依賴注入序等優良傳統,還去掉了諸多 Spring 中冗餘模塊、耗內存的反射機制等,使應用的開發、測試、部署、運維變得更加高效、簡潔。github
本文會以 Springboot 程序員的視角,從零開始設計一個基於 Event Sourcing 的完整微服務項目。面試
2. 命令行工具配置
Micronaut 提供了強大的命令行工具來幫助咱們創建項目,我我的使用的是 MacOS 系統,但其餘 Unix like 的系統操做應該都是相似的。redis
首先下載安裝 SDKMAN算法
$ curl -s https://get.sdkman.io | bash $ source "$HOME/.sdkman/bin/sdkman-init.sh"
檢查下是否配置成功:
$ sdk ==== BROADCAST ================================================================= * 2020-04-16: Jbang 0.22.0.2 released on SDKMAN! See https://github.com/maxandersen/jbang/releases/tag/v0.22.0.2 #jbang * 2020-04-15: Gradle 6.4-rc-1 released on SDKMAN! #gradle * 2020-04-15: Kotlin 1.3.72 released on SDKMAN! #kotlin ================================================================================ Usage: sdk <command> [candidate] [version] sdk offline <enable|disable> commands: install or i <candidate> [version] [local-path] uninstall or rm <candidate> <version> list or ls [candidate] use or u <candidate> <version> default or d <candidate> [version] current or c [candidate] upgrade or ug [candidate] version or v broadcast or b help or h offline [enable|disable] selfupdate [force] update flush <broadcast|archives|temp> candidate : the SDK to install: groovy, scala, grails, gradle, kotlin, etc. use list command for comprehensive list of candidates eg: $ sdk list version : where optional, defaults to latest stable if not provided eg: $ sdk install groovy local-path : optional path to an existing local installation eg: $ sdk install groovy 2.4.13-local /opt/groovy-2.4.13
安裝 Micronaut CLI
$ sdk install micronaut Downloading: micronaut 1.3.4 In progress... ########################################################################################################################################################################### 100,0%########################################################################################################################################################################### 100,0% Installing: micronaut 1.3.4 Done installing! Setting micronaut 1.3.4 as default.
3 建立新項目
下面使用 Micronaut 命令行工具建立 Kotlin,Kafka Client,Kafka Streams 和 GraphQL 集成的項目。
$ mn create-app micronaut-kafka-graphql -f kafka-streams,graphql,kafka -l kotlin
-f: 表示須要添加的依賴
-l: 語言設置
更詳細的關於 create-app 選項能夠經過輸入 mn 後在交互式命令行下使用 help API 查看:
mn> help create-app Usage: mn create-app [-hinvVx] [-b=BUILD-TOOL] [-l=LANG] [-p=PROFILE] [-f=FEATURE[,FEATURE...]]... [NAME] Creates an application [NAME] The name of the application to create. -b, --build=BUILD-TOOL Which build tool to configure. Possible values: gradle, maven. -f, --features=FEATURE[,FEATURE...] The features to use. Possible values: annotation-api, application, asciidoctor, aws-api-gateway, aws-api-gateway-graal, cassandra, config-consul, data-hibernate-jpa, data-jdbc, discovery-consul, discovery-eureka, ehcache, elasticsearch, file-watch, flyway, graal-native-image, graphql, hazelcast, hibernate-gorm, hibernate-jpa, http-client, http-server, jdbc-dbcp, jdbc-hikari, jdbc-tomcat, jib, jrebel, junit, kafka, kafka-streams, kotlintest, kubernetes, liquibase, log4j2, logback, management, micrometer, micrometer-appoptics, micrometer-atlas, micrometer-azure-monitor, micrometer-cloudwatch, micrometer-datadog, micrometer-dynatrace, micrometer-elastic, micrometer-ganglia, micrometer-graphite, micrometer-humio, micrometer-influx, micrometer-jmx, micrometer-kairos, micrometer-new-relic, micrometer-prometheus, micrometer-signalfx, micrometer-stackdriver, micrometer-statsd, micrometer-wavefront, mongo-gorm, mongo-reactive, neo4j-bolt, neo4j-gorm, netflix-archaius, netflix-hystrix, netflix-ribbon, picocli, postgres-reactive, rabbitmq, redis-lettuce, security-jwt, security-session, spek, spock, springloaded, swagger-groovy, swagger-java, swagger-kotlin, tracing-jaeger, tracing-zipkin, vertx-mysql-client, vertx-pg-client -h, --help Show this help message and exit. -i, --inplace Create a service using the current directory -l, --lang=LANG Which language to use. Possible values: java, groovy, kotlin. -n, --plain-output Use plain text instead of ANSI colors and styles. -p, --profile=PROFILE The profile to use. Possible values: base, cli, configuration, federation, function, function-aws, function-aws-alexa, grpc, kafka, profile, rabbitmq, service. -v, --verbose Create verbose output. -V, --version Print version information and exit. -x, --stacktrace Show full stack trace when exceptions occur.
項目建立成功以後在 Intellij 中打開:
圖1:Micronaut 項目初始化截圖
如圖1 所示,命令行工具成功爲咱們生成了項目及 Gradle 配置,並且提供了部署應用的 Dockerfile 模板。
4 實現一個簡易的 Event Sourcing 系統
在該項目設計的 Event Sourcing 系統中,Kafka 是最核心的組成部分,它具有着良好的高吞吐、高容錯、分佈式水平擴展能力,並且在此基礎上保證數據零丟失。在系統設計上 Kafka 能夠被看做成一箇中央 Event Bus,一切業務邏輯的異步操做、微服務間的通信都經過它中轉。
爲了方便作功能上的展現,我在這個項目中會着重實現幾個業務中最多見的場景:
實現對前端服務提供 GraphQL Mutation 接口,這個接口負責接收寫操做給後端
實現 Kafka Producer 用於將從 GraphQL 接口得到的數據寫入 Kafka Broker
實現 Kafka Streams 將已經寫入 Kafka 的數據作進行進一步 Transformation 操做,且經過 Kafka Streams 內置的 RocksDB 對當前數據更新,並提供查詢的 View
實現 GraphQL Query 經過訪問 View 獲取最新的數據,並將結果返回給前端
假設,咱們的需求是收集市場變化的信息:一方面咱們但願在後臺存儲過去全部市場的歷史變化,用於數據分析、統計;另外一方面咱們須要給前端查詢只返回最新的市場信息。
咱們簡單定義一個 Kotlin 數據類,等同於 Java 的 Pojo 類來表示市場數據模型(在項目下創建 model 目錄,並創建一個 Market 的數據類):
data class Market ( var marketId: String, val currentStatus: String, val country: String, val zipcode: String, val timestamp: Long = System.currentTimeMillis() )
4.1 GraphQL Mutation 增添數據接口定義
爲了能使前端框架與咱們後臺創建聯繫,首先咱們須要一個接口,該接口負責處理前端寫數據的請求。對大數據,數據倉庫有經驗的同窗都知道,在海量數據中咱們是不會作行級 Update 操做的。全部寫入的數據都會以 Append 的方式寫入 Kafka,Kafka 的本質其實也就是分佈式的日誌文件系統,因此這些在 Event Sourcing 系統中增、刪、查、改的操做體如今 GraphQL 或者 HTTP 層面上均可以歸結成 POST 請求。在 GraphQL 中咱們把除了查找以外的接口都定義成 Mutation。
首先咱們在 model 文件夾下 定義一個類映射前端的 Request 輸入,命名爲:
MarketInput: data class MarketInput ( val marketId: String, val currentStatus: String, val country: String, val zipcode: String )
而後配置 Micronaut GraphQL 接口。在項目 resources 文件夾下創建 schema.graphqls 文件,並定義接口:
type Market { marketId: ID! currentStatus: String! country: String! zipcode: String! timestamp: Long! } input MarketInput { marketId: ID! currentStatus: String! country: String! zipcode: String! } type Mutation { createMarket(marketInput: MarketInput): Market } type Query { allMarkets: [Market] } schema { mutation: Mutation query: Query }
這兩個接口分別爲:
createMarket 方法從前端接收 MarketInput 對象,並根據這個輸入生成 Market 對象(給 MarketInput 打時間戳 -> 設計簡單是爲了方便項目展現,實際意義不大)、存入後端
allMarkets 方法返回當前全部市場信息(因此是個 Market 列表)
其餘部分都是標準的 GraphQL 配置語法,這裏不作詳細解釋。
4.2 實現 Kafka Producer
在咱們配置 GraphQL 接口前,須要實現一個 Service 來完成寫數據進 Kafka 的操做。在項目下創建 service 文件夾,並建立類 CreateMarketService,具體實現以下:
package micronaut.kafka.graphql.service import io.micronaut.configuration.kafka.annotation.KafkaClient import io.micronaut.configuration.kafka.annotation.KafkaKey import io.micronaut.configuration.kafka.annotation.Topic import micronaut.kafka.graphql.model.Market @KafkaClient interface CreateMarketService { @Topic("markt-event-store") fun createMarket(@KafkaKey id: String, market: Market) }
Micronaut 中 被@KafkaClient 修飾的接口在運行時會自動生成相應的 Producer 實現,@Topic 定義了接收 Producer 數據的 Kafka Topic。若是在方法參數中使用 @KafkaKey 修飾,那麼這個參數被做爲 Topic 的 Key對待。若是不作特殊配置 Micronaut 默認使用 JSON 序列化 Pojo 對象。
4.3 配置 GraphQL Mutation 接口
在有了接口的定義和 Service 的實現後, 咱們如今要作的就是把這兩個部分銜接起來。
在項目目錄下創建新目錄 graphql,在這個目錄下添加兩個類:
工廠類 GraphQLFactory 用於註冊全部 GraphQL 接口須要的 Query 和 Mutation:
package micronaut.kafka.graphql.graphql import graphql.GraphQL import graphql.schema.idl.RuntimeWiring import graphql.schema.idl.SchemaGenerator import graphql.schema.idl.SchemaParser import graphql.schema.idl.TypeDefinitionRegistry import io.micronaut.context.annotation.Bean import io.micronaut.context.annotation.Factory import io.micronaut.core.io.ResourceResolver import java.io.BufferedReader import java.io.InputStreamReader import javax.inject.Singleton @SuppressWarnings("Duplicates") @Factory class GraphQLFactory { @Bean @Singleton fun graphQL(resourceResolver: ResourceResolver, createMarketDataFetcher: CreateMarketDataFetcher): GraphQL { val schemaParser = SchemaParser() val schemaGenerator = SchemaGenerator() val typeRegistry = TypeDefinitionRegistry() typeRegistry.merge(schemaParser.parse(BufferedReader(InputStreamReader( resourceResolver.getResourceAsStream("classpath:schema.graphqls").get())))) val runtimeWiring = RuntimeWiring.newRuntimeWiring() .type("Mutation") { typeWiring -> typeWiring .dataFetcher("createMarket", createMarketDataFetcher) } .build() val graphQLSchema = schemaGenerator.makeExecutableSchema(typeRegistry, runtimeWiring) return GraphQL.newGraphQL(graphQLSchema).build() } }
如代碼所示,咱們提供了這個工廠類,並提供了 Mutation 接口,且註冊了 GraphQL 的方法 createMarket。
該類須要被 Factory 修飾, graphQl 方法被 Singleton 和 Bean 修飾,這樣保證了在整個應用上只註冊了惟一的一個 GraphQL 工廠。
在 graphql 文件夾下建立 MarketDataFetcher 實現類 CreateMarketDataFetcher:
package micronaut.kafka.graphql.graphql import com.fasterxml.jackson.databind.ObjectMapper import graphql.schema.DataFetcher import graphql.schema.DataFetchingEnvironment import micronaut.kafka.graphql.model.Market import micronaut.kafka.graphql.model.MarketInput import micronaut.kafka.graphql.service.CreateMarketService import micronaut.kafka.graphql.service.CurrentMarketStore import javax.inject.Singleton @Singleton @SuppressWarnings("Duplicates") class CreateMarketDataFetcher(private val createMarketService: CreateMarketService, private val objectMapper: ObjectMapper) : DataFetcher<Market> { override fun get(env: DataFetchingEnvironment): Market { val marketInput = objectMapper.convertValue(env.getArgument("marketInput"), MarketInput::class.java) val market = Market( marketId = marketInput.marketId, currentStatus = marketInput.currentStatus, country = marketInput.country, zipcode = marketInput.zipcode ) createMarketService.createMarket(id = market.marketId, market = market) return market } }
這個類必須繼承 DataFetcher 且一樣被 Singleton 修飾,且自動裝配了咱們以前定義的 CreateMarketService 和 ObjectMapper(用於序列,反序列化 JSON 數據)。
爲了能讓這個類經過工廠註冊 GraphQL 接口,咱們須要本身實現 get 方法。由於這個類生成的對象是爲 Mutation 中的 createMarket(inputMarket: InputMarket) 提供實現的,因此咱們須要先獲取 inputMarket 這個參數。DataFetchingEnvironment 提供了方便的方式,經過 getArgument 方法咱們便可得到這個傳進來的參數。若是是 primitive 數據類型的參數,那它能夠直接解析出來,但若是像咱們的狀況,傳入參數是一個數據對象,用 getArgument 會獲得一個 JSON 格式的字符串,類自動裝載的 ObjectMapper 就是方便咱們用來解析它的,語法見代碼。
在得到了 InputMarket 的對象後,咱們就能夠經過這一輸入信息建立 Market 對象,並經過 createMarketService(以前實現的 Kafka Producer)寫入數據進 Kafka Topic,並將返回 market 對象給前端做爲 Response。
4.4 配置 Kafka Streams
Kafka 之因此能成爲當今最強大的中間件,很大一部分緣由是出於它對各類流式處理提供了豐富的流式處理 API 好比 Kafka Streams ,利用這些 API 咱們能夠輕鬆地完成對實時數據各類變換,Join,窗口操做,聚合函數等複雜操做,也能夠用 Kafka Streams 自帶的 RocksDB 經過 Statestore 對前端查詢提供 View。
在 service 文件夾下創建 MarketStream 類,具體實現以下:
package micronaut.kafka.graphql.service import com.fasterxml.jackson.databind.ObjectMapper import io.micronaut.configuration.kafka.serde.JsonSerde import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder import io.micronaut.context.annotation.Factory import micronaut.kafka.graphql.model.Market import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.Serdes import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.kstream.* import org.apache.kafka.streams.state.Stores import javax.inject.Named import javax.inject.Singleton const val MARKET_EVENT_TOPIC = "market-event-store" const val CURRENT_MARKET_STORE = "current-market-store" const val MARKET_APP_ID = "market-stream" @Factory class MarketStream { @Singleton @Named(MARKET_APP_ID) fun buildMarketStream(builder: ConfiguredStreamBuilder, objectMapper: ObjectMapper): KStream<String, Market>? { val marketStore = Stores.inMemoryKeyValueStore(CURRENT_MARKET_STORE) builder.configuration[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.EXACTLY_ONCE builder.configuration[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" builder.configuration[ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG] = true val stream = builder.stream(MARKET_EVENT_TOPIC, Consumed.with(Serdes.String(), JsonSerde(objectMapper, Market::class.java))) stream.groupBy( { _, value -> value.country }, Grouped.with( Serdes.String(), JsonSerde(objectMapper, Market::class.java))) .reduce( {value1, value2 -> if(value1.timestamp <= value2.timestamp) { return@reduce value2 } else { return@reduce value1 } }, Materialized .`as`<String, Market>(marketStore) .withKeySerde(Serdes.String()) .withValueSerde(JsonSerde(objectMapper, Market::class.java)) ) .toStream() .print(Printed.toSysOut()) return stream } }
Micronaut Kafka Streams 的實現與 native 的 Streams API 如出一轍,只是用 Factory(返回 KStream 對象)修飾了一下。
這段代碼大概實現的目的是,當我獲得相同 marketId 的對象後,經過對比 timestamp 的大小,判斷哪條數據是最新的,而且將最新數據儲存在 Statestore 中(爲了代碼方便我只往 store 中寫和更新數據,不包括刪除指定數據)。
聚合操做後能夠經過 Materialized 將聚合操做結果寫入 Statestore 中。
4.5 提供 Statestore 訪問
Streams 只是對 Statestore 進行寫的操做,咱們還須要一個接口用來將寫進的數據讀出來。因此繼續在 service 文件夾下,創建一個新類 CurrentMarketStore,實現以下:
package micronaut.kafka.graphql.service import io.micronaut.configuration.kafka.streams.InteractiveQueryService import micronaut.kafka.graphql.model.Market import org.apache.kafka.streams.state.QueryableStoreTypes import javax.inject.Singleton @Singleton class CurrentMarketStore(private val interactiveQueryService: InteractiveQueryService) { fun getAllMarkets(): List<Market> { val marketStore = interactiveQueryService .getQueryableStore(CURRENT_MARKET_STORE, QueryableStoreTypes.keyValueStore<String, Market>()) return marketStore .map { kvStore -> kvStore.all().asSequence().map { v -> v.value }.toList() } .orElse( emptyList<Market>()) } }
這個類下的方法將 Statestore 中全部的數據都以 List<Market> 形式返回,因爲邏輯實在太簡單,就很少講了,具體見 API 描述。
4.6 建立 GraphQL Query 讀取數據
在 schema.graphqls 文件中添加一個新的 Query 方法:
type Query { allMarkets: [Market] } 在 graphql 下的 MarketDataFetchers 中,添加新類: @Singleton @SuppressWarnings("Duplicates") class AllMarketDataFetcher(private val currentMarketStore: CurrentMarketStore) : DataFetcher<List<Market>> { override fun get(env: DataFetchingEnvironment): List<Market> { return currentMarketStore.getAllMarkets() } }
這個類與以前的 Mutation 很像,不一樣的只是此次不須要傳遞參數,且自動裝配上面講過的 CurrentMarketStore,經過調用 getAllMarkets() 方法得到所有 Statestore 中的 Market 對象。
最後再將這個新的 DataFetcher 的對象註冊在 GraphQL Factory 中。
這個類最終是這樣的:
package micronaut.kafka.graphql.graphql import graphql.GraphQL import graphql.schema.idl.RuntimeWiring import graphql.schema.idl.SchemaGenerator import graphql.schema.idl.SchemaParser import graphql.schema.idl.TypeDefinitionRegistry import io.micronaut.context.annotation.Bean import io.micronaut.context.annotation.Factory import io.micronaut.core.io.ResourceResolver import java.io.BufferedReader import java.io.InputStreamReader import javax.inject.Singleton @SuppressWarnings("Duplicates") @Factory class GraphQLFactory { @Bean @Singleton fun graphQL(resourceResolver: ResourceResolver, createMarketDataFetcher: CreateMarketDataFetcher, allMarketDataFetcher: AllMarketDataFetcher ): GraphQL { val schemaParser = SchemaParser() val schemaGenerator = SchemaGenerator() val typeRegistry = TypeDefinitionRegistry() typeRegistry.merge(schemaParser.parse(BufferedReader(InputStreamReader( resourceResolver.getResourceAsStream("classpath:schema.graphqls").get())))) val runtimeWiring = RuntimeWiring.newRuntimeWiring() .type("Mutation") { typeWiring -> typeWiring .dataFetcher("createMarket", createMarketDataFetcher) } .type("Query") { typeWiring -> typeWiring .dataFetcher("allMarkets", allMarketDataFetcher) } .build() val graphQLSchema = schemaGenerator.makeExecutableSchema(typeRegistry, runtimeWiring) return GraphQL.newGraphQL(graphQLSchema).build() } }
如代碼所示, 新的 Query 方法 allMarkets 如今也被註冊在 GraphQL 的對象上,這樣咱們的程序的代碼部分就算完成了。
5 完整流程展現
爲了看下效果,咱們將整個 Event Sourcing 流程從數據輸入,Kafka 存儲、數據狀態更新, 到讀出 完整的操做一遍。在本地運行代碼時,除了咱們寫好的程序外,還須要安裝一個本地的 Kafka Cluster。關於本地 Kafka 安裝,最簡單的途徑是從 Confluent(Apache Kafka 的商業版)官網拉一個 Docker 鏡像下來。若是不習慣用 Docker 也能夠本身手動配置一個,具體見官方文檔。
假設你配置好了本地了 Confluent,首先啓動 Zookeeper 和 Kafka Broker(不包括 Confluent 其餘服務):
$ confluent start kafka
在 Zookeeper 和 Kafka 啓動以後,須要咱們先手動建立一個 」market-event-store「 Topic 用於儲存歷史數據(建立新的 Topic 也能夠用 Kafka AdminClient 完成,但生產環境中我我的不推薦這種作法),因爲咱們只有一個節點因此將 replication-factor 設爲 1:
$ kafka-topics --create --zookeeper localhost:2181 --topic market-event-store --partitions 10 --replication-factor 1 GraphQL 自帶 Web 工具,能夠在 application.yml 文件中添加: graphql: graphiql: enabled: true
而後用 gradle run 啓動後臺程序(你會驚訝地發現,啓動變得多麼快!)。
後臺啓動好以後能夠經過 http://localhost:8080/graphiql 寫 GraphQL 語句與後臺交互。咱們先從 mutation 開始,建立一個新的 Market 對象:
mutation { createMarket(marketInput: { marketId: "id-1", currentStatus: "closed", country: "china", zipcode:"130000" }) { marketId currentStatus country zipcode timestamp } }
成功後會返回:
而後在輸入一個新的 Market 此次設置 marketId 爲 id-2,以後獲得:
而後咱們用 getAllMarkets 查詢一下當前狀態:
能夠看到咱們有兩條數據,且 currentStatus 狀態都爲 closed。下面咱們將 marketId 爲 id-1 的數據 currentStatus 改成 open:
再經過查詢看下當前 Statestore 狀態:
發現,依然返回兩條數據,可是其中一條 marketId 爲 」id-1「 的數據 currentStatus 改成了 open。恰好達到了咱們更新 Statestore 的效果。
最後再檢查一下是否全部數據變更都被成功捕獲到 Kafka 後臺,在命令行輸入:
$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic market-event-store 命令行輸出: {"marketId":"id-1","currentStatus":"closed","country":"china","zipcode":"130000","timestamp":1587549158784} {"marketId":"id-2","currentStatus":"closed","country":"germany","zipcode":"81477","timestamp":1587549166942} {"marketId":"id-1","currentStatus":"open","country":"china","zipcode":"130000","timestamp":1587549177136}
能夠觀察到,marketId 爲 id-1 的數據在歷史數據中一共出現了兩次,也符合咱們以前的需求。
6 總結
從代碼能夠看出,Micronaut 總體實現起來的思路和 Springboot 是十分相像的,Micronaut 延續了依賴注入和相似於 Bean 的概念,並且在此基礎上增強了對 Cloud Native 的支持,好比 AWS Serverless 支持,Docker 部署,health check, metrics,distributed tracking 等。
在服務端開發時,我刻意從 Java 改成了 Kotlin,但同時會在 Kotlin 中引用一些經典的 Java 生態的庫,兩者無縫銜接使開發更加輕鬆、歡樂。代碼中目前缺乏測試,這塊知識涉及到的內容比較廣,但願之後能夠慢慢補全。
在數據無損地錄入 Kafka 以後,能夠藉助 Kafka Connect 這類工具導入進企業的數據倉庫或者 Hadoop 這種分佈式系統支撐的 Data Lake 中,再在這個基礎上對數據倉庫建模(這部分也能夠用 Kafka Streams 操做),這樣就能夠爲機器學習提供高質量的數據。同時這種 ELT(區別於ETL)的流式數據處理方式也使得整個數據倉庫能更加高效,低延時地作實時歷史數據統計,爲企業提供更多商業價值。
Event Sourcing 系統的另外一大受益者是 NLP 領域的知識圖譜。其實知識圖譜的搭建本質上就是一個圖數據庫建模的過程,但這個圖數據庫的數據狀態要想產生更大的價值是須要按期更新的。經過 Kafka Streams 支撐的 Event Sourcing 系統使咱們能夠實時地更新圖譜中各個節點間的狀態、關係,並結合機器學習,圖算法和分佈式流式處理作更花式的操做。
性能方面,若是碰到高併發的業務場景能夠經過 Kubernetes 輕鬆給 Kafka Streams 擴容,或者提供更高效的分佈式數據庫給後臺提供訪問,若是有須要,徹底能夠將這個項目中的 寫(Producer),數據 Transformation和讀 作進一步的微服務拆分,不須要 Web 服務的地方就去掉 Netty Server。
本文沒有涉及到異步操做,測試開發,監控,部署,運維,安全等話題,但願往後能有機會作適當補充。有些領域好比「安全「也不在我我的技術棧範圍以內,並且部署方式在每一個公司都不太同樣,最多隻能作基本介紹。