Kotlin Coroutine 初探

1、前言

Kotlin 是近兩年興起的一門編程語言,最近一年的發展速度很快。在2017年,Google 宣佈 Kotlin 成爲 Android 的官方開發語言。同時,做爲 Java 服務器端開發領域的帶頭大哥之一的 Spring 也對 Kotlin 提供了全面的支持。html

在 Kotlin 衆多的特性中,在 1.1 中做爲實驗特性加入的 Coroutine(協程,能夠簡單看做是輕量級線程)技術是很是值得關注的。由於你們都知道,近些年大紅大紫的 Go 語言的流行同其協程特性就有着很大的關係,而這也讓 Java 的線程技術看上去顯得很落後。由於傳統的 Java 應用提升併發的主要方式就是開啓更多的線程,但線程太多會致使資源的浪費,過低又容易致使併發不夠。雖然如 Netty 這樣的技術能解決 IO 密集場景下的併發問題,可是使用門檻比較高,學習曲線比較陡,不易於大面積使用。java

而 Kotlin 的 Coroutine 特性爲 JVM 上的高併發應用開發帶了一個很是有但願的新選擇,所以是十分值得關注的。本篇文章將向你們介紹 Kotlin Coroutine 是什麼、如何使用、以及能解決哪些問題等內容。react

本文的內容基於 Kotlin Coroutine 0.21.2 版本。git

2、Kotlin Coroutine 是什麼

Kotlin Coroutine 是 Kotlin 爲了實現更好的異步和併發程序所提供的一個特性,從 1.1 版本開始引入。不一樣於其它的編程語言,Kotlin 將其 Coroutine 特性的大部份內容做爲了一個擴展庫:kolinx.coroutines,語言層面僅提供了有限的支持。github

例如,C#、ECMAScript 等語言都將 asyncawait 作爲了關鍵字,但在 Kotlin 中,這些只是普通的方法。在 Kotlin 中,和 Coroutine 相關的關鍵字僅有 suspend編程

不只如此,Kotlin 還將 Coroutine 庫作了進一步拆分,分紅了核心模塊 kotlinx-coroutines-core 和與其它異步、併發技術集成的模塊,如:kotlinx-coroutines-jdk8kotlinx-coroutines-reactivekotlinx-coroutines-rx1/rx2kotlinx-coroutines-reactor 等。服務器

目前(Kotlin 1.2),Kotlin Coroutine 還只是一個實驗特性。因此,Kotlin Coroutine 相關類的包名中包含了 experimental 的字樣。但 Kotlin 將要正式包含 Coroutine 特性基本上是板上釘釘的事了。按照目前的計劃,Kotlin 1.3 將會正式包含 Coroutine 特性。目前 Coroutine 的總體設計和使用方式也早已肯定,不會發生明顯變化。併發

3、Kotlin Coroutine 的使用

接下來咱們看看 Kotlin Coroutine 在不一樣場景中如何解決咱們在異步和併發編程中所遇到的問題和痛點。app

場景一:延遲執行

咱們在開發的時候,常常遇到須要等待一段時間以後在執行某些語句的場景。這時,咱們經常使用 Thread.sleep 實現:框架

@Test
fun delay_with_thread() {
    log.info("Hello")
    Thread.sleep(1000)
    log.info("World")
}

這樣作效率比較低,由於線程白白地浪費了一秒鐘。若是這段代碼調用量比較大,那就很浪費資源了。

咱們能夠改進一下,使用 ScheduledThreadPool:

@Test
fun delay_with_scheduler() {
    val scheduler = Executors.newScheduledThreadPool(1)
    log.info("Hello")
    scheduler.schedule({
        log.info("World")
    }, 1, TimeUnit.SECONDS)
    scheduler.shutdown()
    scheduler.awaitTermination(1, TimeUnit.SECONDS)
}

這樣作雖然效率高了,可是缺點也很明顯,那就是代碼變得很不直觀了。若是代碼再複雜,那就更加不易理解了。

若是用 Kotlin Coroutine,該怎麼寫呢?

@Test
fun delay_with_coroutine() {
    runBlocking {
        log.info("Hello")
        delay(1000)
        log.info("World")
    }
}

