Kotlin coroutine詳解

前言

本文主要介紹一下Kotlin是如何實現Coroutine的,對於具體的用法推薦參考一下官方文檔,講得仍是比較詳細的html

什麼是 Coroutine

概念上來講相似於線程,擁有本身的棧、本地變量、指令指針等,須要一段代碼塊來運行而且擁有相似的生命週期。可是和線程不一樣,coroutine並不和某一個特定的線程綁定,它能夠再線程A中執行,並在某一個時刻暫停(suspend),等下次調度到恢復執行的時候在線程B中執行。不一樣於線程,coroutine是協做式的,即子程序能夠經過在函數中有不一樣的入口點來實現暫停、恢復,從而讓出線程資源。java

實戰演練

首先看一個簡單的小demo,來看看Kotlin的Coroutine是具體適合使用的:git

@Test
    fun async() {
        async {
            delay(1000)
            print("World!")
        }
        print("Hello ")
        Thread.sleep(2000)
    }複製代碼

上面這段代碼會輸出Hello World!, 那麼下面咱們看看具體是如何工做的.github

原理剖析

asyn()這裏是一個函數,下面是它的源碼:golang

public val DefaultDispatcher: CoroutineDispatcher = CommonPool

public fun <T> async( context: CoroutineContext = DefaultDispatcher, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.initParentJob(context[Job])
    start(block, coroutine, coroutine)
    return coroutine
}複製代碼

這個函數有三個參數,其中兩個都有默認值,也就是默認不須要傳遞,context是指coroutine的上下文,默認是DefaultDispatcher,DefaultDispatcher當前的實現是CommonPool,因爲目前仍是experimental,後面說不定會更改爲其餘的實現。算法

object CommonPool : CoroutineDispatcher() {
    private var usePrivatePool = false

    @Volatile
    private var _pool: Executor? = null

    private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }

    private fun createPool(): ExecutorService {
        val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool()
        if (!usePrivatePool) {
            Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
                ?.let { return it }
        }
        Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? ExecutorService }
            ?. let { return it }
        return createPlainPool()
    }

    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(defaultParallelism()) {
            Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }
}複製代碼

CommonPool默認會使用ForkJoinPool做爲coroutine的調度策略,若是不存在這fallback到線程池的策略,ForkJoinPool實現了work-stealing算法,當前線程的工做完成後從其餘線程的待執行任務中竊取,具體的解釋推薦直接看其註釋,比網上的博客清晰的多。
而第二個參數CoroutineStart 指的是coroutine啓動的選項,總的來講有四種:編程

/* * * [DEFAULT] -- immediately schedules coroutine for execution according to its context; * * [LAZY] -- starts coroutine lazily, only when it is needed; * * [ATOMIC] -- atomically (non-cancellably) schedules coroutine for execution according to its context; * * [UNDISPATCHED] -- immediately executes coroutine until its first suspension point _in the current thread_. */複製代碼

第三個就是實際的coroutine要執行的代碼段了,下面咱們看看具體的執行流程:app

