In the previous article, "Backend Development with Kotlin + Spring Boot", I described trying Kotlin for backend development. This time I try WebFlux and coroutines.


plugins {
    id 'java'
    id 'org.jetbrains.kotlin.jvm' version '1.3.10'
    id "org.jetbrains.kotlin.plugin.allopen" version "1.3.10"
}

ext {
    libraries = [
            rxjava                  : "2.2.2",
            logback                 : "1.2.3",
            spring_boot             : "2.1.0.RELEASE",
            kotlinx_coroutines_core : "1.0.1"
    ]
}

group 'com.kotlin.tutorial'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

def libs = rootProject.ext.libraries // library versions

repositories {
    mavenCentral() // assumed: the original repository list was lost in extraction
}

dependencies {
    compile "org.jetbrains.kotlin:kotlin-stdlib-jdk8"
    compile "org.jetbrains.kotlin:kotlin-reflect:1.3.10"
    testCompile group: 'junit', name: 'junit', version: '4.12'

    implementation "io.reactivex.rxjava2:rxjava:${libs.rxjava}"

    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:${libs.kotlinx_coroutines_core}"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:${libs.kotlinx_coroutines_core}"

    implementation "ch.qos.logback:logback-classic:${libs.logback}"
    implementation "ch.qos.logback:logback-core:${libs.logback}"
    implementation "ch.qos.logback:logback-access:${libs.logback}"

    implementation "org.springframework.boot:spring-boot-starter-web:${libs.spring_boot}"
    implementation "org.springframework.boot:spring-boot-starter-data-mongodb-reactive:${libs.spring_boot}"
}

compileKotlin {
    kotlinOptions.jvmTarget = "1.8"
}
compileTestKotlin {
    kotlinOptions.jvmTarget = "1.8"
}

This time the build uses the allopen plugin, an official Kotlin compiler plugin. For details see: kotlinlang.org/docs/refere…

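Kotlin classes are final by default, so normally you have to mark them with the open keyword. The allopen plugin saves you from writing open; it opens only the classes that carry an annotation listed in its allOpen configuration. Note that you also need to turn on the "Enable annotation processing" option in IntelliJ.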
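To make the "final by default" rule concrete, here is a minimal sketch in plain Kotlin, with no Spring and no compiler plugin involved. The classes `PlainConfig` and `OpenConfig` and the helper `isFinalClass` are hypothetical names made up for this illustration; they are not part of the demo project. The sketch only inspects the JVM `final` flag that the compiler emits.

```kotlin
import java.lang.reflect.Modifier

// Hypothetical classes for illustration only; neither appears in the demo project.
class PlainConfig        // no modifier: compiled as a final class
open class OpenConfig    // explicit `open`: can be subclassed

// Returns true when the compiled JVM class carries the `final` flag.
fun isFinalClass(type: Class<*>): Boolean = Modifier.isFinal(type.modifiers)

fun main() {
    println("PlainConfig is final: ${isFinalClass(PlainConfig::class.java)}") // true
    println("OpenConfig is final: ${isFinalClass(OpenConfig::class.java)}")   // false
}
```

A class that the allopen plugin opens should end up in the same state as `OpenConfig`, just without the keyword in the source.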

With allopen in place, SpringKotlinApplication can be declared without the open keyword:

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication

/** Created by tony on 2018/11/13. */
@SpringBootApplication
class SpringKotlinApplication

fun main(args: Array<String>) {
    SpringApplication.run(SpringKotlinApplication::class.java, *args)
}

Also, remember to configure the database connection; the example uses MongoDB.


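WebFlux

WebFlux is a feature introduced in Spring 5. In contrast to the synchronous, blocking I/O model of traditional Spring MVC, it uses an asynchronous, non-blocking I/O model.

The "Flux" in WebFlux comes from the Flux class in Reactor. Reactor is the foundation of reactive development in Spring 5.

Reactor is a library designed and implemented entirely on the Reactive Streams specification; Flux and Mono are its two basic concepts.

A Flux is similar to RxJava's Observable: it can emit zero to many events and then either complete or signal an error, depending on the situation. A Mono emits at most one event, much like RxJava's Single and Maybe, so a Mono can be used to signal that an asynchronous task has completed.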

1.1 Creating the Model

First, create a few model classes.

User represents a user.

import org.springframework.data.annotation.Id

/** Created by tony on 2018/11/22. */
data class User(@Id val id: String? = null, val name: String, val age: Int, val address: Address) {

    constructor() : this(null, "", 0, Address())
    constructor(name: String, age: Int, address: Address) : this(null, name = name, age = age, address = address)
}

Address stores the user's address.

import org.springframework.data.annotation.Id

/** Created by tony on 2018/11/22. */
data class Address(@Id val id: String? = null, val number: Int, val street: String, val city: String) {

    constructor() : this(null, 0, "", "")
    constructor(number: Int, street: String, city: String) : this(null, number, street, city)
}

Audit records the time of a user operation.

import org.springframework.data.annotation.Id
import java.time.LocalDateTime

/** Created by tony on 2018/11/22. */
data class Audit(@Id val id: String? = null, val name: String, val eventDate: LocalDateTime) {

    constructor() : this(null, "", LocalDateTime.now())

    constructor(name: String, eventDate: LocalDateTime) : this(null, name, eventDate)
}
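The models rely on secondary constructors next to the data-class primary constructor. The following sketch shows how those constructors resolve at the call site. It uses simplified stand-ins for User and Address, with the Spring Data `@Id` annotation left out so that it runs without any dependency, and the id value `"abc123"` is made up.

```kotlin
// Simplified stand-ins for the article's models; @Id is omitted on purpose.
data class Address(val id: String? = null, val number: Int, val street: String, val city: String) {
    constructor() : this(null, 0, "", "")
    constructor(number: Int, street: String, city: String) : this(null, number, street, city)
}

data class User(val id: String? = null, val name: String, val age: Int, val address: Address) {
    constructor() : this(null, "", 0, Address())
    constructor(name: String, age: Int, address: Address) : this(null, name, age, address)
}

fun main() {
    // The three-argument constructor leaves id as null until something assigns one.
    val tony = User("Tony20", 20, Address(20, "renming road", "hangzhou"))
    println(tony.id)                        // null

    // The no-arg constructor builds an instance filled with placeholder values.
    val blank = User()
    println(blank.age)                      // 0

    // Data classes compare by value and can be copied with selected fields changed.
    val saved = tony.copy(id = "abc123")    // "abc123" is a made-up id
    println(saved == tony)                  // false
    println(saved.copy(id = null) == tony)  // true
}
```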

1.2 Creating the Repository

Create UserReactiveRepository for querying User objects; it extends the ReactiveMongoRepository interface.

import com.kotlin.tutorial.model.User
import org.springframework.data.mongodb.repository.ReactiveMongoRepository
import org.springframework.stereotype.Repository
import reactor.core.publisher.Flux

/** Created by tony on 2018/11/22. */
@Repository
interface UserReactiveRepository : ReactiveMongoRepository<User, String> {

    fun findUserByAge(age: Int): Flux<User>

    fun findUserByAddressCity(city: String): Flux<User>

    fun findUserByAgeAndAddressCity(age: Int, city: String): Flux<User>
}

Create AuditRepository for looking up the time of a user's most recent operation.

import com.kotlin.tutorial.model.Audit
import org.springframework.data.repository.CrudRepository
import org.springframework.stereotype.Repository

/** Created by tony on 2018/11/22. */
@Repository
interface AuditRepository : CrudRepository<Audit, String> {

    fun findFirstByNameOrderByEventDateDesc(name: String): Audit
}
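The method name findFirstByNameOrderByEventDateDesc is a Spring Data derived query: filter by name, order by eventDate descending, return the first row. As a rough illustration of that meaning only, here is an in-memory equivalent over a plain list. The Audit class is a simplified stand-in without Spring Data annotations, the sample rows are invented, and returning a nullable result when nothing matches is a choice made for this sketch rather than the repository's declared signature.

```kotlin
import java.time.LocalDateTime

// Simplified stand-in for the article's Audit model (no Spring Data annotations).
data class Audit(val id: String? = null, val name: String, val eventDate: LocalDateTime)

// In-memory equivalent of the derived query:
// keep rows with the given name, newest first, and take the first one.
fun findFirstByNameOrderByEventDateDesc(audits: List<Audit>, name: String): Audit? =
    audits
        .filter { it.name == name }
        .sortedByDescending { it.eventDate }
        .firstOrNull()

fun main() {
    // Invented sample rows.
    val audits = listOf(
        Audit(name = "Tony20", eventDate = LocalDateTime.of(2018, 11, 22, 9, 0)),
        Audit(name = "Tony20", eventDate = LocalDateTime.of(2018, 11, 22, 18, 30)),
        Audit(name = "Tony25", eventDate = LocalDateTime.of(2018, 11, 23, 8, 0))
    )
    println(findFirstByNameOrderByEventDateDesc(audits, "Tony20")?.eventDate) // 2018-11-22T18:30
    println(findFirstByNameOrderByEventDateDesc(audits, "nobody"))            // null
}
```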

1.3 Creating the Service

Create UserReactiveService; userRepository and auditRepository are supplied through dependency injection.

import com.kotlin.tutorial.Utils.toLower
import com.kotlin.tutorial.model.Address
import com.kotlin.tutorial.model.Audit
import com.kotlin.tutorial.model.User
import com.kotlin.tutorial.repository.AuditRepository
import com.kotlin.tutorial.repository.UserReactiveRepository
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux
import java.time.LocalDateTime

/** Created by tony on 2018/11/22. */
@Component
class UserReactiveService {

    @Autowired
    lateinit var userRepository: UserReactiveRepository

    @Autowired
    lateinit var auditRepository: AuditRepository

    companion object {

        val cities = listOf("Shanghai", "Suzhou", "Hangzhou").toLower()
        val streets = listOf("renming road", "zhongshan road").toLower()
    }

    // Picks the repository query that matches whichever filters were supplied.
    fun find(age: Int?, rawCity: String?): Flux<User> {
        val city = rawCity?.toLowerCase()

        return when {
            age is Int && city is String -> userRepository.findUserByAgeAndAddressCity(age, city)
            city is String -> userRepository.findUserByAddressCity(city)
            age is Int -> userRepository.findUserByAge(age)
            else -> userRepository.findAll()
        }
    }

    // Replaces all stored users with five generated ones and writes an audit entry for each.
    fun generateData(): Flux<User> {

        val list = listOf(20, 25, 33, 28, 34).map {
            val u = generate(it)
            auditRepository.save(Audit(u.name, LocalDateTime.now()))
            u
        }

        return userRepository.deleteAll().thenMany(userRepository.saveAll(list))
    }

    private fun generate(age: Int): User {

        val address = Address(age, streets[age % streets.size], cities[age % cities.size])
        return User("Tony$age", age, address)
    }
}
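To see which branch of find() is taken for each combination of parameters, the dispatch can be tried against an in-memory list. This is only a sketch under stated assumptions: User is flattened to three fields, each repository call is replaced by a list filter, and `lowercase()` (Kotlin 1.5+) stands in for the `toLowerCase()` call used with Kotlin 1.3 in the article. The city lookup mirrors generate(): the age modulo the number of cities.

```kotlin
// Simplified stand-in for the article's User/Address models.
data class User(val name: String, val age: Int, val city: String)

// Lower-cased city list, as the article's companion object produces via toLower().
val cities = listOf("shanghai", "suzhou", "hangzhou")

// Mirrors generate(): the city index is the age modulo the number of cities.
fun generate(age: Int) = User("Tony$age", age, cities[age % cities.size])

// Same `when` dispatch as the service, with list filters in place of repository queries.
fun find(users: List<User>, age: Int?, rawCity: String?): List<User> {
    val city = rawCity?.lowercase()
    return when {
        age is Int && city is String -> users.filter { it.age == age && it.city == city }
        city is String -> users.filter { it.city == city }
        age is Int -> users.filter { it.age == age }
        else -> users
    }
}

fun main() {
    val users = listOf(20, 25, 33, 28, 34).map(::generate)
    println(find(users, null, "Suzhou").map { it.name }) // [Tony25, Tony28, Tony34]
    println(find(users, 33, null).map { it.name })       // [Tony33]
    println(find(users, 20, "suzhou"))                   // []
    println(find(users, null, null).size)                // 5
}
```

Because the `is Int` and `is String` checks smart-cast the nullable parameters, each branch can pass them on as non-null values without any `!!`.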

1.4 Creating the Controller

Create UserController with two reactive endpoints:

@RestController
@RequestMapping("/user")
class UserController {

    @Autowired
    lateinit var userReactiveService: UserReactiveService

    @GetMapping("/reactive/find")
    fun findByReactive(@RequestParam age: Int?, @RequestParam city: String?) = userReactiveService.find(age, city)

    @GetMapping("/reactive/generate")
    fun genDataByReactive() = userReactiveService.generateData()
}



curl http://localhost:8080/user/reactive/generate


curl http://localhost:8080/user/reactive/find?city=suzhou

RxJava 2

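The RxJava library is a pioneer of reactive programming on the JVM and one of the foundations of the Reactive Streams specification.

If you are not familiar with RxJava 2, you can also buy my book 《RxJava 2.x 實戰》.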

2.1 Creating the Repository

Create UserRxJavaRepository. It offers the same queries as UserReactiveRepository, plus one extra method, findUserByName().

import com.kotlin.tutorial.model.User
import io.reactivex.Flowable
import org.springframework.data.repository.reactive.RxJava2CrudRepository
import org.springframework.stereotype.Repository

/** Created by tony on 2018/11/22. */
@Repository
interface UserRxJavaRepository : RxJava2CrudRepository<User, String> {

    fun findUserByName(name: String): Flowable<User>

    fun findUserByAge(age: Int): Flowable<User>

    fun findUserByAddressCity(city: String): Flowable<User>

    fun findUserByAgeAndAddressCity(age: Int, city: String): Flowable<User>
}

2.2 Creating the Service

Create UserRxJavaService, which resembles UserReactiveService but adds two methods: findByName() and login(). Calling login() adds an audit record.

import com.kotlin.tutorial.Utils.toLower
import com.kotlin.tutorial.model.Address
import com.kotlin.tutorial.model.Audit
import com.kotlin.tutorial.model.User
import com.kotlin.tutorial.repository.AuditRepository
import com.kotlin.tutorial.repository.UserRxJavaRepository
import io.reactivex.Flowable
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import java.time.LocalDateTime

/** Created by tony on 2018/11/22. */
@Component
class UserRxJavaService {

    @Autowired
    lateinit var userRepository: UserRxJavaRepository

    @Autowired
    lateinit var auditRepository: AuditRepository

    companion object {

        val cities = listOf("Shanghai", "Suzhou", "Hangzhou").toLower()
        val streets = listOf("renming road", "zhongshan road").toLower()
    }

    fun findByName(name: String): Flowable<User> = userRepository.findUserByName(name)

    // Picks the repository query that matches whichever filters were supplied.
    fun find(age: Int?, rawCity: String?): Flowable<User> {

        val city = rawCity?.toLowerCase()

        return when {
            age is Int && city is String -> userRepository.findUserByAgeAndAddressCity(age, city)
            city is String -> userRepository.findUserByAddressCity(city)
            age is Int -> userRepository.findUserByAge(age)
            else -> userRepository.findAll()
        }
    }

    // Replaces all stored users with five generated ones and writes an audit entry for each.
    fun generateData(): Flowable<User> {

        val list = listOf(20, 25, 33, 28, 34).map {
            val u = generate(it)
            auditRepository.save(Audit(u.name, LocalDateTime.now()))
            u
        }

        return userRepository.deleteAll().andThen(userRepository.saveAll(list))
    }

    private fun generate(age: Int): User {

        val address = Address(age, streets[age % streets.size], cities[age % cities.size])

        return User("Tony$age", age, address)
    }

    // Looks the user up by name and saves an audit entry stamped with the current time.
    fun login(name: String) =
            userRepository.findUserByName(name)
                    .map {
                        auditRepository.save(Audit(it.name, LocalDateTime.now()))
                    }
}

2.3 Creating the Controller

Add two RxJava endpoints to the existing UserController:

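@RestController
@RequestMapping("/user")
class UserController {

    @Autowired
    lateinit var userRxJavaService: UserRxJavaService

    @GetMapping("/rxjava/find") // path assumed; the original mapping was lost in extraction
    fun findByRx(@RequestParam age: Int?, @RequestParam city: String?) = userRxJavaService.find(age, city)

    @GetMapping("/rxjava/generate") // path assumed; the original mapping was lost in extraction
    fun genDateByRx() = userRxJavaService.generateData()
}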


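Coroutines in Kotlin 1.3

Coroutines were added as an experimental feature in Kotlin 1.1 and became a stable feature in Kotlin 1.3.

First, add a mock login endpoint to UserController; each call to it adds an audit record:

    @GetMapping("/rxjava/login") // path assumed; the original mapping was lost in extraction
    fun mockLogin(@RequestParam username: String) = userRxJavaService.login(username)

Next, write an endpoint that fetches the login message in the traditional blocking style: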

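    // userService and auditService are blocking services that are not shown in this article.
    @GetMapping("/blocking/{username}") // path prefix assumed; the original mapping was lost in extraction
    fun getNormalLoginMessage(@PathVariable username: String): String {

        val user = userService.findByName(username)

        val lastLoginTime = auditService.findByName(user.name).eventDate

        return "Hi ${user.name}, you have logged in since $lastLoginTime"
    }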

Then write the same endpoint in the RxJava style:

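    // The first line of the chain and the body of the first map were lost in extraction;
    // they are reconstructed here from the blocking and coroutine versions.
    @GetMapping("/rxjava/{username}") // path prefix assumed
    fun getRxLoginMessage(@PathVariable username: String) =
            userRxJavaService.findByName(username)
                    .map {
                        auditService.findByName(it.name).eventDate
                    }
                    .map {
                        "Hi ${username}, you have logged in since $it"
                    }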

Finally, write the endpoint with Coroutines:

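    // The body of the async block was lost in extraction; it is reconstructed from the blocking version.
    @GetMapping("/{username}") // path assumed
    fun getLoginMessage(@PathVariable username: String) = runBlocking {

        val user = userRxJavaService.findByName(username).awaitSingle()

        val lastLoginTime = GlobalScope.async {
            auditService.findByName(user.name).eventDate
        }.await()

        "Hi ${user.name}, you have logged in since $lastLoginTime"
    }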

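As you can see, the coroutine version reads much like the traditional blocking code. Keep in mind that runBlocking blocks the calling thread until the coroutine finishes.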
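The difference in shape between nested callbacks and sequential coroutine code can be shown with nothing but the standard library's `kotlin.coroutines` package. Everything in this sketch is hypothetical: the callback functions stand in for asynchronous lookups, the timestamp is made up, and `runImmediately` is a toy runner that works only because every callback here fires at once. A real application would use a builder from kotlinx.coroutines, such as the runBlocking call used by the endpoint.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical callback-style lookups standing in for asynchronous calls.
fun findUserAsync(name: String, callback: (String) -> Unit) = callback(name)
fun findLastLoginAsync(user: String, callback: (String) -> Unit) = callback("2018-11-22T10:00")

// Callback style: each dependent step nests inside the previous one.
fun loginMessageWithCallbacks(name: String, callback: (String) -> Unit) {
    findUserAsync(name) { user ->
        findLastLoginAsync(user) { time ->
            callback("Hi $user, you have logged in since $time")
        }
    }
}

// Suspend wrappers turn each callback into a value returned at the call site.
suspend fun findUser(name: String): String =
    suspendCoroutine { cont -> findUserAsync(name) { cont.resume(it) } }

suspend fun findLastLogin(user: String): String =
    suspendCoroutine { cont -> findLastLoginAsync(user) { cont.resume(it) } }

// Coroutine style: the same two steps read like ordinary sequential code.
suspend fun loginMessage(name: String): String {
    val user = findUser(name)
    val time = findLastLogin(user)
    return "Hi $user, you have logged in since $time"
}

// Toy runner: adequate only because every callback above fires immediately.
fun <T> runImmediately(block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    return checkNotNull(outcome) { "coroutine did not complete synchronously" }.getOrThrow()
}

fun main() {
    loginMessageWithCallbacks("Tony20") { println(it) }
    println(runImmediately { loginMessage("Tony20") })
}
```

Both calls in `main` print the same message; only the shape of the code that produces it differs.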



[Figure: fetching the login message with Coroutines]

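For more on coroutines, see my earlier Coroutines notes:

Kotlin Coroutines Notes (1)
Kotlin Coroutines Notes (2)

Some details have changed since Kotlin 1.3, but the overall picture is the same. I will put together more Kotlin Coroutines notes later.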



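In addition, coroutines have been stable since Kotlin 1.3. Kotlin supports them at the language level, which makes them another good option for asynchronous programming.

The demo for this article is on GitHub: github.com/fengzhizi71…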

