破解 Kotlin 協程(3) - 協程調度篇

關鍵詞:Kotlin 異步編程 協程java

上一篇咱們知道了協程啓動的幾種模式,也經過示例認識了 launch 啓動協程的使用方法,本文將延續這些內容從調度的角度來進一步爲你們揭示協程的奧義。android

1. 協程上下文

調度器本質上就是一個協程上下文的實現,咱們先來介紹下上下文。編程

前面咱們提到 launch 函數有三個參數,第一個參數叫 上下文,它的接口類型是 CoroutineContext,一般咱們見到的上下文的類型是 CombinedContext 或者 EmptyCoroutineContext,一個表示上下文的組合,另外一個表示什麼都沒有。咱們來看下 CoroutineContext 的接口方法:緩存

@SinceKotlin("1.3")
public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    public operator fun plus(context: CoroutineContext): CoroutineContext = ...
    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {
        public val key: Key<*>
        ...
    }
}

複製代碼

不知道你們有沒有發現,它簡直就是一個以 Key 爲索引的 List安全

CoroutineContext List
get(Key) get(Int)
plus(CoroutineContext) plus(List)
minusKey(Key) removeAt(Int)

表中的 List.plus(List) 實際上指的是擴展方法 Collection<T>.plus(elements: Iterable<T>): List<T>bash

CoroutineContext 做爲一個集合,它的元素就是源碼中看到的 Element,每個 Element 都有一個 key,所以它能夠做爲元素出現,同時它也是 CoroutineContext 的子接口,所以也能夠做爲集合出現。數據結構

講到這裏,你們就會明白,CoroutineContext 原來是個數據結構啊。若是你們對於 List 的遞歸定義比較熟悉的話,那麼對於 CombinedContextEmptyCoroutineContext 也就很容易理解了,例如 scala 的 List是這麼定義的:多線程

sealed abstract class List[+A] extends ... {
    ...
    def head: A
    def tail: List[A]
    ...
}

複製代碼

在模式匹配的時候,List(1,2,3,4) 是能夠匹配 x::y 的,x 就是 1,y 則是 List(2,3,4)併發

CombinedContext 的定義也很是相似:框架

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    ...
}
複製代碼

只不過它是反過來的,前面是集合,後面是單獨的一個元素。咱們在協程體裏面訪問到的 coroutineContext 大可能是這個 CombinedContext 類型,表示有不少具體的上下文實現的集合,咱們若是想要找到某一個特別的上下文實現,就須要用對應的 Key 來查找,例如:

suspend fun main(){
    GlobalScope.launch {
        println(coroutineContext[Job]) // "coroutine#1":StandaloneCoroutine{Active}@1ff62014
    }
    println(coroutineContext[Job]) // null,suspend main 雖然也是協程體,但它是更底層的邏輯,所以沒有 Job 實例
}

複製代碼

這裏的 Job 其實是對它的 companion object 的引用

public interface Job : CoroutineContext.Element {
    /** * Key for [Job] instance in the coroutine context. */
    public companion object Key : CoroutineContext.Key<Job> { ... }
    ...
}

複製代碼

因此咱們也能夠仿照 Thread.currentThread() 來一個獲取當前 Job 的方法:

suspend inline fun Job.Key.currentJob() = coroutineContext[Job]

suspend fun coroutineJob(){
    GlobalScope.launch {
        log(Job.currentJob())
    }
    log(Job.currentJob())
}
複製代碼

咱們能夠經過指定上下文爲協程添加一些特性,一個很好的例子就是爲協程添加名稱,方便調試:

GlobalScope.launch(CoroutineName("Hello")) {
    ...
}
複製代碼

若是有多個上下文須要添加,直接用 + 就能夠了:

GlobalScope.launch(Dispatchers.Main + CoroutineName("Hello")) {
    ...
}
複製代碼

Dispatchers.Main 是調度器的一個實現,不用擔憂,咱們很快就會認識它了。

2. 協程攔截器

費了好大勁兒說完上下文,這裏就要說一個比較特殊的存在了——攔截器。

public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    ...
}

複製代碼

攔截器也是一個上下文的實現方向,攔截器能夠左右你的協程的執行,同時爲了保證它的功能的正確性,協程上下文集合永遠將它放在最後面,這真可謂是天選之子了。