public fun <T> async( context: CoroutineContext = DefaultDispatcher, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T> {
    /** * 初始化上下文,例如名字(方便調試) */
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    //這裏是DeferredCoroutine,而且不存在父任務
    coroutine.initParentJob(context[Job])
    //Kotlin的運算符重載,會轉化爲對應參數的invoke方法
   //https://kotlinlang.org/docs/reference/operator-overloading.html
    start(block, coroutine, coroutine)
    return coroutine
}

public fun newCoroutineContext(context: CoroutineContext): CoroutineContext {
    val debug = if (DEBUG) context + CoroutineId(COROUTINE_ID.incrementAndGet()) else context
    return if (context !== DefaultDispatcher && context[ContinuationInterceptor] == null)
        debug + DefaultDispatcher else debug
}複製代碼

下面就會調用CoroutineStart中對應的invoke方法:jvm

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
        when (this) {
            //相似java的switch語句(TABLESWITCH/lookupswitch)
            //https://kotlinlang.org/docs/reference/control-flow.html
            CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
            CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            CoroutineStart.LAZY -> Unit // will start lazily
        }複製代碼

接着會去調用startCoroutineCancellable方法:async

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    createCoroutineUnchecked(receiver, completion).resumeCancellable(Unit)

//
public fun <R, T> (suspend R.() -> T).createCoroutineUnchecked(
        receiver: R,
        completion: Continuation<T>
): Continuation<Unit> =
        if (this !is kotlin.coroutines.experimental.jvm.internal.CoroutineImpl)
            buildContinuationByInvokeCall(completion) {
                @Suppress("UNCHECKED_CAST")
                (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, completion)
            }
        else
            //這裏create方法會去建立CoroutineImpl,可是咱們看到CoroutineImpl中這方法會直接拋異常
            //wtf?實際上這裏傳遞進來的this是編輯器根據async中的lambda動態生產的類的實例,所以
            //也就是說實際的調用是哪一個動態類
            (this.create(receiver, completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade複製代碼

上面說到編譯器會生成內部類,那麼咱們看看這裏到底有什麼黑魔法,下面貼一下具體的結構

image
image

反編譯以後,先只看create方法:

static final class CoroutineTest.launch extends CoroutineImpl implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
    private CoroutineScope p$;

    @NotNull
    public final Continuation<Unit> create(@NotNull CoroutineScope $receiver, @NotNull Continuation<? super Unit> $continuation) {
        Intrinsics.checkParameterIsNotNull((Object)$receiver, (String)"$receiver");
        Intrinsics.checkParameterIsNotNull($continuation, (String)"$continuation");
        CoroutineImpl coroutineImpl = new ;
        CoroutineScope coroutineScope = coroutineImpl.p$ = $receiver;
        return coroutineImpl;
    }
}複製代碼

建立完須要的上下文以後,會去調用拿到Continuation後,就去調用resumeCancellable方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    createCoroutineUnchecked(receiver, completion).resumeCancellable(Unit)複製代碼
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
    is DispatchedContinuation -> resumeCancellable(value)
    else -> resume(value)
}

    @Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
    inline fun resumeCancellable(value: T) {
        val context = continuation.context
        if (dispatcher.isDispatchNeeded(context))
            dispatcher.dispatch(context, DispatchTask(continuation, value, exception = false, cancellable = true))
        else
            resumeUndispatched(value)
    }複製代碼

image
image

能夠看到最終就是丟到CommonPool中(ForkJoinPool),不過在那以前會包裝成一個DispatchTask:

internal class DispatchTask<in T>(
    private val continuation: Continuation<T>,
    private val value: Any?, // T | Throwable
    private val exception: Boolean,
    private val cancellable: Boolean
) : Runnable {
    @Suppress("UNCHECKED_CAST")
    override fun run() {
        val context = continuation.context
        val job = if (cancellable) context[Job] else null
        withCoroutineContext(context) {
            when {
                job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
                exception -> continuation.resumeWithException(value as Throwable)
                else -> continuation.resume(value as T)
            }
        }
    }

    override fun toString(): String =
        "DispatchTask[$value, cancellable=$cancellable, ${continuation.toDebugString()}]"
}複製代碼

在咱們的場景下最終會調用:Continuation#public fun resume(value: T),這裏的實際會調用:

abstract class CoroutineImpl(
        arity: Int,
        @JvmField
        protected var completion: Continuation<Any?>?
) : Lambda(arity), Continuation<Any?> {
    override fun resume(value: Any?) {
        processBareContinuationResume(completion!!) {
            doResume(value, null)
        }
    }
}複製代碼
@kotlin.internal.InlineOnly
internal inline fun processBareContinuationResume(completion: Continuation<*>, block: () -> Any?) {
    try {
        val result = block()
        if (result !== COROUTINE_SUSPENDED) {
            @Suppress("UNCHECKED_CAST")
            (completion as Continuation<Any?>).resume(result)
        }
    } catch (t: Throwable) {
        completion.resumeWithException(t)
    }
}複製代碼

這裏doResume就是上文提到Kotlin編譯器生成的內部類:

@Nullable
    public final Object doResume(@Nullable Object var1_1, @Nullable Throwable var2_2) {
        var5_3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch (var0.label) {
            case 0: {
                v0 = var2_2;
                if (v0 != null) {
                    throw v0;
                }
                var3_4 = this.p$;
                this.label = 1;
                v1 = DelayKt.delay$default((long)1000, (TimeUnit)null, (Continuation)this, (int)2, (Object)null);
                if (v1 == var5_3) {
                    return var5_3;
                }
                ** GOTO lbl18
            }
            case 1: {
                v2 = throwable;
                if (v2 != null) {
                    throw v2;
                }
                v1 = data;
lbl18: // 2 sources:
                var4_5 = "World!";
                System.out.print((Object)var4_5);
                return Unit.INSTANCE;
            }
        }
        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }複製代碼

能夠看到實際上Kotlin使用狀態機實現的Coroutine,根據label的狀態決定要執行的代碼塊,Kotlin會在編譯時根據可suspend的方法插入相應的label,從而實現主動讓出線程資源,而且將本地變量保存到Continuation的實例變量中,等到下次獲得調度的時候,根據label來決定要執行的代碼塊,等到函數實際執行完的時候則直接返回對應的返回值(沒有的話則是默認值).
好比看下面這段代碼:

val a = a()
val y = foo(a).await() // suspension point #1
b()
val z = bar(a, y).await() // suspension point #2
c(z)複製代碼

這段代碼總共有三種狀態:

  1. 初始狀態,在全部的suspension point點以前
  2. 第一個暫停點
  3. 第二個暫停點

每個狀態都是continuation的入口點之一,這段代碼會編譯爲一個實現了狀態機的匿名類,有一個狀態變量用於保存當前狀態機的狀態,以及用於保存當前coroutine的本地變量,編譯後的代碼用Java代碼相似下面:

class <anonymous_for_state_machine> extends CoroutineImpl<...> implements Continuation<Object> {
    // 狀態機當前的狀態
    int label = 0

    // coroutine的本地變量
    A a = null
    Y y = null void resume(Object data) {
        if (label == 0) goto L0 if (label == 1) goto L1 if (label == 2) goto L2 else throw IllegalStateException() L0: a = a()
        label = 1
        data = foo(a).await(this) // 'this' 默認會被傳遞給await方法
        if (data == COROUTINE_SUSPENDED) return //若是須要暫停則返回
      L1:
        //從新獲得調度
        y = (Y) data//保存本地變量
        b()
        label = 2
        data = bar(a, y).await(this) 
        if (data == COROUTINE_SUSPENDED) return 
      L2:
        Z z = (Z) data c(z) label = -1 // No more steps are allowed
        return
    }          
}複製代碼

當coroutine開始執行的時候,默認label爲0,那麼就會跳轉到L0,而後執行一個耗時的業務邏輯,將label設置爲1,調用await,若是coroutine的執行須要暫停的那麼就返回掉。當須要繼續執行的時候就再次調用resume(),此次就會跳轉到L1, 執行完業務邏輯後,將label設置爲2,調用await並根據是否須要暫停來return,下次的繼續調度,此次會從L2開始執行,而後label設置爲-1,意味着不須要執行完了,不須要再調度了。

回到最初的代碼段,咱們首先調用了delay方法,這個方法默認使用ScheduledExecutorService,從而將當前的coroutine上下文包裝到DispatchTask,再對應的延遲時間以後再恢復執行,恢復執行以後,這時候label是1,那麼就會直接進入第二段代碼,輸出World!

@Test
    fun async() {
        async {
            //在另外的線程池中執行,經過保存當前的執行上下文(本地變量、狀態機的狀態位等),並丟到
            //ScheduledExecutorService中延遲執行
            delay(1000)
            print("World!")
        }
        //主線程中直接輸出
        print("Hello ")
        Thread.sleep(2000)
    }複製代碼

165021507294969_ pic_hd
165021507294969_ pic_hd

總結

本文大體講解了一些Kotlin中Coroutine的實現原理,固然對於協程,不少編程語言都有相關的實現,推薦都看一下文檔,實際使用對比看看。

image
image

參考資料

  1. github.com/Kotlin/kotl…
  2. en.wikipedia.org/wiki/Corout…
  3. www.lua.org/pil/9.html
  4. golang.org/doc/effecti…
  5. www.youtube.com/watch?v=4W3…
  6. www.youtube.com/watch?v=EMv…
相關文章
相關標籤/搜索