是否是很簡單,和第一個版本惟一的區別就是把 Thread.sleep(1000) 換成了 delay(1000)。而且,delay(1000) 並不會掛起當前線程,這樣代碼執行效率就高的多了。

場景二:Completable Future

Kotlin Coroutine 提供了與各類異步技術的集成,包括 JDK8 Completable Future、Google Guava 的 Listenable Future、Spring 的 Reactor、Netflix 的 RxJava 等,但不包括 JDK5 中的 Future。緣由是傳統的 Future 接口並無提供任何回掉機制,因此 Kotlin Coroutine 沒法與其集成。所以,本節主要介紹 Kotlin Coroutine 如何與 CompletableFuture 集成。

按照傳統方式使用 CompletableFuture 須要調用 thenApplythenComposethenAccept 這樣的方法串聯起異步調用:

val future = CompletableFuture.supplyAsync({ 1 })
future.thenApply { value -> "${value + 2}" }
        .thenAccept({ value ->
    log.info(value.toString())
})

Kotlin Coroutine 爲 CompletableFuture 接口增長了 await 方法,能夠將回調轉換爲傳統的調用方式:

val future = CompletableFuture.supplyAsync({ 1 })
val value = future.await()
val result = value + 2
log.info(result.toString())

可見使用 Kotlin Coroutine 以後代碼獲得了明顯簡化。

場景三:反應式編程

接下來咱們來看看 Kotlin Coroutine 是如何簡化反應式編程的。

在 Spring 5 出現以後,開發人員能夠在 Web 開發領域更容易地使用反應式編程,從而提升系統的併發性能和伸縮性。但是,雖然像 Spring Reactor 項目、Netflix RxJava 項目等反應式編程技術使得異步編程變得簡單了許多,可是距離理想仍是有必定距離。

接下來我們就來看看現有的反應式編程技術存在的問題和 Kotlin Coroutine 是如何解決這些問題的。

直接使用 Spring Reactor

下面這段代碼的目的是根據人員 ID 查詢在他上次登陸以後,又有多少新消息。其中使用到了 Spring 5 的反應式編程特性,使用了 Reactor 的 API 和 Spring Data 中的 Reactive Repository。

@GetMapping("/reactive/{personId}")
fun getMessagesFor(@PathVariable personId: String): Mono<String> {
  return peopleRepository.findById(personId)
      .switchIfEmpty(Mono.error(NoSuchElementException()))
      .flatMap { person ->
          auditRepository.findByEmail(person.email)
              .flatMap { lastLogin ->
                  messageRepository.countByMessageDateGreaterThanAndEmail(lastLogin.eventDate, person.email)
                      .map { numberOfMessages ->
                          "Hello ${person.name}, you have $numberOfMessages messages since ${lastLogin.eventDate}"
                      }
              }
      }
}

看到上面這段代碼以後,我想大部分人的直觀感覺就是「好複雜」、「Callback Hell」等。

等等,不是說好了 Reactive Stream 方式能夠避免 Callback Hell 嗎?爲何這裏仍是存在 Callback Hell。其實,像 RxJava、Reactor 這樣的 Reactive Programming 框架,所能解決的 Callback Hell 問題的範圍是有限的。通常來講,若是一系列的調用,每一步只依賴上一步的結果,那用 Reactive Stream 的方式能夠完美的寫成鏈式調用:

monoA.flatMap(valueA -> {
  returnMonoB(valueA);
}).flatMap(valueB -> {
  returnMonoC(valueB);
}).flatMap(valueC -> {
  returnMonoD(valueC);
});

上面代碼中,monoA 中包含的值是 valueA,依次類推。

但問題是,現實中的業務需求哪裏會這麼簡單和理想。以上面的查詢新消息數的應用爲例,messageRepository.countByMessageDateGreaterThanAndEmail(lastLogin.eventDate, person.email) 這一步依賴了上一步的結果 lastLogin 和上上步的結果 person。不知足我以前所說的「每一步只依賴上一步的結果」的條件,致使這個例子不太容易寫成完美鏈式調用。

雖然經過一些小技巧能夠對上面的代碼進行必定程度的優化,但優化以後可讀性仍是不高。

使用 Kotlin Coroutine

Spring 5 對 Kotlin 提供了完備的支持。一樣,Kotlin 也增長了對 Spring 的支持。其中一個即是對 Spring Reactor 項目的支持。因而咱們可使用 Kotlin Coroutine 改造上面的代碼:

