Kotlin - 協程基礎及原理

Kotlin Coroutines(協程) 徹底解析java

最全面的Kotlin協程: Coroutine/Channel/Flow 以及實際應用android

硬核萬字解讀:Kotlin 協程原理解析設計模式

Kotlin協程-Scheduler的優秀設計markdown

初識協程

什麼是協程

Kotlin 1.3 添加了協程 Coroutine 的概念,文檔中介紹協程是一種併發設計模式,能夠在 Android 平臺上使用它來簡化異步執行的代碼。數據結構

協程具備以下特色:併發

  • 異步代碼同步化:使用編寫同步代碼的方式編寫異步代碼。框架

  • 輕量:您能夠在單個線程上運行多個協程,由於協程支持掛起,不會使正在運行協程的線程阻塞。掛起比阻塞節省內存,且支持多個並行操做。異步

  • 內存泄漏更少:使用結構化併發機制在一個做用域內執行多項操做。async

  • 內置取消支持:取消操做會自動在運行中的整個協程層次結構內傳播。ide

  • Jetpack 集成:許多 Jetpack 庫都包含提供全面協程支持的擴展。某些庫還提供本身的協程做用域,可供您用於結構化併發。

協程的掛起和恢復

Kotlin 協程的掛起和恢復本質上是掛起函數的掛起和恢復。

suspend fun suspendFun() {}

複製代碼

掛起函數suspend 關鍵字修飾的普通函數。若是在協程體內調用了掛起函數,那麼調用處就被稱爲 掛起點。掛起點若是出現 異步調用,那麼當前協程就會被掛起,直到對應的 Continuation.resume() 函數被調用纔會恢復執行。

掛起函數和普通函數的區別在於:

  • 掛起函數只能在協程體內或其餘掛起函數內調用;

  • 掛起函數能夠調用任何函數,普通函數只能調用普通函數。

suspend 除用於修飾函數外還可用於修飾 lambda 表達式,在源碼分析的章節會詳細分析它們的區別。

基本用法

Gradle 引入

dependencies {

// Kotlin Coroutines

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'

// 使用 `Dispatchers.Main` 須要添加以下依賴

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2'

}

複製代碼

啓動協程

kotlin 協程框架爲咱們提供了兩種便捷的方式啓動協程:

  • GlobalScop.launch

  • GlobalScope.async

分別來使用兩種方式輸出 Hello World!

fun main() {

GlobalScope.launch { // 使用 GlobalScope.launch 啓動協程

delay(1000L) // 非阻塞的等待 1 秒鐘(默認時間單位是毫秒)

println("World!") // 在延遲後打印輸出

}

print("Hello ") // 協程已在等待時主線程還在繼續

Thread.sleep(2000L) // 阻塞主線程 2 秒鐘來保證 JVM 存活

}

fun main() {

GlobalScope.async { // 使用 GlobalScope.async 啓動協程

delay(1000L)

println("World!")

}

print("Hello ")

Thread.sleep(2000L)

}

複製代碼

從上面的例子裏看這兩種方式好像並無什麼區別,其實區別在他們的返回值上

  • GlobalScop.launch:返回值 Job

  • GlobalScope.async:返回值 Deferred<T>

Deferred<T>Job 的子類,而且能夠經過調用 await 函數獲取協程的返回值。上面 GlobalScope.async 的例子改造一下:

GlobalScope.launch {

val result = GlobalScope.async { // 使用 GlobalScope.async 啓動協程

delay(1000L)

"World!"

}

println("Hello ${result.await()}")

}

Thread.sleep(2000L)

//輸出:Hello World!

複製代碼

上面的示例把 async 嵌套在了 launch 函數體內部,這是由於 await 是一個掛起函數,而掛起函數不一樣於普通函數的就是它必須在協程體或其餘掛起函數內部調用。

在協程體內 ({} 內) 能夠隱藏 GlobalScope 直接使用 async、launch 啓動協程,因此上面的示例能夠修改以下:

GlobalScope.launch {

val result = async { // 使用 GlobalScope.async 啓動協程

...

}

...

// launch {}

}

...

複製代碼

協程操做

經過了解協程的兩種啓動方式,咱們知道 GlobalScop.launch、GlobalScop.async 的返回值都是 Job 對象或其子類對象。那 Job 是什麼呢? 又有哪些功能。