它攔截協程的方法也很簡單,由於協程的本質就是回調 + 「黑魔法」,而這個回調就是被攔截的 Continuation 了。用過 OkHttp 的小夥伴一下就興奮了,攔截器我經常使用的啊,OkHttp 用攔截器作緩存,打日誌,還能夠模擬請求,協程攔截器也是同樣的道理。調度器就是基於攔截器實現的,換句話說調度器就是攔截器的一種。

咱們能夠本身定義一個攔截器放到咱們的協程上下文中,看看會發生什麼。

class MyContinuationInterceptor: ContinuationInterceptor{
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}

class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
    override val context = continuation.context
    override fun resumeWith(result: Result<T>) {
        log("<MyContinuation> $result" )
        continuation.resumeWith(result)
    }
}

複製代碼

咱們只是在回調處打了一行日誌。接下來咱們把用例拿出來:

suspend fun main() {
    GlobalScope.launch(MyContinuationInterceptor()) {
        log(1)
        val job = async {
            log(2)
            delay(1000)
            log(3)
            "Hello"
        }
        log(4)
        val result = job.await()
        log("5. $result")
    }.join()
    log(6)
}

複製代碼

這多是迄今而止咱們給出的最複雜的例子了,不過請你們不要被它嚇到,它依然很簡單。咱們經過 launch 啓動了一個協程,爲它指定了咱們本身的攔截器做爲上下文,緊接着在其中用 async 啓動了一個協程,asynclaunch 從功能上是同等類型的函數,它們都被稱做協程的 Builder 函數,不一樣之處在於 async 啓動的 Job 也就是實際上的 Deferred 能夠有返回結果,能夠經過 await 方法獲取。

可想而知,result 的值就是 Hello。那麼這段程序運行的結果如何呢?

15:31:55:989 [main] <MyContinuation> Success(kotlin.Unit)  // ①
15:31:55:992 [main] 1
15:31:56:000 [main] <MyContinuation> Success(kotlin.Unit) // ②
15:31:56:000 [main] 2
15:31:56:031 [main] 4
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(kotlin.Unit) // ③
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] 3
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(Hello) // ④
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 5. Hello
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 6
複製代碼

「// ①」 不是程序輸出的內容,僅爲後續講解方便而作的標註。

你們可能就要奇怪了,你不是說 Continuation 是回調麼,這裏面回調調用也就一次啊(await 那裏),怎麼日誌打印了四次呢?

別慌,咱們按順序給你們介紹。

首先,全部協程啓動的時候,都會有一次 Continuation.resumeWith 的操做,這一次操做對於調度器來講就是一次調度的機會,咱們的協程有機會調度到其餘線程的關鍵之處就在於此。 ①、② 兩處都是這種狀況。

其次,delay 是掛起點,1000ms 以後須要繼續調度執行該協程,所以就有了 ③ 處的日誌。

最後,④ 處的日誌就很容易理解了,正是咱們的返回結果。

可能有朋友還會有疑問,我並無在攔截器當中切換線程,爲何從 ③ 處開始有了線程切換的操做?這個切換線程的邏輯源自於 delay,在 JVM 上 delay 其實是在一個 ScheduledExcecutor 裏面添加了一個延時任務,所以會發生線程切換;而在 JavaScript 環境中則是基於 setTimeout,若是運行在 Nodejs 上,delay 就不會切線程了,畢竟人家是單線程的。

若是咱們在攔截器當中本身處理了線程切換,那麼就實現了本身的一個簡單的調度器,你們有興趣能夠本身去嘗試。

思考:攔截器能夠有多個嗎?

3. 調度器

3.1 概述

有了前面的基礎,咱們對於調度器的介紹就變得水到渠成了。

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    ...
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    ...
}

複製代碼

它自己是協程上下文的子類,同時實現了攔截器的接口, dispatch 方法會在攔截器的方法 interceptContinuation 中調用,進而實現協程的調度。因此若是咱們想要實現本身的調度器,繼承這個類就能夠了,不過一般咱們都用現成的,它們定義在 Dispatchers 當中:

val Default: CoroutineDispatcher
val Main: MainCoroutineDispatcher
val Unconfined: CoroutineDispatcher
複製代碼

這個類的定義涉及到了 Kotlin MPP 的支持,所以你在 Jvm 版本當中還會看到 val IO: CoroutineDispatcher,在 js 和 native 當中就只有前面提到的這三個了(對 Jvm 好偏愛吶)。

