安卓-kotlin協程的異常處理機制分析

做者

你們好,我叫🐜java

本人於2020年10月加入37手遊安卓團隊微信

目前主要負責國內相關業務開發和一些平常業務markdown

背景

使用kotlin的協程一段時間了,經常使用的用法也已經很熟悉,但都是停留在使用的階段,沒有對代碼深刻了解過,仍是感受有點虛;趁着過年這段時間,針對協程的異常處理,對其相關的源碼學習了一波,梳理總結一下本身的理解。app

本文基於 Kotlin v1.4.0,Kotlin-Coroutines v1.3.9源碼分析less

一、CoroutineScope源碼分析

做用:建立和追蹤協程,管理不一樣協程之間的父子關係和結構 建立協程的方式:async

一、經過CoroutineScope建立ide

二、在協程中建立函數

第一種方式,首先如何經過CoroutineScope建立?oop

val scope = CoroutineScope(Job() + Dispatchers.Main) 
複製代碼
@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job()) //沒有job實例的話就搞一個
複製代碼
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
}
複製代碼

CoroutineScope是一個全局的方法,而後在裏面經過ContextScope就能夠實例出來一個CoroutineScope對象了。 相似咱們平時用到的MainScope或者Android平臺上viewModelScope和lifecycleScope(只不過在生命週期相關回調作了有些自動cancel的處理) 也是跑到這裏來。另外scope初始化的時候會有生成一個job,起到跟蹤的做用 這裏須要注意的是GlobalScope和普通協程的CoroutineScope的區別,GlobalScope的 Job 是爲空的,由於它的coroutineContext是EmptyCoroutineContext,是沒有job的源碼分析

有了scope以後,咱們就能夠經過launch建立一個協程了

val job = scope.launch {}
複製代碼

戳代碼看看

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    。。。省略代碼。。。
    return coroutine
}
複製代碼

launch參數有三個,前兩個參數先不不分析,第三個是一個帶receiver的lambda參數(參考Kotlin 中的Receiver 是什麼),默認的類型是CoroutineScope

val job = scope.launch {①/* this: CoroutineScope */
    // 新的協程會將 CoroutineScope 做爲父級 ,在launch裏面建立
    //由於launch是一個擴展方法, 因此上面例子中默認的receiver是this,因此如下兩種寫法同樣。這裏能夠理解爲這裏是一個回調,句柄是CoroutineScop
    launch { /* ... */ }
    this.launch { 
     // 經過 ① 建立的新協程做爲當前協程的父級    
     }
}
複製代碼

再看看CoroutineScope.launch的實現

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    //這裏是根據父級建立新的上下文(協程的父級上下文),而後給下面建立協程用,具體邏輯下面代碼塊分析
    val newContext = newCoroutineContext(context)
   //這裏就是建立協程
    val coroutine = if (start.isLazy)
        //協程真正的上下文生成是以newContext做爲父級上下文生成的
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
        //start裏面就是建立job相關的,不一樣的coroutine實例有不一樣的生成job策略
    coroutine.start(start, coroutine, block)
    return coroutine
}
複製代碼
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    //原來這是一個CoroutineScope的擴展函數,coroutineContext其實就是拿到到了scope對象的成員,而後經過「+」就能夠搞成了,下面會說「+」
    //能夠理解爲把一個context數據add到一個 context map數據組中,還有一些邏輯判斷,先無論,反正拿到的是一個新的context map
    val combined = coroutineContext + context
    //測試環境會給一下id拿來調試用的
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}
複製代碼

「+」 如何相加的?這就涉及到的相關類 CoroutineContext: 全部上下文的接口 CombinedContext:上下文組合時生成的類 CoroutineContext.Element:大部分單個上下文實現的類,由於有的會直接實現CoroutineContext

public operator fun plus(context: CoroutineContext): CoroutineContext =
        //operator操做符重載的特性 eg:Job() + Dispatchers.IO + CoroutineName("test") 就會跑到這裏來 
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            //acc爲加數,element爲被加數
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
複製代碼

能夠理解爲一個map(其實是一個單鏈表,詳細的能夠參考Kotlin協程上下文CoroutineContext是如何可相加的),經過key來獲取不一樣類型的數據,須要改變的話使用當前的CoroutineContext來建立一個新的CoroutineContext便可。 上面提到的val scope = CoroutineScope(Job() + Dispatchers.Main)

綜和以上兩個代碼片斷,咱們能夠知道,一個新建的協程CoroutineContext的元素組成

一、有一個元素job,控制協程的生命週期

二、剩餘的元素會從CoroutineContext 的父級繼承,該父級多是另一個協程或者建立該協程的 CoroutineScope

二、CoroutineScope的類型

2.一、協程做用域對異常傳播的影響

類型:

做用分析:

說明:

C2-1發生異常的時候,C2-1->C2->C2-2->C2->C1->C3(包括裏面的子協程)->C4

C3-1-1發生異常的時候,C3-1-1->C3-1-1-1,其餘不受影響

C3-1-1-1發生異常的時候,C3-1-1-1->C3-1-1,其餘不受影響

2.二、示意代碼

一、C1和C2沒有關係