Job 是一個可取消的後臺任務,用於操做協程的執行並記錄執行過程當中協程的狀態,因此通常來講 Job 實例也表明了協程。

Job 具備以下幾種狀態:

| State | [isActive] | [isCompleted] | [isCancelled] |

| --------------------- | -------- | ---------- | ------------ |

| New (可選初始狀態) | false | false | false |

| Active (默認初始狀態) | true | false | false |

| Completing (瞬態) | true | false | false |

| Cancelling (瞬態) | false | false | true |

| Cancelled (最終狀態) | false | true | true |

| Completed (最終狀態) | false | true | false |

一般狀況下,建立 Job 時會自動啓動,狀態默認爲 _Active_,可是若是建立時添加參數 CoroutineStart.Lazy 則狀態爲 _NEW_,能夠經過 start()join() 等函數激活。

Job 狀態流程圖:

wait children

+-----+ start +--------+ complete +-------------+ finish +-----------+

| New | -----> | Active | ---------> | Completing | -------> | Completed |

+-----+ +--------+ +-------------+ +-----------+

| cancel / fail |

| +----------------+

| |

V V

+------------+ finish +-----------+

| Cancelling | --------------------------------> | Cancelled |

+------------+ +-----------+

複製代碼

Job 的可用方法:

  • cancel(CancellationException):取消 Job 對應的協程併發送協程取消錯誤 (CancellationException)。

  • invokeOnCompletion():註冊當此 Job 狀態更新爲 Completed 時同步調用的處理程序。

  • join():掛起 Job 對應的協程,當協程完成時,外層協程恢復。

  • start():若是建立 Job 對象時使用的啓動模式爲 CoroutineStart.Lazy,經過它能夠啓動協程。

  • cancelAndJoin():取消 Job 並掛起當前協程,直到 Job 被取消。

當要取消正在運行的協程:

val job = launch {

repeat(1000) { i ->

println("job: I'm sleeping $i ...")

delay(500L)

}

}

delay(1300L) // 延遲一段時間

println("main: I'm tired of waiting!")

job.cancel() // 取消該做業

job.join() // 等待做業執行結束

println("main: Now I can quit.")

// 輸出

job: I'm sleeping 0 ...

job: I'm sleeping 1 ...

job: I'm sleeping 2 ...

main: I'm tired of waiting!

main: Now I can quit.

複製代碼

上面示例中可使用 cancelAndJoin 函數它合併了對 cancel 以及 join 函數的調用。

注意:若是在協程執行過程當中沒有掛起點,那麼協程是不可被取消的。

val startTime = System.currentTimeMillis()

val job = launch(Dispatchers.Default) {

var nextPrintTime = startTime

var i = 0

while (i < 5) { // 一個執行計算的循環,只是爲了佔用 CPU

// 每秒打印消息兩次

if (System.currentTimeMillis() >= nextPrintTime) {

println("job: I'm sleeping ${i++} ...")

nextPrintTime += 500L

}

}

}

delay(1300L) // 等待一段時間,並保證協程開始執行

println("main: I'm tired of waiting!")

job.cancelAndJoin() // 取消一個做業而且等待它結束

println("main: Now I can quit.")

// 輸出

job: I'm sleeping 0 ...

job: I'm sleeping 1 ...

job: I'm sleeping 2 ...

main: I'm tired of waiting!

job: I'm sleeping 3 ...

job: I'm sleeping 4 ...

main: Now I can quit.

複製代碼

簡單來講,若是協程體內沒有掛起點的話,已開始執行的協程是沒法取消的。

下面來介紹,協程啓動時傳參的含義及做用:

public fun CoroutineScope.launch(

context: CoroutineContext = EmptyCoroutineContext,

start: CoroutineStart = CoroutineStart.DEFAULT,

block: suspend CoroutineScope.() -> Unit

): Job {

...

}

複製代碼

協程的啓動模式

CoroutineStart:協程啓動模式。協程內提供了四種啓動模式:

  • DEFAULT:協程建立後,當即開始調度,在調度前若是協程被取消,其將直接進入取消相應的狀態。

  • ATOMIC:協程建立後,當即開始調度,協程執行到第一個掛起點以前不響應取消。

  • LAZY:只有協程被須要時,包括主動調用協程的 start()、join()、await() 等函數時纔會開始調度,若是調度前就被取消,那麼該協程將直接進入異常結束狀態。

  • UNDISPATCHED:協程建立後當即執行,直到遇到第一個真正掛起的點。

