Reactive Programming in Kotlin with Spring WebFlux


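According to IBM research, 90% of all the data human civilization has accumulated was generated in the past two years. Against this backdrop, a wave of new technologies has emerged, including NoSQL, Hadoop, Spark, Storm, and Kylin. Among them, reactive programming, represented by RxJava and Reactor, targets the Velocity in the classic 4V definition of big data (Volume, Variety, Velocity, Value), that is, the problem of high concurrency. The upcoming Spring 5 also introduces support for reactive programming. In the blog posts that follow, I will share what I have learned on topics around reactive programming. This first post starts with Spring 5 and Spring WebFlux.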

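The Reactive Manifesto

Just as agile development has the Agile Manifesto, any discussion of reactive programming has to start with the Reactive Manifesto.

We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. - The Reactive Manifesto

The Reactive Manifesto is likewise built around four key terms:

Responsive: the system responds in a timely manner whenever possible.
Resilient: the system stays responsive even when failures occur.
Elastic: the system stays responsive under varying load.
Message Driven: the system connects its components through asynchronous messages.

As you can see, the first thing any reactive system must guarantee is responsiveness; without it, a system cannot be called reactive. In that sense, a Windows system that blue-screens at the drop of a hat is clearly not a reactive system.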

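Spring 5 reactive web framework architecture diagram

https://docs.spring.io/spring...

On the left is the traditional Servlet-based Spring Web MVC framework.

On the right is the Spring WebFlux framework, newly introduced in version 5.0 and based on Reactive Streams.

From top to bottom, it consists of three new components:

  • Router Functions
  • WebFlux
  • Reactive Streams

Router Functions:

The counterpart of the standard Spring MVC annotations such as @Controller and @RequestMapping. It provides a functional-style API for creating Routers, Handlers, and Filters.

WebFlux: the core component

Coordinates the components above and below it to provide reactive programming support.

Reactive Streams

A standard for asynchronous stream processing with backpressure. The mainstream implementations are RxJava and Reactor; Spring WebFlux integrates Reactor by default.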
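
To make backpressure a little more concrete, here is a minimal sketch using Reactor's BaseSubscriber. It assumes reactor-core is on the classpath (it is pulled in by spring-boot-starter-webflux); the names OneAtATimeSubscriber and consumeWithBackpressure are made up for this illustration. The subscriber asks for one element at a time, so the publisher never emits more than the consumer has requested.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

// Hypothetical subscriber: pulls elements one by one instead of requesting an unbounded amount.
class OneAtATimeSubscriber : BaseSubscriber<Int>() {
    val received = mutableListOf<Int>()
    var requestCount = 0

    override fun hookOnSubscribe(subscription: Subscription) {
        requestCount++
        request(1) // initial demand: exactly one element
    }

    override fun hookOnNext(value: Int) {
        received += value
        requestCount++
        request(1) // signal readiness for the next element only after handling this one
    }
}

// Flux.range emits on the calling thread, so the subscriber is complete when this returns.
fun consumeWithBackpressure(count: Int): OneAtATimeSubscriber {
    val subscriber = OneAtATimeSubscriber()
    Flux.range(1, count).subscribe(subscriber)
    return subscriber
}
```

Calling consumeWithBackpressure(5) delivers the values 1 to 5 in order, and the subscriber issues one request per element plus the initial one. A real consumer would typically request in larger batches; one at a time just makes the demand signal easy to see.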

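For the web container, Spring WebFlux supports traditional containers such as Tomcat and Jetty (provided they support the Servlet 3.1 Non-Blocking IO API) as well as asynchronous runtimes such as Netty and Undertow. Whichever one is used, Spring WebFlux adapts its input and output streams to Flux<DataBuffer> so they can be handled uniformly.

It is worth noting that, besides the new Router Functions interface, Spring WebFlux also supports declaring Reactive Controllers with the familiar Spring MVC annotations. Unlike a traditional MVC Controller, a Reactive Controller works with the non-blocking ServerHttpRequest and ServerHttpResponse rather than the HttpServletRequest and HttpServletResponse of Spring MVC.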
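
As a rough sketch of the annotation style, the controller below returns Mono and Flux from ordinary @GetMapping methods. It assumes spring-boot-starter-webflux is on the classpath; Greeting, GreetingController and the /greetings paths are hypothetical and not part of the sample project.

```kotlin
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical payload type for this illustration.
data class Greeting(val id: Int, val text: String)

@RestController
class GreetingController {
    // In-memory data; mapOf keeps insertion order.
    private val greetings = mapOf(
        1 to Greeting(1, "hello"),
        2 to Greeting(2, "hi")
    )

    // A Flux return value is written to the response as its elements become available.
    @GetMapping("/greetings")
    fun all(): Flux<Greeting> = Flux.fromIterable(greetings.values)

    // An empty Mono means "no value"; nothing here blocks the calling thread.
    @GetMapping("/greetings/{id}")
    fun one(@PathVariable id: Int): Mono<Greeting> = Mono.justOrEmpty(greetings[id])
}
```

The annotations are the same ones used in Spring MVC; what changes is that the methods return publishers instead of plain values.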

The sample project is described in detail below.

Sample project walkthrough


Project directory structure

~/ak47/webflux$ tree
.
├── build.gradle
├── gradle
│   └── wrapper
│       ├── gradle-wrapper.jar
│       └── gradle-wrapper.properties
├── gradlew
├── gradlew.bat
├── src
│   ├── main
│   │   ├── java
│   │   ├── kotlin
│   │   │   └── com
│   │   │       └── easy
│   │   │           └── kotlin
│   │   │               └── webflux
│   │   │                   └── WebfluxApplication.kt
│   │   └── resources
│   │       └── application.properties
│   └── test
│       ├── java
│       ├── kotlin
│       │   └── com
│       │       └── easy
│       │           └── kotlin
│       │               └── webflux
│       │                   └── WebfluxApplicationTests.kt
│       └── resources
└── webflux.iml

19 directories, 11 files

Project dependency configuration

buildscript {
    ext {
        kotlinVersion = '1.1.51'
        springBootVersion = '2.0.0.BUILD-SNAPSHOT'
    }
    repositories {
        mavenCentral()
        maven { url "https://repo.spring.io/snapshot" }
        maven { url "https://repo.spring.io/milestone" }
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
        classpath("org.jetbrains.kotlin:kotlin-gradle-plugin:${kotlinVersion}")
        classpath("org.jetbrains.kotlin:kotlin-allopen:${kotlinVersion}")
    }
}

apply plugin: 'kotlin'
apply plugin: 'kotlin-spring'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.easy.kotlin'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8
compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}

repositories {
    mavenCentral()
    maven { url "https://repo.spring.io/snapshot" }
    maven { url "https://repo.spring.io/milestone" }
}


dependencies {
    compile('org.springframework.boot:spring-boot-starter-webflux')
    compile("org.jetbrains.kotlin:kotlin-stdlib-jre8")
    compile("org.jetbrains.kotlin:kotlin-reflect")
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')
}

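This is the boilerplate project that Spring Initializr generates for us. Next we add the model, dao, service, and handler modules one by one.

The source directory structure is laid out as follows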

├── src
│   ├── main
│   │   ├── java
│   │   ├── kotlin
│   │   │   └── com
│   │   │       └── easy
│   │   │           └── kotlin
│   │   │               └── webflux
│   │   │                   ├── WebfluxApplication.kt
│   │   │                   ├── dao
│   │   │                   │   └── PersonRepository.kt
│   │   │                   ├── handler
│   │   │                   │   └── PersonHandler.kt
│   │   │                   ├── model
│   │   │                   │   └── Person.kt
│   │   │                   ├── router
│   │   │                   │   └── RouterConfig.kt
│   │   │                   ├── server
│   │   │                   │   └── HttpServerConfig.kt
│   │   │                   └── service
│   │   │                       └── PersonService.kt
│   │   └── resources
│   │       └── application.properties

Person

package com.easy.kotlin.webflux.model

import com.fasterxml.jackson.annotation.JsonProperty

class Person(@JsonProperty("name") val name: String, @JsonProperty("age") val age: Int) {

    // String template instead of Java-style concatenation; the output is unchanged.
    override fun toString(): String = "Person{name='$name', age=$age}"
}

PersonRepository

package com.easy.kotlin.webflux.dao

import com.easy.kotlin.webflux.model.Person
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono


interface PersonRepository {

    fun getPerson(id: Int): Mono<Person>

    fun allPeople(): Flux<Person>

    fun savePerson(person: Mono<Person>): Mono<Void>
}


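/*
 * Mono and Flux are the reactive types provided by Project Reactor.
 * Spring also supports other Reactive Streams implementations, such as RxJava.
 * Both Mono and Flux implement the Reactive Streams Publisher interface.
 * Mono is a publisher that emits zero or one value,
 * while Flux can emit zero to N values.
 *
 * They are comparable to RxJava's Flowable and Observable: each represents a stream
 * of values that is emitted once the publisher is subscribed to.
 */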
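
A small sketch of the two types in use, assuming only reactor-core on the classpath. The function names greet, evenSquares and lazinessDemo are invented for this illustration. The last function shows that nothing is emitted until the publisher is subscribed to.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Mono: at most one value. A null input yields an empty Mono, so map is never invoked.
fun greet(name: String?): Mono<String> =
    Mono.justOrEmpty(name).map { "Hello, $it" }

// Flux: zero to N values, transformed with the usual operators.
fun evenSquares(limit: Int): Flux<Int> =
    Flux.range(1, limit).filter { it % 2 == 0 }.map { it * it }

// Assembling a pipeline does no work; emission starts at subscribe().
fun lazinessDemo(): List<String> {
    val events = mutableListOf<String>()
    val flux = Flux.just(1, 2).doOnNext { events += "emitted $it" }
    events += "assembled"
    flux.subscribe()
    return events
}
```

block() and collectList().block() are convenient for trying these out in a test or a main function, but they should stay out of request-handling code because they block the calling thread.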

PersonService

package com.easy.kotlin.webflux.service

import com.easy.kotlin.webflux.model.Person
import com.easy.kotlin.webflux.dao.PersonRepository
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@Service
class PersonService : PersonRepository {
    // Seed data; a plain in-memory map stands in for a real data store.
    val persons: MutableMap<Int, Person> = hashMapOf(
        1 to Person("Jack", 20),
        2 to Person("Rose", 16)
    )


    override fun getPerson(id: Int): Mono<Person> {
        return Mono.justOrEmpty(this.persons[id])
    }

    override fun allPeople(): Flux<Person> {
        return Flux.fromIterable(this.persons.values)
    }

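    override fun savePerson(person: Mono<Person>): Mono<Void> {
        return person.doOnNext {
            // Naive id allocation: fine for a demo, not safe under concurrent writes.
            val id = this.persons.size + 1
            persons[id] = it
            // Log the saved Person itself (`it`), not the enclosing Mono.
            println("Saved $it with id $id")
        }.then()
    }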
}
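
The service above derives new ids from the map size, which is fine for a demo but could hand out the same id to two concurrent saves. One possible alternative is sketched below, assuming reactor-core on the classpath; Item and InMemoryStore are hypothetical stand-ins, not classes from the sample project. It uses a ConcurrentHashMap and an AtomicInteger, and returns the allocated id from save.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical stand-in for the Person model.
data class Item(val name: String, val age: Int)

class InMemoryStore {
    private val items = ConcurrentHashMap<Int, Item>()
    private val nextId = AtomicInteger(0)

    // The write happens only when the returned Mono is subscribed to.
    fun save(item: Mono<Item>): Mono<Int> =
        item.map { value ->
            val id = nextId.incrementAndGet() // atomic, so concurrent saves get distinct ids
            items[id] = value
            id
        }

    fun find(id: Int): Mono<Item> = Mono.justOrEmpty(items[id])

    fun all(): Flux<Item> = Flux.fromIterable(items.values)
}
```

Note that calling save alone stores nothing; the item is written once something subscribes to the returned Mono, which in a WebFlux handler is done by the framework when the Mono is part of the response.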

PersonHandler

package com.easy.kotlin.webflux.handler

import com.easy.kotlin.webflux.dao.PersonRepository
import com.easy.kotlin.webflux.model.Person
import org.springframework.beans.factory.annotation.Autowired
import reactor.core.publisher.Mono

import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse

import org.springframework.http.MediaType.APPLICATION_JSON
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.BodyInserters.fromObject


@Service
class PersonHandler {

    @Autowired lateinit var repository: PersonRepository


    fun getPerson(request: ServerRequest): Mono<ServerResponse> {
        val personId = request.pathVariable("id").toInt()
        val notFound = ServerResponse.notFound().build()
        val personMono = this.repository.getPerson(personId)
        return personMono
            .flatMap { person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)) }
            .switchIfEmpty(notFound)
    }


    fun createPerson(request: ServerRequest): Mono<ServerResponse> {
        val person = request.bodyToMono(Person::class.java)
        return ServerResponse.ok().build(this.repository.savePerson(person))
    }

    fun listPeople(request: ServerRequest): Mono<ServerResponse> {
        val people = this.repository.allPeople()
        return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person::class.java)
    }


}

RouterConfig

package com.easy.kotlin.webflux.router


import com.easy.kotlin.webflux.handler.PersonHandler
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType.APPLICATION_JSON
import org.springframework.web.reactive.function.server.HandlerFunction
import org.springframework.web.reactive.function.server.RequestPredicates.GET
import org.springframework.web.reactive.function.server.RequestPredicates.accept
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.RouterFunctions.route


@Configuration
class RouterConfig {

    @Autowired lateinit var personHandler: PersonHandler

    @Bean
    fun routerFunction(): RouterFunction<*> {
        return route(GET("/api/person").and(accept(APPLICATION_JSON)),
                HandlerFunction { personHandler.listPeople(it) })

            .and(route(GET("/api/person/{id}").and(accept(APPLICATION_JSON)),
                    HandlerFunction { personHandler.getPerson(it) }))
    }

}
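
The routes above cover the two GET endpoints; the handler's createPerson is not mapped. As a sketch of how the same routes, plus a POST route, might look with the Kotlin router DSL that ships with Spring Framework 5 (the router function in org.springframework.web.reactive.function.server), consider the following. It assumes spring-webflux is on the classpath; the PersonRoutes interface, the personRoutes function and the POST mapping are assumptions made for this illustration.

```kotlin
import org.springframework.http.MediaType.APPLICATION_JSON
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.ServerRequest
import org.springframework.web.reactive.function.server.ServerResponse
import org.springframework.web.reactive.function.server.router
import reactor.core.publisher.Mono

// Hypothetical contract mirroring the three handler methods shown earlier.
interface PersonRoutes {
    fun listPeople(request: ServerRequest): Mono<ServerResponse>
    fun getPerson(request: ServerRequest): Mono<ServerResponse>
    fun createPerson(request: ServerRequest): Mono<ServerResponse>
}

fun personRoutes(handler: PersonRoutes): RouterFunction<ServerResponse> = router {
    // Both GET routes require a JSON-compatible Accept header, as in the Java-style config.
    accept(APPLICATION_JSON).nest {
        GET("/api/person", handler::listPeople)
        GET("/api/person/{id}", handler::getPerson)
    }
    // Assumed mapping for createPerson; the original configuration does not define one.
    POST("/api/person", handler::createPerson)
}
```

In a configuration class the result could be exposed as a @Bean in place of the routerFunction() method. The DSL builds the same kind of RouterFunction, so the choice between the two styles is mostly a matter of readability.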

HttpServerConfig

package com.easy.kotlin.webflux.server

import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.env.Environment
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter
import org.springframework.web.reactive.function.server.RouterFunction
import org.springframework.web.reactive.function.server.RouterFunctions
import reactor.ipc.netty.http.server.HttpServer


@Configuration
class HttpServerConfig {
    @Autowired
    lateinit var environment: Environment

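    @Bean
    fun httpServer(routerFunction: RouterFunction<*>): HttpServer {
        val httpHandler = RouterFunctions.toHttpHandler(routerFunction)
        val adapter = ReactorHttpHandlerAdapter(httpHandler)
        val server = HttpServer.create("localhost", environment.getProperty("server.port").toInt())
        // newHandler(...) returns a Mono that binds the server only once it is subscribed to.
        // It is not subscribed here, so this bean does not open a port by itself; in the startup
        // log further down, the port is opened by Spring Boot's auto-configured Netty server.
        server.newHandler(adapter)
        return server
    }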

}

Application entry class WebfluxApplication

package com.easy.kotlin.webflux

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class WebfluxApplication

fun main(args: Array<String>) {
    runApplication<WebfluxApplication>(*args)
}

Starting the application

Note this log entry:

Mapped ((GET && /api/person) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$1@46292372
((GET && /api/person/{id}) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$2@126be319

Full startup log

.   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::  (v2.0.0.BUILD-SNAPSHOT)

2017-11-04 00:39:46.046  INFO 2884 --- [           main] c.e.kotlin.webflux.WebfluxApplicationKt  : Starting WebfluxApplicationKt on jacks-MacBook-Air.local with PID 2884 (/Users/jack/ak47/webflux/out/production/classes started by jack in /Users/jack/ak47/webflux)
2017-11-04 00:39:46.077  INFO 2884 --- [           main] c.e.kotlin.webflux.WebfluxApplicationKt  : No active profile set, falling back to default profiles: default
2017-11-04 00:39:46.247  INFO 2884 --- [           main] .r.c.ReactiveWebServerApplicationContext : Refreshing org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext@4b0b0854: startup date [Sat Nov 04 00:39:46 CST 2017]; root of context hierarchy
2017-11-04 00:39:48.995  INFO 2884 --- [           main] o.s.w.r.f.s.s.RouterFunctionMapping      : Mapped ((GET && /api/person) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$1@46292372
((GET && /api/person/{id}) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$2@126be319
2017-11-04 00:39:49.017  INFO 2884 --- [           main] o.s.w.r.handler.SimpleUrlHandlerMapping  : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.reactive.resource.ResourceWebHandler]
2017-11-04 00:39:49.017  INFO 2884 --- [           main] o.s.w.r.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**] onto handler of type [class org.springframework.web.reactive.resource.ResourceWebHandler]
2017-11-04 00:39:49.215  INFO 2884 --- [           main] o.s.w.r.r.m.a.ControllerMethodResolver   : Looking for @ControllerAdvice: org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext@4b0b0854: startup date [Sat Nov 04 00:39:46 CST 2017]; root of context hierarchy
2017-11-04 00:39:50.309  INFO 2884 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2017-11-04 00:39:50.459  INFO 2884 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext     : Started HttpServer on /0:0:0:0:0:0:0:0:9000
2017-11-04 00:39:50.459  INFO 2884 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 9000
2017-11-04 00:39:50.466  INFO 2884 --- [           main] c.e.kotlin.webflux.WebfluxApplicationKt  : Started WebfluxApplicationKt in 5.047 seconds (JVM running for 6.276)

Test output

$ curl http://127.0.0.1:9000/api/person
[{"name":"Jack","age":20},{"name":"Rose","age":16}]


$ curl http://127.0.0.1:9000/api/person/1
{"name":"Jack","age":20}



$ curl http://127.0.0.1:9000/api/person/2
{"name":"Rose","age":16}

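Summary

Spring Web is an imperative programming framework that is easy to develop with and debug. Whether to use Spring 5 Reactive or the imperative Spring Web framework depends on your actual situation. In many cases the imperative style is sufficient, but when your application needs high scalability, the reactive, non-blocking approach is the most suitable.

Source code for this chapter: https://github.com/EasyKotlin...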