GlobalScope.launch { //協程C1
    GlobalScope.launch {//協程C2
        //...
    }
}
複製代碼

二、C2和C3是C1的子協程,C2和C3異常會取消C1

GlobalScope.launch { //協程C1
    coroutineScoope {
         launch{}//協程C2
         launch{}//協程C3
    }
} 
複製代碼

三、C2和C3是C1的子協程,C2和C3異常不會取消C1

GlobalScope.launch { //協程C1
    supervisorScope {
         launch{}//協程C2
         launch{}//協程C3
    }
} 
複製代碼

2.三、舉個🌰

eg1:

@Test
fun test()= runBlocking{
    val handler = CoroutineExceptionHandler { coroutineContext, exception ->
        println("CoroutineExceptionHandler got $exception  coroutineContext ${coroutineContext}")
    }
    val job = GlobalScope.launch(handler) {
        println("1")
        delay(1000)
        coroutineScope {
            println("2")
            val job2 = launch(handler) {
                throwErrorTest()
            }
            println("3")
            job2.join()
            println("4")
        }
    }
    job.join()

}
fun throwErrorTest(){
    throw Exception("error test")
}
複製代碼

輸出結果:

若是是協同做用域,job2所在的協程發生異常,會把job取消(不會打印「4」),並且異常是從job所在協程拋出來的

eg2:

@Test
fun test()= runBlocking{
    val handler = CoroutineExceptionHandler { coroutineContext, exception ->
        println("CoroutineExceptionHandler got $exception  coroutineContext ${coroutineContext}")
    }
    val job = GlobalScope.launch(handler) {
        println("1")
        delay(1000)
        supervisorScope {
            println("2")
            val job2 = launch(handler) {
                throwErrorTest()
            }
            println("3")
            job2.join()
            println("4")
        }
    }
    job.join()

}
fun throwErrorTest(){
    throw Exception("error test")
}
複製代碼

輸出結果: 若是是主從做用域,job2所在的協程發生異常,不會把job取消(會打印「4」),並且異常是job2所在協程拋出來的

三、協程中異常處理的流程源碼分析

3.一、協程的三層包裝

第一層:launch和async返回的job,封裝了協程的狀態,提供取消協程的接口,實例都是繼承自AbstractCoroutine

第二層:編譯器生成(cps)的SuspendLambda的子類,封裝了協程的真正運算邏輯,繼承自BaseContinuationImpl,其中completion屬性就是協程的第一層包裝

第三層:DispatchedContinuation,封裝了線程調度邏輯,包含了協程的第二層包裝 三層包裝都實現了Continuation接口,經過代理模式將協程的各層包裝組合在一塊兒,每層負責不一樣的功能 運算邏輯在第二層BaseContinuationImpl的resumeWith()函數中的invokeSuspend運行

3.二、發生異常的入口

BaseContinuationImpl中的resumeWith(result: Result<Any?>)處理異常的邏輯,省略的部分代碼

public final override fun resumeWith(result: Result) {
    val completion = completion!! // fail fast when trying to resume continuation without completion
    val outcome: Result =
    。。。其餘代碼。。。
        try {
            val outcome = invokeSuspend(param)
            if (outcome === COROUTINE_SUSPENDED) return
            Result.success(outcome)
        } catch (exception: Throwable) {
            Result.failure(exception)
            }
            。。。其餘代碼。。。
        completion.resumeWith(outcome)
        。。。其餘代碼。。。
    }
複製代碼

由以上代碼分析可知

一、invokeSuspend(param)方法的具體實現是在編譯的生成的,對應協程體的處理邏輯

二、當發生異常的時候,即outcome爲Result.failure(exception),具體調用在completion.resumeWith(outcome)裏面,經過AbstractCoroutine.resumeWith(Result.failure(exception))進入到第三層包裝中

繼續跟蹤 AbstractCoroutine.resumeWith(result: Result) -> JobSupport.makeCompletingOnce(proposedUpdate: Any?): Any? -> JobSupport.tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any?->JobSupport.tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any?

在tryMakeCompletingSlowPath方法中

var notifyRootCause: Throwable? = null
synchronized(finishing) {
    //。。。其餘代碼。。。
    notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
}
// process cancelling notification here -- it cancels all the children _before_ we start to to wait them (sic!!!)
// 該情景下,notifyRootCause 的值爲 exception
notifyRootCause?.let { notifyCancelling(list, it) }

// otherwise -- we have not children left (all were already cancelled?)
return finalizeFinishingState(finishing, proposedUpdate)
//。。。其餘代碼。。。
複製代碼

若是發生異常即notifyRootCause不爲空的時候,調用notifyCancelling方法,主要是取消子協程

private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // first cancel our own children
    onCancelling(cause)
    notifyHandlers>(list, cause)
    // then cancel parent
    cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
複製代碼

另一個方法finalizeFinishingState,主要是異常傳遞和處理的邏輯,關鍵代碼以下