當即調度和當即執行的區別:當即調度表示協程的調度器會當即接收到調度指令,但具體執行的時機以及在那個線程上執行,還須要根據調度器的具體狀況而定,也就是說當即調度到當即執行之間一般會有一段時間。所以,咱們得出如下結論:

  • DEFAULT 雖然是當即調度,但也有可能在執行前被取消。

  • UNDISPATCHED 是當即執行,所以協程必定會執行。

  • ATOMIC 雖然是當即調度,但其將調度和執行兩個步驟合二爲一了,就像它的名字同樣,其保證調度和執行是原子操做,所以協程也必定會執行。

  • UNDISPATCHEDATOMIC 雖然都會保證協程必定執行,但在第一個掛起點以前,前者運行在協程建立時所在的線程,後者則會調度到指定的調度器所在的線程上執行。

協程上下文和調度器

CoroutineContext:協程上下文。用於控制協程的行爲,上文提到的 Job 和準備介紹的調度器都屬於 CoroutineContext

協程默認提供了四種調度器:

  • Dispatchers.Default:默認調度器,若是沒有指定協程調度器和其餘任何攔截器,那默認都使用它來構建協程。適合處理後臺計算,其是一個 CPU 密集型任務調度器。

  • Dispatchers.IOIO 調度器,適合執行 IO 相關操做,其是一個 IO 密集型任務調度器。

  • Dispatchers.MainUI 調度器,會將協程調度到主線程中執行。

  • Dispatchers.Unconfined:非受限制調度器,不要求協程執行在特定線程上。協程的調度器若是是 Unconfined,那麼它在掛起點恢復執行時會在恢復所在的線程上直接執行,固然,若是嵌套建立以它爲調度器的協程,那麼這些協程會在啓動時被調度到協程框架內部的時間循環上,以免出現 StackOverflow

  • Dispatchers.Unconfined:非受限調度器,會在調用它的線程啓動協程,但它僅僅只是運行到第一個掛起點。掛起後,它恢復線程中的協程,而這徹底由被調用的掛起函數來決定。

runBlocking {

launch { // 運行在父協程的上下文中,即 runBlocking 主協程

println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")

}

launch(Dispatchers.Unconfined) { // 不受限的——將工做在主線程中

println("Unconfined : I'm working in thread ${Thread.currentThread().name}")

}

launch(Dispatchers.Default) { // 將會獲取默認調度器

println("Default : I'm working in thread ${Thread.currentThread().name}")

}

}

//輸出結果

Unconfined : I'm working in thread main @coroutine#3

Default : I'm working in thread DefaultDispatcher-worker-1 @coroutine#4

main runBlocking : I'm working in thread main @coroutine#2

複製代碼

withContext

除了能夠在 GlobalScope.launch {}、GlobalScope.async {} 建立協程時設置協程調度器,

async {...}.await() 相比 withContext 的內存開銷更低,所以對於使用 async 以後當即調用 await 的狀況,應當優先使用 withContext

withTimeout

Kotlin 協程提供了 withTimeout 函數設置超時取消。若是運行超時,取消後會拋出 TimeoutCancellationException 異常。拋出異常的狀況下回影響到其餘協程,這時候可使用 withTimeoutOrNull 函數,它會在超時的狀況下返回 null 而不拋出異常。

runBlocking {

val result = withContext(coroutineContext) {

withTimeoutOrNull(500) {

delay(1000)

"hello"

}

}

println(result)

}

// 輸出結果

hello

複製代碼

yield

若是想要解決上面示例中的問題可使用 yield 函數。它的做用在於檢查所在協程的狀態,若是已經取消,則拋出取消異常予以響應。此外它還會嘗試出讓線程的執行權,給其餘協程提供執行機會。

在上面示例中添加 yield 函數:

if (System.currentTimeMillis() >= nextPrintTime) {

yield()

println("job: I'm sleeping ${i++} ...")

nextPrintTime += 500L

}

// 輸出結果

job: I'm sleeping 0 ...