@GetMapping("/coroutine/{personId}")
fun getNumberOfMessages(@PathVariable personId: String) = mono(Unconfined) {
    val person = peopleRepository.findById(personId).awaitFirstOrDefault(null)
            ?: throw NoSuchElementException("No person can be found by $personId")

    val lastLoginDate = auditRepository.findByEmail(person.email).awaitSingle().eventDate

    val numberOfMessages =
            messageRepository.countByMessageDateGreaterThanAndEmail(lastLoginDate, person.email).awaitSingle()

    "Hello ${person.name}, you have $numberOfMessages messages since $lastLoginDate"
}

改造以後代碼最明顯的變化就是代碼可讀性提升了不少。代碼的可讀性對全部的軟件系統都是十分重要,若是代碼很難讓人理解,那軟件系統的維護、升級工做的成本就會很高。所以,Kotlin Coroutine 對異步編程的代碼可讀性的提高是很是有價值的。

說明:若是查詢結果爲空,調用 awaitSingle 會致使程序拋出 NoSuchElementException,並沒有法直接經過 try...catch 捕獲(只能經過 Mono 的錯誤處理回調方法處理,如 doOnErroronErrorCosume 等)。爲了除了查詢結果可能爲空的狀況,使用了 awaitFirstOrDefault 方法。

4、解釋

上面介紹了使用 Kotlin Coroutine 所帶來的一些好處。接下來將對上面的代碼和 Kotlin Coroutine 中的重要概念進行介紹。

suspending 方法

用一句話歸納 Kotlin Coroutine 的特色能夠是「以同步之名,行異步之實」。那這個「實」是怎麼行的?關鍵就是 suspending 方法。上面幾個 Kotlin Coroutine 的例子出現了多個 suspending 方法:delayawaitawaitSingle 等。這些 suspending 方法可以使程序執行過程暫停,但又不掛起線程。從而可讓程序既高效,又易懂。

suspending 方法的聲明很簡單,只需在方法或 Lambda 定義前面加 suspend 關鍵字便可。下面以 awaitSingle 爲例:

public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)

suspending 方法聲明容易,但 suspending 方法的使用卻有限制,並非在任何地方均可以調用 suspending 方法。suspending 方法只能兩種地方被調用,一是在另外一個 suspending 方法中,二是在 Coroutine Builder 中被調用。因此,咱們接下來看看什麼是 Coroutine Builder。

Coroutine Builder

Coroutine Builder,顧名思義,是用來建立 Coroutine 的。對於 Coroutine Builder 到底是如何建立 Coroutine 的,我放在後面的文章再講。咱們先來看看 Coroutine Builder 具體都有哪些,以及它們如何使用。

常見的 Coroutine Builder 有 runBlockinglaunchasync,以及用於和 Spring Reactor 配合使用的 monoflux

簡單來講,Coroutine Builder 就是一些方法,這些方法接受 suspending lambda 做爲參數,並將其放入 Coroutine 中執行。一個完整的 Coroutine 調用的開始都是一個 Coroutine Builder。

簡單說一下幾個常見的 Coroutine Builder 的用法:

runBlocking

這個 Coroutine Builder 的做用是阻塞調用它的線程。例如,在上面 delay 的例子中,就使用了 runBlocking。

launch

這個 Coroutine Builder 會建立一個 Coroutine 並執行它,並返回一個 Job 對象,用於控制這個 Coroutine 的執行,但沒有結果的返回。

例如,以前 delay例子也能夠這麼寫