Jvm Js Native
Default 線程池 主線程循環 主線程循環
Main UI 線程 與 Default 相同 與 Default 相同
Unconfined 直接執行 直接執行 直接執行
IO 線程池 -- --
  • IO 僅在 Jvm 上有定義,它基於 Default 調度器背後的線程池,並實現了獨立的隊列和限制,所以協程調度器從 Default 切換到 IO 並不會觸發線程切換。
  • Main 主要用於 UI 相關程序,在 Jvm 上包括 Swing、JavaFx、Android,可將協程調度到各自的 UI 線程上。
  • Js 自己就是單線程的事件循環,與 Jvm 上的 UI 程序比較相似。

3.2 編寫 UI 相關程序

Kotlin 的用戶絕大多數都是 Android 開發者,你們對 UI 的開發需求仍是比較大的。咱們舉一個很常見的場景,點擊一個按鈕作點兒異步的操做再回調刷新 UI:

getUserBtn.setOnClickListener { 
    getUser { user ->
        handler.post {
            userNameView.text = user.name
        }
    }
}

複製代碼

咱們簡單得給出 getUser 函數的聲明:

typealias Callback = (User) -> Unit

fun getUser(callback: Callback){
    ...
}

複製代碼

因爲 getUser 函數須要切到其餘線程執行,所以回調一般也會在這個非 UI 的線程中調用,因此爲了確保 UI 正確被刷新,咱們須要用 handler.post 切換到 UI 線程。上面的寫法就是咱們最古老的寫法了。

後來又有了 RxJava,那麼事情開始變得有趣了起來:

fun getUserObservable(): Observable<User> {
    return Observable.create<User> { emitter ->
        getUser {
            emitter.onNext(it)
        }
    }
}

複製代碼

因而點擊按鈕的事件能夠這麼寫:

getUserBtn.setOnClickListener {
    getUserObservable()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { user ->
                userNameView.text = user.name
            }
}

複製代碼

其實 RxJava 在線程切換上的表現是很是優秀的,也正是如此,不少人甚至用它只是爲了切線程方便!

那麼咱們如今把這段代碼過渡到協程的寫法:

suspend fun getUserCoroutine() = suspendCoroutine<User> {
    continuation ->
    getUser {
        continuation.resume(it)
    }
}

複製代碼

按鈕點擊時,咱們能夠:

getUserBtn.setOnClickListener {
    GlobalScope.launch(Dispatchers.Main) {
        userNameView.text = getUserCoroutine().name
    }
}

複製代碼

你們也能夠用 anko-coroutines 當中的 View.onClick 擴展,這樣咱們就無需本身在這裏用 launch 啓動協程了。有關 Anko 對協程的支持,咱們後面專門安排一篇文章介紹。

這裏又有你們沒見過的內容啦,suspendCoroutine 這個方法並非幫咱們啓動協程的,它運行在協程當中而且幫咱們獲取到當前協程的 Continuation 實例,也就是拿到回調,方便後面咱們調用它的 resume 或者 resumeWithException 來返回結果或者拋出異常。

若是你重複調用 resume 或者 resumeWithException 會收穫一枚 IllegalStateException,仔細想一想這是爲何。

對比前面的 RxJava 的作法,你會發現這段代碼其實很容易理解,你甚至會發現協程的使用場景與 RxJava 竟是如此的類似。這裏咱們用到了 Dispatchers.Main 來確保 launch 啓動的協程在調度時始終調度到 UI 線程,那麼下面咱們來看看 Dispatchers.Main 的具體實現。

在 Jvm 上,Main 的實現也比較有意思:

internal object MainDispatcherLoader {
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = MainDispatcherFactory::class.java.let { clz ->
                ServiceLoader.load(clz, clz.classLoader).toList()
            }
            factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: MissingMainCoroutineDispatcher(null)
        } catch (e: Throwable) {
            MissingMainCoroutineDispatcher(e)
        }
    }
}

複製代碼

在 Android 當中,協程框架經過註冊 AndroidDispatcherFactory 使得 Main 最終被賦值爲 HandlerDispatcher 的實例,有興趣的能夠去看下 kotlinx-coroutines-android 的源碼實現。

注意前面對於 RxJava 和協程的實現,咱們都沒有考慮異常和取消的問題。有關異常和取消的話題,咱們會在後面的文章中詳細介紹。