job: I'm sleeping 1 ...

job: I'm sleeping 2 ...

main: I'm tired of waiting!

main: Now I can quit.

複製代碼

協程的做用域

協程做用域:協程做用域主要用於明確協程之間的父子關係,以及對於取消或者異常處理等方面的傳播行爲。

協程做用域包括如下三種:

  • 頂級做用域:沒有父協程的協程所在的做用域爲頂級做用域。

  • 協同做用域:協程中啓動新的協程,新協程爲所在協程的子協程,這種狀況下子協程所在的做用域默認爲協同做用域。此時子協程拋出的未捕獲異常將傳遞給父協程處理,父協程同時也會被取消。

  • 主從做用域:與協程做用域在協程的父子關係上一致,區別在於處於該做用域下的協程出現未捕獲的異常時不會將異常向上傳遞給父協程。

父子協程間的關係:

  • 父協程被取消,則全部子協程均被取消。

  • 父協程須要等待子協程執行完畢以後纔會最終進入完成狀態,無論父協程自身的協程體是否已經執行完畢。

  • 子協程會繼承父協程的協程上下文元素,若是自身有相同 key 的成員,則覆蓋對應的 key,覆蓋的效果僅限自身範圍內有效。

聲明頂級做用域:GlobalScope.launch {}runBlocking {}

聲明協同做用域:coroutineScope {}

聲明主從做用域:supervisorScope {}

coroutineScope {}supervisorScope {} 是掛起函數因此它們只能在協程做用域中或掛起函數中調用。

coroutineScope {}supervisorScope {} 的區別在於 SupervisorCoroutine 重寫了 childCancelled() 函數使異常不會向父協程傳遞。

協程併發

經過上文的介紹能夠了解到協程其實就是執行在線程上的代碼片斷,因此線程的併發處理均可以用在協程上,好比 synchorinzedCAS 等。而協程自己也提供了兩種方式處理併發:

  • Mutex:互斥鎖;

  • Semaphore:信號量。

Mutex

Mutex 相似於 synchorinzed,協程競爭時將協程包裝爲 LockWaiter 使用雙向鏈表存儲。Mutex 還提供了 withLock 擴展函數,以簡化使用:

runBlocking<Unit> {

val mutex = Mutex()

var counter = 0

repeat(10000) {

GlobalScope.launch {

mutex.withLock {

counter ++

}

}

}

Thread.sleep(500) //暫停一下子等待全部協程執行結束

println("The final count is $counter")

}

複製代碼
Semaphore

Semaphore 用以限制訪問特定資源的協程數量。

runBlocking<Unit> {

val semaphore = Semaphore(1)

var counter = 0

repeat(10000) {

GlobalScope.launch {

semaphore.withPermit {

counter ++

}

}

}

Thread.sleep(500) //暫停一下子等待全部協程執行結束

println("The final count is $counter")

}

複製代碼

注意:只有在 permits = 1 時才和 Mutex 功能相同。

源碼分析

suspend

咱們來看 suspend 修飾函數和修飾 lambda 的區別。

掛起函數:

suspend fun suspendFun() {

}

複製代碼

編譯成 java 代碼以下:

@Nullable

public final Object suspendFun(@NotNull Continuation $completion) {

return Unit.INSTANCE;

}

複製代碼

能夠看到掛起函數其實隱藏着一個 Continuation 協程實例參數,而這個參數其實就來源於協程體或者其餘掛起函數,所以掛起函數只能在協程體內或其餘函數內調用了。

suspend 修飾 lambda 表達式:

suspend {}

// 反編譯結果以下

Function1 var2 = (Function1)(new Function1((Continuation)null) {

int label;

@Nullable

public final Object invokeSuspend(@NotNull Object $result) {

switch(this.label) {

case 0:

return Unit.INSTANCE;

default:

}

}

@NotNull

public final Continuation create(@NotNull Continuation completion) {

Function1 var2 = new <anonymous constructor>(completion);

return var2;

}

public final Object invoke(Object var1) {

return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);

}

});

複製代碼

suspend lambda 實際會被編譯成 SuspendLambda 的子類。suspendLambda 的繼承關係以下圖:

image