fun main(args: Array<String>) {
    launch { // launch new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main thread continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

(說明:由於 launch 不會掛起線程,因此須要使用 Thread.sleep 避免主線程提早退出)

launch 方法會返回一個 Job 對象:

@Test
fun delay_with_coroutine_launch() {
    runBlocking {
        log.info("Hello")
        val job = launch {
            // launch new coroutine and keep a reference to its Job
            delay(5000L)
            log.info("World")
        }
        job.cancel()
        job.join() // wait until child coroutine completes
    }
}

Job 對象提供了 cancel()join() 等方法,用來控制 Job 的執行(由於 join() 方法也是一個 suspending 方法,因此外面有加了一層 runBlocking

async

launch 相似,async 也能夠用來啓動一個 Coroutine。不一樣的是,launch 返回的 Job,其只能控制 Coroutine 的執行,可是不能獲得任何返回結果。async 返回的是 Deferred,你能夠經過調用其提供的 await() 方法獲得 Coroutine 的運行結果:

@Test
fun delay_with_async() {
    log.info("Start to demo async")

    val one = async {
        delay(1000)
        1
    }

    val two = async {
        delay(2000)
        2
    }

    runBlocking { log.info("${one.await() + two.await()}") }
}

mono 和 flux

最後介紹的兩個 Coroutine Builder 是 kotlinx-coroutines-reactor 所提供的,用於和 Spring 的 Reactor 項目集成。從上面的示例看,mono 與前幾個 Coroutine Builder 有所不一樣。最明顯的區別是在後面的括號了帶了一個 Unconfined。簡單說,這個 Unconfined 是一個 CoroutineDispatcher,用來限定使用什麼線程來執行 Coroutine。

在 Github 的 Kotlin 項目的文檔中,對 CoroutineDispatcher 有着詳細的描述(連接在最後給出)。接下來我對文檔裏的內容作一些解釋,方便你們理解。

全部的 Coroutine Builder 方法的第一參數都是 CoroutineContext。那爲何能夠把 CoroutineDispatcher 做爲參數傳給 Coroutine Builder 呢?

原來 CoroutineDispatcher 實現了 CoroutineContext.Element 接口,而 Element 又是一個特殊的 CoroutineContext,其是隻存放了一個元素的 CoroutineContext。因此,CoroutineDispatcher 也是一個 CoroutineContext。這個 CoroutineContext 僅包含一個元素,而這個元素就是 CoroutineDispatcher 本身。

當 Coroutine 執行的時候,Kotlin 會看當前 Coroutine 的 CoroutineContext 裏面是否有 CoroutineDispatcher。若是有,則使用 CoroutineDispatcher 限定 Coroutine 所使用的線程。

當不給 Coroutine Builder 制定參數時,launchasync,以及 monoflux 默認使用的 CoroutineDispatcherCommonPool,一個公共的線程池實現。runBlocking 默認使用的是 BlockingEventLoop。另外一個常見的 CoroutineDispatcher 實現就是 mono 例子中的 Unconfined

Unconfined 意思就是不限定。在第一個暫停點以前,Coroutine 的執行線程都是調用的線程。在第一個暫停點以後,用哪一個線程執行就是由 suspending 方法決定了。

例如,在「反應式編程」這個示例中,peopleRepository.findById(personId) 的執行是使用的調用線程。以後的執行是使用 Mongo 異步客戶端回調線程(其中的 Repository 基於 Mongo 異步客戶端)。

5、小結

現在面對高併發應用開發場景,Java 傳統的線程模型顯得愈來愈力不從心。Java 社區也意識到了這個問題,因而出現了一批提供輕量級線程解決方案的項目,如 Quasar 項目、Alibaba JDK 的協程解決方案、Open JDK Project Loom 提案,也包括反應式編程技術。但這些方案都存在這樣或那樣的問題。

Kotlin Coroutine 的出現爲解決 Java 高併發應用開發提供了新的選擇,帶來了新的但願。但咱們也須要看到,Kotlin Coroutine 只是剛剛起步,還有很長的路要走。同時,Kotlin Coroutine 雖然在形式上簡化了異步代碼的開發,但也對使用者提出了至關的要求。若是對 Java 併發、NIO、反應式編程,以及 Kotlin 自己等技術缺少足夠的瞭解,那恐怕仍是難以順暢使用 Kotlin Coroutine 的。這可能也是 Java 程序開發難以擺脫的歷史包袱。

本篇文章介紹了簡要介紹了 Kotlin Coroutine 的概念和使用場景、使用 Kotlin Coroutine 的好處,以及一些關鍵概念。後續的文章將會給你們詳細介紹 Kotlin Coroutine、其實現原理和 Kotlin Coroutine 同其它相似技術的比較。

本文一些示例使用了日誌,這裏提醒你們須要注意在實際項目中要避免日誌阻塞線程問題。避免線程阻塞是幾乎全部高性能異步應用開發都須要注意的。

附:參考

相關文章
相關標籤/搜索