private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
    。。。其餘代碼。。。
    // Now handle the final exception
    if (finalException != null) {
        //異常的傳遞和處理邏輯,若是cancelParent(finalException)不處理異常的話,就由當前
        //協程處理handleJobException(finalException)(具體實如今StandaloneCoroutine類處理異常,下文會提到)
        val handled = cancelParent(finalException) || handleJobException(finalException)
        if (handled) (finalState as CompletedExceptionally).makeHandled()
    }
    。。其餘代碼。。。
    return finalState
}

複製代碼
/**
 * The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
 * Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
 *
 * Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
 * may leak to the [CoroutineExceptionHandler].
 * 返回指是true的話,異常由父協程處理,false的話異常由所在的協程來處理
 */
private fun cancelParent(cause: Throwable): Boolean {
    // Is scoped coroutine -- don't propagate, will be rethrown
    /**
    * Returns `true` for scoped coroutines.
    * Scoped coroutine is a coroutine that is executed sequentially within the enclosing scope without any concurrency.
    * Scoped coroutines always handle any exception happened within -- they just rethrow it to the enclosing scope.
    * Examples of scoped coroutines are `coroutineScope`, `withTimeout` and `runBlocking`.
   */
   //若是isScopedCoroutine true的話,即coroutineScope是主從做用域的話,異常是會傳到父協程
    if (isScopedCoroutine) return true
    
    //cause是CancellationException的話是正常的協程結束行爲,不會取消父協程
    /* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
     * This allow parent to cancel its children (normally) without being cancelled itself, unless
     * child crashes and produce some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val parent = parentHandle
    // No parent -- ignore CE, report other exceptions.
    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }
    // Notify parent but don't forget to check cancellation
    //childCancelled(cause)爲false的話,異常不會傳遞到父協程
    //使用SupervisorJob和supervisorScope時,子協程出現未捕獲異常時也不會影響父協程,
    //它們的原理是重寫 childCancelled() 爲override fun childCancelled(cause: Throwable): Boolean = false
    return parent.childCancelled(cause) || isCancellation
}
複製代碼

由以上代碼可知

一、出現未捕獲異常時,首先會取消全部子協程

二、異常屬於 CancellationException 時,不會取消父協程

三、使用SupervisorJob和supervisorScope時,即主從做用域,發生異常不會取消父協程,異常由所在的協程處理

3.三、CoroutineExceptionHandler的是如何生效的

在AbstractCoroutine中,處理異常的邏輯是在JobSupport接口中,默認是空的實現。 protected open fun handleJobException(exception: Throwable): Boolean = false 具體的實現邏輯是在StandaloneCoroutine中(Builders.common.kt文件)

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine(parentContext, active) {
    override fun handleJobException(exception: Throwable): Boolean {
        //處理異常的邏輯
        handleCoroutineException(context, exception)
        return true
    }
}
複製代碼

具體實現以下

//CoroutineExceptionHandlerImpl.kt
private val handlers: List = ServiceLoader.load(
        CoroutineExceptionHandler::class.java,
        CoroutineExceptionHandler::class.java.classLoader
).iterator().asSequence().toList()

internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
    // use additional extension handlers
    for (handler in handlers) {
        try {
            handler.handleException(context, exception)
        } catch (t: Throwable) {
            // Use thread's handler if custom handler failed to handle exception
            val currentThread = Thread.currentThread()
            currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, handlerException(exception, t))
        }
    }
    // 調用當前線程的 uncaughtExceptionHandler 處理異常
    // use thread's handler
    val currentThread = Thread.currentThread()
    currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}
複製代碼

以上的處理邏輯能夠簡單的概括爲如下僞代碼

class StandardCoroutine(context: CoroutineContext) : AbstractCoroutine(context) {
    override fun handleJobException(e: Throwable): Boolean {
        context[CoroutineExceptionHandler]?.handleException(context, e) ?:
                Thread.currentThread().let { it.uncaughtExceptionHandler.uncaughtException(it, e) }
        return true
    }
}
複製代碼

因此默認狀況下,launch式協程對未捕獲的異常只是打印異常堆棧信息,若是使用了 CoroutineExceptionHandler 的話,只會使用自定義的 CoroutineExceptionHandler 處理異常。

小結

一、協程默認的做用域是協同做用域,異常會傳播到父協程處理,即coroutineScope或者CoroutineScope(Job())這種形式。

二、協程做用域若是是主從做用域,異常不會傳播到父協程處理,即supervisorScope 或 CoroutineScope(SupervisorJob()) 這種形式,其關鍵是重寫 childCancelled()=false。

三、協程處理異常的時候,若是自定義CoroutineExceptionHandler的話,則由其處理,不然交給系統處理。

最後,本文異常處理分析是從協程做用域爲切入點進行的,看代碼過程當中也會學到一些kotlin巧妙的語法使用;另外只是大概的去分析了一下異常的處理主線邏輯,有些細節的還須要去繼續學習,下次會進行更加詳細的分析,但願本文對你有幫助,也歡迎一塊兒交流學習。

歡迎交流

過程當中有問題或者須要交流的同窗,能夠掃描二維碼加好友,而後進羣進行問題和技術的交流等;

企業微信截圖_5d79a123-2e31-42cc-b03f-9312b8b99df3.png

相關文章
相關標籤/搜索