經過反編譯的代碼能夠發現咱們在協程體內編寫的代碼最終是在 invokeSuspend 函數內執行的。而在 BaseContinuationImpl 內實現了 Continuation 協程接口的 resumeWidth 函數,並在其內調用了 invokeSuspend 函數。

suspend 關鍵字的介紹先到這裏,接下來咱們看協程是如何建立並運行的。

協程是如何被建立的

文件地址 kotlin.coroutines.Continuation.kt

Continuation.kt 文件基本屬於協程的基礎核心了,搞懂了它也就至關於搞懂了協程的基礎原理。

  • 協程接口的定義;

  • 喚醒或啓動協程的函數;

  • 四種建立協程的函數;

  • 幫助獲取協程內的協程實例對象的函數。

首先是協程的接口聲明,很是簡單:

/**

* 協程接口,T 表示在最後一個掛起點恢復時的返回值類型

*/

public interface Continuation<in T> {

/**

* 協程上下文

*/

public val context: CoroutineContext

/**

* 這個函數的功能有不少,它能夠啓動協程,也能夠恢復掛點,還能夠做爲最後一次掛起點恢復時輸出協程的結果

*/

public fun resumeWith(result: Result<T>)

}

複製代碼

協程接口聲明以後 Continuation.kt 文件提供了兩個調用 resumeWith 函數的函數:

public inline fun <T> Continuation<T>.resume(value: T): Unit =

resumeWith(Result.success(value))

public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =

resumeWith(Result.failure(exception))

複製代碼

這兩個函數除了傳參一成功一失敗,它們的功能是如出一轍的,都是直接調用了 resumeWith 函數。至關因而 resumeWith 函數的封裝。

再而後就是四種建立協程的方式了:

public fun <T> (suspend () -> T).createCoroutine(

completion: Continuation<T>

): Continuation<Unit> =

SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

public fun <R, T> (suspend R.() -> T).createCoroutine(

receiver: R,

completion: Continuation<T>

): Continuation<Unit> =

SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)

public fun <T> (suspend () -> T).startCoroutine(

completion: Continuation<T>

) {

createCoroutineUnintercepted(completion).intercepted().resume(Unit)

}

public fun <R, T> (suspend R.() -> T).startCoroutine(

receiver: R,

completion: Continuation<T>

) {

createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)

}

複製代碼

這四種方式能夠說是類似度超高,createCoroutinestartCoroutine 最大的區別在於,經過 createCoroutine 建立的協程須要掉用 resume 函數啓動,而 startCoroutine 函數內部已經默認調用了 resume 函數。那咱們先用第一種方式建立一個協程:

// 建立協程

val continuation = suspend {

println("In Coroutine")

}.createCoroutine(object : Continuation<Unit> {

override fun resumeWith(result: Result<Unit>) {

println(result)

}

override val context = EmptyCoroutineContext

})

// 啓動協程

continuation.resume(Unit)

複製代碼

調用 createCoroutine 函數建立協程時傳入了 Continuation 協程的匿名類對象,誒?好像有點不對,爲何建立協程的時候要傳一個協程實例進去,直接用不就成了。想知道爲何的話,那就須要看看 createCoroutine 到底作了什麼操做了。

SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

複製代碼

首先調用的是 createCoroutineUnintercepted 函數,它的源碼能夠在 kotlin.coroutines.intrinsics.IntrinsicsJvm.kt 內找到:

public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(

completion: Continuation<T>

): Continuation<Unit> {

val probeCompletion = probeCoroutineCreated(completion)

return if (this is BaseContinuationImpl)

create(probeCompletion)

else

createCoroutineFromSuspendFunction(probeCompletion) {

(this as Function1<Continuation<T>, Any?>).invoke(it)

}

}

複製代碼

probeCoroutineCreated 函數內直接將參數返回了,而且經過斷點的方式,它的返回值和 completion 傳參是同樣的,因此這裏先忽略它。

經過斷點會發現 (this is BaseContinuationImpl) 判斷的返回值是 true 這也就間接證實了上文中 suspend lambdaBaseContinuationImpl 的繼承關係。最後返回的是 create(Continuation) 函數的返回值,這裏能夠發現做爲參數傳入的 Continuation 變量被 suspend lambda 包裹了一層,而後返回,至關於 suspend lambda 成爲了 Continuation 的代理。