3.3 綁定到任意線程的調度器

調度器的目的就是切線程,你不要想着我在 dispatch 的時候根據本身的心情來隨機調用,那你是在害你本身(不怕各位笑話,這樣的代碼我還真寫過,僅供娛樂)。那麼問題就簡單了,咱們只要提供線程,調度器就應該很方便的建立出來:

suspend fun main() {
    val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
    GlobalScope.launch(myDispatcher) {
        log(1)
    }.join()
    log(2)
}

複製代碼

輸出的信息就代表協程運行在咱們本身的線程上。

16:10:57:130 [MyThread] 1
16:10:57:136 [MyThread] 2
複製代碼

不過請你們注意,因爲這個線程池是咱們本身建立的,所以咱們須要在合適的時候關閉它,否則的話:

咱們能夠經過主動關閉線程池或者調用:

myDispatcher.close()
複製代碼

來結束它的生命週期,再次運行程序就會正常退出了。

固然有人會說你建立的線程池的線程不是 daemon 的,因此主線程結束時 Jvm 不會中止運行。說的沒錯,但該釋放的仍是要及時釋放,若是你只是在程序的整個生命週期當中短暫的用了一下這個調度器,那麼一直不關閉它對應的線程池豈不是會有線程泄露嗎?這就很尷尬了。

Kotlin 協程設計者也特別懼怕你們注意不到這一點,還特意廢棄了兩個 API 而且開了一個 issue 說咱們要重作這套 API,這兩個可憐的傢伙是誰呢?

廢棄的兩個基於線程池建立調度器的 API

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
複製代碼

這兩者能夠很方便的建立綁定到特定線程的調度器,但過於簡潔的 API 彷佛會讓人忘記它的風險。Kotlin 一貫不愛作這種不清不楚的事兒,因此您呢,仍是像咱們這一節例子當中那樣本身去構造線程池吧,這樣好歹本身忘了關閉也怨不着別人(哈哈哈)。

其實在多個線程上運行協程,線程老是這樣切來切去其實並不會顯得很輕量級,例以下面的例子就是比較可怕的了:

Executors.newFixedThreadPool(10)
        .asCoroutineDispatcher().use { dispatcher ->
            GlobalScope.launch(dispatcher) {
                log(1)
                val job = async {
                    log(2)
                    delay(1000)
                    log(3)
                    "Hello"
                }
                log(4)
                val result = job.await()
                log("5. $result")
            }.join()
            log(6)
        }
        
複製代碼

這裏面除了 delay 那裏有一次不可避免的線程切換外,其餘幾處協程掛起點的繼續操做(Continuation.resume)都會切線程:

16:28:04:771 [pool-1-thread-1] 1
16:28:04:779 [pool-1-thread-1] 4
16:28:04:779 [pool-1-thread-2] 2
16:28:05:790 [pool-1-thread-3] 3
16:28:05:793 [pool-1-thread-4] 5. Hello
16:28:05:794 [pool-1-thread-4] 6
複製代碼

若是咱們的線程池只開 1 個線程,那麼這裏全部的輸出都將在這惟一的線程中打印:

16:40:14:685 [pool-1-thread-1] 1
16:40:14:706 [pool-1-thread-1] 4
16:40:14:710 [pool-1-thread-1] 2
16:40:15:723 [pool-1-thread-1] 3
16:40:15:725 [pool-1-thread-1] 5. Hello
16:40:15:725 [pool-1-thread-1] 6
複製代碼

對比這兩者,10個線程的狀況線程切換次數最少 3次,而 1 個線程的狀況則只要 delay 1000ms 以後恢復執行的時候那一次。只是多兩次線程切換,到底會有多大影響呢?我在我本身的 2015 款 mbp 上對於兩種不一樣的狀況分別循環運行 100 次,獲得的平均時間以下:

線程數 10 1
耗時ms 1006.00 1004.97

注意,爲了測試的公平性,在運行 100 次循環以前已經作好了預熱,確保全部類都已經加載。測試結果僅供參考。

也就是說多兩次線程切換平均能多出 1ms 的耗時。生產環境當中的代碼固然會更復雜,若是這樣用線程池去調度,結果可想而知。

