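According to IBM research, 90% of all the data human civilization has accumulated was produced in the past two years. Against this backdrop, a wave of new technologies has emerged, including NoSQL, Hadoop, Spark, Storm, and Kylin. Among them, reactive programming, represented by RxJava and Reactor, targets the Velocity in the classic 4V definition of big data (Volume, Variety, Velocity, Value), that is, the high-concurrency problem. The upcoming Spring 5 also introduces support for reactive programming. In the posts that follow, I will share my own learning notes on topics related to reactive programming. This first post starts with Spring 5 and Spring WebFlux.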
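The Reactive Manifesto
Just as with the Agile Manifesto, any discussion of reactive programming has to start with the Reactive Manifesto.
We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. - The Reactive Manifesto
The Reactive Manifesto also contains four key terms:
Responsive: the system should, as far as possible, respond in a timely manner at all times.
Resilient: the system stays responsive even when failures occur.
Elastic: the system stays responsive under varying load.
Message Driven: the system connects its components through asynchronous messages.
As you can see, the first thing any reactive system must guarantee is responsiveness; otherwise it cannot be called reactive. In that sense, a Windows system that blue-screens at the slightest provocation is clearly not a reactive system.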
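(Diagram source: https://docs.spring.io/spring...)
On the left is the traditional Servlet-based Spring Web MVC framework.
On the right is the Spring WebFlux framework, newly introduced in version 5.0 and built on Reactive Streams.
From top to bottom, it consists of three new components: Router Functions, WebFlux, and Reactive Streams.
Router Functions: the counterpart of the standard Spring MVC annotations such as @Controller and @RequestMapping. It provides a functional-style API for creating Routers, Handlers, and Filters.
WebFlux: coordinates the upstream and downstream components to provide reactive programming support.
Reactive Streams: a standard for asynchronous stream processing with backpressure. The mainstream implementations are RxJava and Reactor, and Spring WebFlux integrates Reactor by default.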
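To make the idea of backpressure concrete, here is a minimal sketch in plain Kotlin. The interfaces below are hand-written stand-ins that only mirror the shape of the Reactive Streams interfaces (error handling is left out), and `ListPublisher` and `BatchSubscriber` are hypothetical names invented for this illustration. The point it tries to show is that the publisher emits only as many items as the subscriber has asked for through `request(n)`.

```kotlin
// Simplified stand-ins for the Reactive Streams interfaces (assumption: onError omitted).
interface Subscription {
    fun request(n: Long)
    fun cancel()
}

interface Subscriber<T> {
    fun onSubscribe(s: Subscription)
    fun onNext(item: T)
    fun onComplete()
}

interface Publisher<T> {
    fun subscribe(subscriber: Subscriber<T>)
}

// Synchronous publisher over a list: it never emits more than the requested demand.
class ListPublisher<T>(private val items: List<T>) : Publisher<T> {
    override fun subscribe(subscriber: Subscriber<T>) {
        var index = 0
        var cancelled = false
        var completed = false
        subscriber.onSubscribe(object : Subscription {
            override fun request(n: Long) {
                var remaining = n
                while (remaining > 0 && index < items.size && !cancelled) {
                    remaining--
                    subscriber.onNext(items[index++])
                }
                // Signal completion exactly once, after the last item was delivered.
                if (index == items.size && !cancelled && !completed) {
                    completed = true
                    subscriber.onComplete()
                }
            }

            override fun cancel() {
                cancelled = true
            }
        })
    }
}

// Subscriber that asks for a fixed batch and only asks again once the batch is consumed.
class BatchSubscriber<T>(private val batchSize: Long) : Subscriber<T> {
    val received = mutableListOf<T>()
    val requests = mutableListOf<Long>() // log of every demand signal sent upstream
    var done = false
    private lateinit var subscription: Subscription
    private var pending = 0L

    override fun onSubscribe(s: Subscription) {
        subscription = s
        ask()
    }

    override fun onNext(item: T) {
        received += item
        if (--pending == 0L) ask()
    }

    override fun onComplete() {
        done = true
    }

    private fun ask() {
        pending = batchSize
        requests += batchSize
        subscription.request(batchSize)
    }
}
```

Subscribing a `BatchSubscriber<Int>(2)` to `ListPublisher(listOf(1, 2, 3, 4, 5))` delivers all five items, but in three rounds of demand (`requests` ends up as `[2, 2, 2]`). Reactor's `Flux` follows the same request-driven protocol, with asynchronous delivery and error signalling added on top.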
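As for the web container, Spring WebFlux supports both traditional containers such as Tomcat and Jetty (provided they support the Servlet 3.1 Non-Blocking IO API) and asynchronous runtimes such as Netty and Undertow. Whichever is used, Spring WebFlux adapts its input and output streams to Flux<DataBuffer> so that they can be handled uniformly.
It is worth noting that, besides the new Router Functions API, Spring WebFlux also supports declaring a Reactive Controller with the familiar Spring MVC annotations. Unlike a traditional MVC Controller, a Reactive Controller works with the non-blocking ServerHttpRequest and ServerHttpResponse rather than Spring MVC's HttpServletRequest and HttpServletResponse.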
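As a rough illustration of the annotation style, the sketch below declares a controller whose methods return `Flux` and `Mono` instead of plain values. `Greeting`, `GreetingController`, and the `/api/greeting` path are hypothetical names made up for this example and are not part of the sample project. It assumes `spring-boot-starter-webflux` is on the classpath.

```kotlin
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical payload type used only in this sketch.
data class Greeting(val id: Int, val text: String)

@RestController
@RequestMapping("/api/greeting")
class GreetingController {

    // Fixed in-memory data so the example stays self-contained.
    private val greetings = mapOf(
        1 to Greeting(1, "hello"),
        2 to Greeting(2, "hi")
    )

    // 0..N results are returned as a Flux.
    @GetMapping
    fun all(): Flux<Greeting> = Flux.fromIterable(greetings.values)

    // 0..1 results are returned as a Mono; an unknown id yields an empty Mono.
    @GetMapping("/{id}")
    fun one(@PathVariable("id") id: Int): Mono<Greeting> = Mono.justOrEmpty(greetings[id])
}
```

The annotations are the same ones used in Spring MVC; what changes is the return type, which lets the framework subscribe to the result instead of blocking a thread while it is produced.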
The sample project is walked through below.
Project directory structure
~/ak47/webflux$ tree
.
├── build.gradle
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── src
│   ├── main
│   │   ├── java
│   │   ├── kotlin
│   │   │   └── com
│   │   │       └── easy
│   │   │           └── kotlin
│   │   │               └── webflux
│   │   │                   └── WebfluxApplication.kt
│   │   └── resources
│   │       └── application.properties
│   └── test
│       ├── java
│       ├── kotlin
│       │   └── com
│       │       └── easy
│       │           └── kotlin
│       │               └── webflux
│       │                   └── WebfluxApplicationTests.kt
│       └── resources
└── webflux.iml

19 directories, 11 files
Project dependency configuration
buildscript {
    ext {
        kotlinVersion = '1.1.51'
        springBootVersion = '2.0.0.BUILD-SNAPSHOT'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}")
        classpath("org.jetbrains.kotlin:kotlin-allopen:${kotlinVersion}")
    }
}

apply plugin: 'kotlin'
apply plugin: 'kotlin-spring'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.easy.kotlin'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}

dependencies {
    compile('org.springframework.boot:spring-boot-starter-webflux')
    compile("org.jetbrains.kotlin:kotlin-stdlib-jre8")
    compile("org.jetbrains.kotlin:kotlin-reflect")
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')
}
This is the boilerplate project that Spring Initializr generates for us. Next we add the model, dao, service, handler, and other modules one by one.
The source directory layout is designed as follows
├── src
│   ├── main
│   │   ├── java
│   │   ├── kotlin
│   │   │   └── com
│   │   │       └── easy
│   │   │           └── kotlin
│   │   │               └── webflux
│   │   │                   ├── WebfluxApplication.kt
│   │   │                   ├── dao
│   │   │                   │   └── PersonRepository.kt
│   │   │                   ├── handler
│   │   │                   │   └── PersonHandler.kt
│   │   │                   ├── model
│   │   │                   │   └── Person.kt
│   │   │                   ├── router
│   │   │                   │   └── RouterConfig.kt
│   │   │                   ├── server
│   │   │                   │   └── HttpServerConfig.kt
│   │   │                   └── service
│   │   │                       └── PersonService.kt
│   │   └── resources
│   │       └── application.properties
Person
package com.easy.kotlin.webflux.model

import com.fasterxml.jackson.annotation.JsonProperty

class Person(@JsonProperty("name") val name: String,
             @JsonProperty("age") val age: Int) {

    override fun toString(): String {
        return "Person{name='$name', age=$age}"
    }
}
PersonRepository
package com.easy.kotlin.webflux.dao

import com.easy.kotlin.webflux.model.Person
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

interface PersonRepository {
    fun getPerson(id: Int): Mono<Person>
    fun allPeople(): Flux<Person>
    fun savePerson(person: Mono<Person>): Mono<Void>
}

/*
 * Mono and Flux are the reactive types provided by Project Reactor.
 * Spring also supports other Reactive Streams implementations, such as RxJava.
 * Both Mono and Flux implement the Reactive Streams Publisher interface.
 * Mono is a publisher that emits zero or one value;
 * Flux can emit 0 to N values.
 *
 * They are very similar to Flowable and Observable in RxJava: each represents
 * a stream of values that is emitted once a subscriber subscribes to it.
 */
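The difference between the two types is easier to see in a small example. The sketch below assumes `reactor-core` is on the classpath (it comes in with `spring-boot-starter-webflux`); `adultNames` and `findAge` are hypothetical helper names used only here. `block()` subscribes and waits for the result, which is convenient in a demo but should generally be avoided inside a non-blocking request handler.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Flux: a stream of 0..N values. Nothing is emitted until someone subscribes;
// here block() subscribes and waits for the collected list.
fun adultNames(ages: Map<String, Int>): List<String> =
    Flux.fromIterable(ages.entries)
        .filter { it.value >= 18 }
        .map { it.key }
        .collectList()
        .block() ?: emptyList()

// Mono: 0 or 1 value. A missing key gives an empty Mono,
// which defaultIfEmpty replaces with -1.
fun findAge(ages: Map<String, Int>, name: String): Int =
    Mono.justOrEmpty(ages[name])
        .defaultIfEmpty(-1)
        .block() ?: -1
```

With `mapOf("Jack" to 20, "Rose" to 16)`, `adultNames` returns a list containing only "Jack", `findAge` for "Rose" returns 16, and `findAge` for an unknown name returns -1.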
PersonService
package com.easy.kotlin.webflux.service

import com.easy.kotlin.webflux.model.Person
import com.easy.kotlin.webflux.dao.PersonRepository
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@Service
class PersonService : PersonRepository {
    // In-memory store keyed by id; it stands in for a real database
    var persons: MutableMap<Int, Person> = hashMapOf()

    constructor() {
        this.persons[1] = Person("Jack", 20)
        this.persons[2] = Person("Rose", 16)
    }

    override fun getPerson(id: Int): Mono<Person> {
        // Empty Mono when the id is unknown
        return Mono.justOrEmpty(this.persons[id])
    }

    override fun allPeople(): Flux<Person> {
        return Flux.fromIterable(this.persons.values)
    }

    override fun savePerson(person: Mono<Person>): Mono<Void> {
        return person.doOnNext {
            val id = this.persons.size + 1
            persons.put(id, it)
            // Log the saved Person itself, not the enclosing Mono
            println("Saved ${it} with ${id}")
        }.thenEmpty(Mono.empty())
    }
}
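One thing to keep in mind with the service above: a plain `hashMapOf()` and an id derived from `persons.size + 1` are fine for a demo, but in a reactive server several event-loop threads may call `savePerson` at the same time. The following is only a sketch of one possible alternative, using standard JDK classes; `InMemoryStore` is a hypothetical name and is not part of the sample project.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical thread-safe in-memory store with monotonically increasing ids.
class InMemoryStore<T : Any> {
    private val items = ConcurrentHashMap<Int, T>()
    private val nextId = AtomicInteger(0)

    // Allocates a fresh id atomically, so two concurrent saves never share one.
    fun save(item: T): Int {
        val id = nextId.incrementAndGet()
        items[id] = item
        return id
    }

    fun find(id: Int): T? = items[id]

    fun count(): Int = items.size
}
```

Saving two items yields the ids 1 and 2, and looking up an id that was never assigned returns `null`. A service could wrap these calls in `Mono.fromCallable { ... }` or `Mono.justOrEmpty(...)` to keep the reactive signatures of `PersonRepository`.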
PersonHandler
package com.easy.kotlin.webflux.handler

import com.easy.kotlin.webflux.dao.PersonRepository
import com.easy.kotlin.webflux.model.Person
import org.springframework.beans.factory.annotation.Autowired
import reactor.core.publisher.Mono
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.http.MediaType.APPLICATION_JSON
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.BodyInserters.fromObject

@Service
class PersonHandler {
    @Autowired lateinit var repository: PersonRepository

    fun getPerson(request: ServerRequest): Mono<ServerResponse> {
        // Throws NumberFormatException if the path variable is not a number
        val personId = request.pathVariable("id").toInt()
        val notFound = ServerResponse.notFound().build()
        val personMono = this.repository.getPerson(personId)
        return personMono
                .flatMap { person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)) }
                // An empty Mono from the repository becomes a 404 response
                .switchIfEmpty(notFound)
    }

    fun createPerson(request: ServerRequest): Mono<ServerResponse> {
        val person = request.bodyToMono(Person::class.java)
        // The response completes once savePerson completes
        return ServerResponse.ok().build(this.repository.savePerson(person))
    }

    fun listPeople(request: ServerRequest): Mono<ServerResponse> {
        val people = this.repository.allPeople()
        return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person::class.java)
    }
}
RouterConfig
package com.easy.kotlin.webflux.router

import com.easy.kotlin.webflux.handler.PersonHandler
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType.APPLICATION_JSON
import org.springframework.web.reactive.function.server.HandlerFunction
import org.springframework.web.reactive.function.server.RequestPredicates.GET
import org.springframework.web.reactive.function.server.RequestPredicates.accept
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.RouterFunctions.route

@Configuration
class RouterConfig {
    @Autowired lateinit var personHandler: PersonHandler

    @Bean
    fun routerFunction(): RouterFunction<*> {
        // Each route pairs a request predicate with a handler function
        return route(GET("/api/person").and(accept(APPLICATION_JSON)),
                HandlerFunction { personHandler.listPeople(it) })
                .and(route(GET("/api/person/{id}").and(accept(APPLICATION_JSON)),
                        HandlerFunction { personHandler.getPerson(it) }))
    }
}
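The router above only registers the two GET endpoints, while `PersonHandler` also defines `createPerson`. As a sketch of how a POST route could be declared in the same functional style, the helper below builds one route from a handler lambda. `createPersonRoute` is a hypothetical name introduced for this example; `POST` and `contentType` are the `RequestPredicates` counterparts of the `GET` and `accept` predicates already used.

```kotlin
import org.springframework.http.MediaType.APPLICATION_JSON
import org.springframework.web.reactive.function.server.HandlerFunction
import org.springframework.web.reactive.function.server.RequestPredicates.POST
import org.springframework.web.reactive.function.server.RequestPredicates.contentType
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.RouterFunctions.route
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import reactor.core.publisher.Mono

// Hypothetical helper: matches POST /api/person with a JSON request body
// and delegates to the supplied handler.
fun createPersonRoute(create: (ServerRequest) -> Mono<ServerResponse>): RouterFunction<ServerResponse> =
    route(POST("/api/person").and(contentType(APPLICATION_JSON)),
            HandlerFunction { create(it) })
```

Inside `RouterConfig`, the same `route(...)` expression could be appended to the existing chain with one more `.and(...)`, passing `personHandler.createPerson(it)` as the handler body.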
HttpServerConfig
package com.easy.kotlin.webflux.server

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.env.Environment
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.RouterFunctions
import reactor.ipc.netty.http.server.HttpServer

@Configuration
class HttpServerConfig {
    @Autowired
    lateinit var environment: Environment

    @Bean
    fun httpServer(routerFunction: RouterFunction<*>): HttpServer {
        // Adapt the router function to a Reactor Netty handler
        val httpHandler = RouterFunctions.toHttpHandler(routerFunction)
        val adapter = ReactorHttpHandlerAdapter(httpHandler)
        val server = HttpServer.create("localhost", environment.getProperty("server.port").toInt())
        // newHandler(...) returns a Mono that is never subscribed here, so this call
        // likely does not start a server by itself; the "Netty started on port(s)" line
        // in the startup log comes from Spring Boot's own NettyWebServer.
        server.newHandler(adapter)
        return server
    }
}
Application entry class WebfluxApplication
package com.easy.kotlin.webflux

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class WebfluxApplication

fun main(args: Array<String>) {
    runApplication<WebfluxApplication>(*args)
}
Starting the application
Note this entry in the log:
Mapped ((GET && /api/person) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$1@46292372
((GET && /api/person/{id}) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$2@126be319
Full startup log
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::  (v2.0.0.BUILD-SNAPSHOT)

2017-11-04 00:39:46.046 INFO 2884 --- [ main] c.e.kotlin.webflux.WebfluxApplicationKt : Starting WebfluxApplicationKt on jacks-MacBook-Air.local with PID 2884 (/Users/jack/ak47/webflux/out/production/classes started by jack in /Users/jack/ak47/webflux)
2017-11-04 00:39:46.077 INFO 2884 --- [ main] c.e.kotlin.webflux.WebfluxApplicationKt : No active profile set, falling back to default profiles: default
2017-11-04 00:39:46.247 INFO 2884 --- [ main] .r.c.ReactiveWebServerApplicationContext : Refreshing org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext@4b0b0854: startup date [Sat Nov 04 00:39:46 CST 2017]; root of context hierarchy
2017-11-04 00:39:48.995 INFO 2884 --- [ main] o.s.w.r.f.s.s.RouterFunctionMapping : Mapped ((GET && /api/person) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$1@46292372
((GET && /api/person/{id}) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$2@126be319
2017-11-04 00:39:49.017 INFO 2884 --- [ main] o.s.w.r.handler.SimpleUrlHandlerMapping : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.reactive.resource.ResourceWebHandler]
2017-11-04 00:39:49.017 INFO 2884 --- [ main] o.s.w.r.handler.SimpleUrlHandlerMapping : Mapped URL path [/**] onto handler of type [class org.springframework.web.reactive.resource.ResourceWebHandler]
2017-11-04 00:39:49.215 INFO 2884 --- [ main] o.s.w.r.r.m.a.ControllerMethodResolver : Looking for @ControllerAdvice: org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext@4b0b0854: startup date [Sat Nov 04 00:39:46 CST 2017]; root of context hierarchy
2017-11-04 00:39:50.309 INFO 2884 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2017-11-04 00:39:50.459 INFO 2884 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext : Started HttpServer on /0:0:0:0:0:0:0:0:9000
2017-11-04 00:39:50.459 INFO 2884 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 9000
2017-11-04 00:39:50.466 INFO 2884 --- [ main] c.e.kotlin.webflux.WebfluxApplicationKt : Started WebfluxApplicationKt in 5.047 seconds (JVM running for 6.276)
$ curl http://127.0.0.1:9000/api/person
[{"name":"Jack","age":20},{"name":"Rose","age":16}]
$ curl http://127.0.0.1:9000/api/person/1
{"name":"Jack","age":20}
$ curl http://127.0.0.1:9000/api/person/2
{"name":"Rose","age":16}
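Spring Web is an imperative programming framework that is convenient to develop and debug with. Whether to adopt Spring 5 Reactive or the imperative Spring Web framework should be decided case by case. In many situations the imperative style is sufficient, but when your application needs high scalability, the reactive, non-blocking approach is the most suitable choice.
Source code for this chapter: https://github.com/EasyKotlin...
Reference books:
《 Kotlin + Spring Boot : K2EE 服務端開發實戰 》
《 Kotlin 極簡教程》
Reference articles: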
Spring Framework 5.0 M5 Update : https://spring.io/blog/2017/0...