到這裏 createCoroutineUnintercepted(completion) 的含義就搞明白了:

object : Continuation<Unit> {} 建立的協程實例傳入 suspend lambda,由其代理協程執行操做。

緊接着又調用了 intercepted 函數,intercepted 函數聲明也在 IntrinsicsJvm.kt 文件內:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this

複製代碼

接着看 ContinuationImplintercepted 函數:

public fun intercepted(): Continuation<Any?> =

intercepted

?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)

.also { intercepted = it }

複製代碼

其中 context[ContinuationInterceptor]?.interceptContinuation(this) 這句代碼涉及到協程攔截器的概念,下文會詳細分析。這裏能夠先簡單介紹一下,協程攔截器和協程其實也是代理的關係。因此 intercepted() 能夠理解爲若是協程上下文中添加了協程攔截器,那麼就返回協程攔截器,否則就返回 suspend lambda 實例自己,而它們都實現了 Continuation 接口。

先作一個小結,經過上文的介紹基本就清楚了,createCoroutine、startCoroutine 函數其實不是用來建立協程的,協程實例就是它們的傳參,它們是爲協程添加代理的。

createCoroutineUnintercepted(completion).intercepted()

複製代碼

經過上面的代碼,爲協程添加了代理,分別是 suspend lambda 和協程攔截器。這時候經過協程實例調用 resumeWith 函數時會先執行兩層代理內實現的 resumeWith 函數邏輯,最終纔會執行到協程的 resumeWith 函數輸出最終結果。

createCoroutine 函數內,在添加兩層代理以後又添加了一層代理,SafeContinuationSafeContinuation 內部使用協程的三種狀態,並配合 CAS 操做,保證當前返回的 SafeContinuation 實例對象僅能調用一次 resumeWith 函數,屢次調用會報錯。

  • UNDECIDED:初始狀態

  • COROUTINE_SUSPENDED:掛起狀態

  • RESUMED:恢復狀態

協程是如何被掛起又是如何被恢復的

那爲何協程要這麼作,很麻煩不是?要弄清楚這個問題先來看 BaseContinuationImplresumeWith 函數實現吧。

public final override fun resumeWith(result: Result<Any?>) {

var current = this

var param = result

while (true) {

probeCoroutineResumed(current)

with(current) {

val completion = completion!!

val outcome: Result<Any?> =

try {

val outcome = invokeSuspend(param)

if (outcome === COROUTINE_SUSPENDED) return

Result.success(outcome)

} catch (exception: Throwable) {

Result.failure(exception)

}

releaseIntercepted() // this state machine instance is terminating

if (completion is BaseContinuationImpl) {

current = completion

param = outcome

} else {

// top-level completion reached -- invoke and return

completion.resumeWith(outcome)

return

}

}

}

}

複製代碼

當調用 resume(Unit) 啓動協程時,因爲代理的存在會調用到 BaseContinuationImplresumeWith() 函數,函數內會執行 invokeSuspend() 函數,也就說咱們所說的協程體。

查看以下代碼的 invokeSuspend 函數:

suspend {5}

// 反編譯後的 invokeSuspend 函數

public final Object invokeSuspend(@NotNull Object $result) {

Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();

switch(this.label) {

case 0:

ResultKt.throwOnFailure($result);

return Boxing.boxInt(5);

default:

throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");

}

}

複製代碼

能夠看到這裏直接返回了最終的結果 5,接着在 ContinuationImpl.resumeWith 函數內最終調用

completion.resumeWith(outcome)

複製代碼

輸出協程的最終結果。

這是協程執行同步代碼的過程,能夠看到在整個過程當中,ContinuationImpl 好像並無起到什麼做用,那接着來看在協程體內執行異步代碼:

suspend {

suspendFunc()

}

suspend fun suspendFunc() = suspendCoroutine<Int> { continuation ->

thread {

Thread.sleep(1000)

continuation.resume(5)

}

}

// 反編譯後

public final Object invokeSuspend(@NotNull Object $result) {

Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();

Object var10000;

switch(this.label) {

case 0:

ResultKt.throwOnFailure($result);

this.label = 1;

var10000 = DeepKotlin3Kt.suspendFunc(this);

if (var10000 == var2) {

return var2;

}

break;

case 1:

ResultKt.throwOnFailure($result);

var10000 = $result;

break;

default:

throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");

}

return var10000;

}