實際上一般咱們只須要在一個線程當中處理本身的業務邏輯,只有一些耗時的 IO 才須要切換到 IO 線程中處理,因此好的作法能夠參考 UI 對應的調度器,本身經過線程池定義調度器的作法自己沒什麼問題,但最好只用一個線程,由於多線程除了前面說的線程切換的開銷外,還有線程安全的問題。

3.4 線程安全問題

Js 和 Native 的併發模型與 Jvm 不一樣,Jvm 暴露了線程 API 給用戶,這也使得協程的調度能夠由用戶更靈活的選擇。越多的自由,意味着越多的代價,咱們在 Jvm 上面編寫協程代碼時須要明白一點的是,線程安全問題在調度器不一樣的協程之間仍然存在。

好的作法,就像咱們前面一節提到的,儘可能把本身的邏輯控制在一個線程以內,這樣一方面節省了線程切換的開銷,另外一方面還能夠避免線程安全問題,一箭雙鵰。

若是你們在協程代碼中使用鎖之類的併發工具就反而增長了代碼的複雜度,對此個人建議是你們在編寫協程代碼時儘可能避免對外部做用域的可變變量進行引用,儘可能使用參數傳遞而非對全局變量進行引用。

如下是一個錯誤的例子,你們很容易就能想明白:

suspend fun main(){
    var i = 0
    Executors.newFixedThreadPool(10)
            .asCoroutineDispatcher().use { dispatcher ->
                List(1000000) {
                    GlobalScope.launch(dispatcher) {
                        i++
                    }
                }.forEach {
                    it.join()
                }
            }
    log(i)
}

複製代碼

輸出的結果:

16:59:28:080 [main] 999593
複製代碼

4. suspend main 函數如何調度?

上一篇文章咱們提到了 suspend main 會啓動一個協程,咱們示例中的協程都是它的子協程,但是這個最外層的協程究竟是怎麼來的呢?

咱們先給出一個例子:

suspend fun main() {
    log(1)
    GlobalScope.launch {
        log(2)
    }.join()
    log(3)
}

複製代碼

它等價於下面的寫法:

fun main() {
    runSuspend {
        log(1)
        GlobalScope.launch {
            log(2)
        }.join()
        log(3)
    }
}

複製代碼

那你說這個 runSuspend 又是何妨神聖?它是 Kotlin 標準庫的一個方法,注意它不是 kotlinx.coroutines 當中的,它實際上屬於更底層的 API 了。

internal fun runSuspend(block: suspend () -> Unit) {
    val run = RunSuspend()
    block.startCoroutine(run)
    run.await()
}

複製代碼

而這裏面的 RunSuspend 則是 Continuation 的實現:

private class RunSuspend : Continuation<Unit> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    var result: Result<Unit>? = null

    override fun resumeWith(result: Result<Unit>) = synchronized(this) {
        this.result = result
        (this as Object).notifyAll()
    }

    fun await() = synchronized(this) {
        while (true) {
            when (val result = this.result) {
                null -> (this as Object).wait()
                else -> {
                    result.getOrThrow() // throw up failure
                    return
                }
            }
        }
    }
}

複製代碼

它的上下文是空的,所以 suspend main 啓動的協程並不會有任何調度行爲。

經過這個例子咱們能夠知道,實際上啓動一個協程只須要有一個 lambda 表達式就能夠了,想當年 Kotlin 1.1 剛發佈的時候,我寫了一系列的教程都是以標準庫 API 爲基礎的,後來發現標準庫的 API 也許真的不是給咱們用的,因此看看就好。

上述代碼在標準庫當中被修飾爲 internal,所以咱們沒法直接使用它們。不過你能夠把 RunSuspend.kt 當中的內容複製到你的工程當中,這樣你就能夠直接使用啦,其中的 var result: Result<Unit>? = null 可能會報錯,不要緊,改爲 private var result: Result<Unit>? = null 就能夠了。

5. 小結

在這篇文章當中,咱們介紹了協程上下文,介紹了攔截器,進而最終引出了咱們的調度器,截止目前,咱們還有異常處理、協程取消、Anko 對協程的支持等話題沒有講到,若是你們有協程相關想了解的話題,能夠留言哈~


歡迎關注 Kotlin 中文社區!

中文官網:www.kotlincn.net/

中文官方博客:www.kotliner.cn/

公衆號:Kotlin

知乎專欄:Kotlin

CSDN:Kotlin中文社區

掘金:Kotlin中文社區

簡書:Kotlin中文社區

相關文章
相關標籤/搜索