public static final Object suspendFunc(@NotNull Continuation $completion) {

boolean var1 = false;

boolean var2 = false;

boolean var3 = false;

SafeContinuation var4 = new SafeContinuation(IntrinsicsKt.intercepted($completion));

Continuation continuation = (Continuation)var4;

int var6 = false;

ThreadsKt.thread$default(false, false, (ClassLoader)null, (String)null, 0, (Function0)(new DeepKotlin3Kt$suspendFunc02$2$1(continuation)), 31, (Object)null);

Object var10000 = var4.getOrThrow();

if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {

DebugProbesKt.probeCoroutineSuspended($completion);

}

return var10000;

}

複製代碼

resume 函數啓動協程,invokeSuspend 函數第一次執行時 this.label == 0 執行 case 0 代碼,this.label 變量賦值爲 1, 而後判斷若是 if (var10000 == var2)true 那麼 invokeSuspend 函數返回 var2,也就是 COROUTINE_SUSPENDED 標識,在 resumeWith 函數內,判斷若是 invokeSuspend 函數的返回值爲 COROUTINE_SUSPENDEDreture。這也就是協程的掛起過程。

當線程執行結束,調用 resume 函數恢復協程時再次執行到 invokeSuspend 函數,這時 this.label == 1,執行 case 1 代碼,直接返回結果 5。那在 resumeWith 函數內,這時就不會執行 return 了,最終會調用協程的 resumeWith 函數輸出最終的結果,這也就是協程的恢復過程。

經過了解協程運行流程能夠發現 ContinuationImpl 實際上是協程掛起和恢復邏輯的真正執行者。也正是由於協程掛起和恢復邏輯的存在,因此咱們能夠像編寫同步代碼同樣調用異步代碼:

suspend {

println("Coroutine start")

println("Coroutine: ${System.currentTimeMillis()}")

val resultFun = suspendThreadFun()

println("Coroutine: suspendThreadFun-$resultFun-${System.currentTimeMillis()}")

val result = suspendNoThreadFun()

println("Coroutine: suspendNoThreadFun-$result-${System.currentTimeMillis()}")

}.startCoroutine(object : Continuation<Unit> {

override val context = EmptyCoroutineContext

override fun resumeWith(result: Result<Unit>) {

println("Coroutine End: $result")

}

})

suspend fun suspendThreadFun() = suspendCoroutine<Int> { continuation ->

thread {

Thread.sleep(1000)

continuation.resumeWith(Result.success(5))

}

}

suspend fun suspendNoThreadFun() = suspendCoroutine<Int> { continuation ->

continuation.resume(5)

}

//輸出:

Coroutine start

Coroutine: 1627014868152

Coroutine: suspendThreadFun-5-1627014869182

Coroutine: suspendNoThreadFun-5-1627014869186

Coroutine End: Success(kotlin.Unit)

複製代碼

建立協程做用域

在經過 createCoroutine 建立協程時,你會發現還可爲它傳遞 receiver 參數,這個參數的做用是用於擴展協程體,通常稱其爲 協程做用域

public fun <R, T> (suspend R.() -> T).createCoroutine(

receiver: R,

completion: Continuation<T>

): Continuation<Unit> =

SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)

複製代碼

能夠看到 suspend lambda 表達式也出現了變化。咱們知道 () -> TFunction0lambda 表達式,R.() -> T 至關於 R 類的 () -> T 擴展。若是瞭解擴展函數的話就知道擴展函數會將所擴展的類做爲其參數,那麼 R.() -> T 也就是 Function1lambda 表達式了。

固然因爲 suspend 關鍵字的做用,又增長了 Continuation 參數,因此最終看到的就是 Function1Function2

由於擴展函數的做用,因此能夠在協程體內經過 this (可隱藏)調用 receiver 的函數或者屬性。示例以下:

launchCoroutine(ProducerScope<Int>()) {

produce(1000)

}

fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {

block.startCoroutine(receiver, object : Continuation<T> {

override val context = EmptyCoroutineContext

override fun resumeWith(result: Result<T>) {

println("Coroutine End: $result")

}

})

}

class ProducerScope<T> {

fun produce(value: T) {

println(value)

}

}

複製代碼

GlobalScope.launch 源碼分析

瞭解上文建立協程的邏輯以後再來分析 GlobalScope.launch 就很是簡單了。GlobalScope.launch 最終會執行到 CoroutineStart.invoke 函數:

AbstractCoroutine.kt

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {

initParentJob()

start(block, receiver, this)

}

複製代碼

CoroutineStart.kt

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =

when (this) {

DEFAULT -> block.startCoroutineCancellable(receiver, completion)

ATOMIC -> block.startCoroutine(receiver, completion)

UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)

LAZY -> Unit // will start lazily

}

複製代碼

代碼基本跟上文分析的一致。

自定義協程上下文

協程上下文在協程中的做用很是大,有它在至關於協程有了裝備卡槽同樣。你能夠將你想添加的上下文對象合併到 CoroutineContext 參數上,而後在其餘地方使用。

CoroutineContext 的數據結構有以下特色:

  • 能夠經過 [] 以相似 List 的方式訪問任何一個協程上下文對象,[] 內是目標協程上下文。

  • 協程上下文能夠經過 + 的方式依次累加,固然 += 也是可用的。

咱們來自定義一個協程上下文給協程添加一個名字:

public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {

public companion object Key : CoroutineContext.Key<CoroutineName>

override fun toString(): String = "CoroutineName($name)"

}

複製代碼

應用到示例中:

var coroutineContext: CoroutineContext = EmptyCoroutineContext

coroutineContext += CoroutineName("c0-01")

suspend {

println("Run Coroutine")

}.startCoroutine(object : Continuation<Unit> {

override fun resumeWith(result: Result<Unit>) {

println("${context[CoroutineName]?.name}")

}

override val context = coroutineContext

})

//輸出:

Run Coroutine

c0-01

複製代碼

其實協程已經爲咱們提供了 CoroutineName 實現。

自定義協程攔截器

經過實現攔截器接口 ContinuationInterceptor 來定義攔截器,由於攔截器也是協程上下文的一類實現,因此使用攔截器時將其添加到對應的協程上下文中便可。

聲明一個日誌攔截器:

class LogInterceptor : ContinuationInterceptor {

override val key = ContinuationInterceptor

override fun <T> interceptContinuation(continuation: Continuation<T>) = LogContinuation(continuation)

}

class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {

override fun resumeWith(result: Result<T>) {

println("before resumeWith: $result")

continuation.resumeWith(result)

println("after resumeWith")

}

}

複製代碼

攔截器的關鍵攔截函數是 interceptContinuation,能夠根據須要返回一個新的 Continuation 實例。

在協程生命週期內每次恢復調用都會觸發攔截器。恢復調用有以下兩種狀況:

  • 協程啓動時調用一次,經過恢復調用來開始執行協程體從開始到下一次掛起之間的邏輯。

  • 掛起點處若是異步掛起,則在恢復時再調用一次。

由此可知,恢復調用的次數爲 n+1 次,其中 n 是協程體內真正掛起執行異步邏輯的掛起點的個數。

改寫上面的例子:

// 異步掛起函數

suspend fun suspendFunc02() = suspendCoroutine<Int> { continuation ->

thread {

continuation.resumeWith(Result.success(5))

}

}

// 開啓協程 - 未添加日誌攔截器

suspend {

suspendFunc02()

suspendFunc02()

}.startCoroutine(object : Continuation<Int> {

override val context: CoroutineContext = EmptyCoroutineContext

override fun resumeWith(result: Result<Int>) {

...

result.onSuccess {

println("Coroutine End: ${context[CoroutineName]?.name}, $result")

}

}

})

// 輸出以下

Coroutine End: Success(5)

// 開啓協程 - 添加日誌攔截器

suspend {

suspendFunc02()

suspendFunc02()

}.startCoroutine(object : Continuation<Int> {

override val context: CoroutineContext = LogInterceptor()

override fun resumeWith(result: Result<Int>) {

...

result.onSuccess {

println("Coroutine End: ${context[CoroutineName]?.name}, $result")

}

}

})

// 輸出以下:

before resumeWith: Success(kotlin.Unit)

after resumeWith

before resumeWith: Success(5)

after resumeWith

before resumeWith: Success(5)

after resumeWith

複製代碼
相關文章
相關標籤/搜索