破解 Kotlin 協程(6) - 協程掛起篇

關鍵詞:Kotlin 協程 協程掛起 任務掛起 suspend 非阻塞java

協程的掛起最初是一個很神祕的東西,由於咱們老是用線程的概念去思考,因此咱們只能想到阻塞。不阻塞的掛起究竟是怎麼回事呢?說出來你也許會笑~~(哭?。。抱歉這篇文章我實在是沒辦法寫的更通俗易懂了,你們必定要親手實踐!)bash

1. 先看看 delay

咱們剛剛學線程的時候,最多見的模擬各類延時用的就是 Thread.sleep 了,而在協程裏面,對應的就是 delaysleep 讓線程進入休眠狀態,直到指定時間以後某種信號或者條件到達,線程就嘗試恢復執行,而 delay 會讓協程掛起,這個過程並不會阻塞 CPU,甚至能夠說從硬件使用效率上來說是「什麼都不耽誤」,從這個意義上講 delay 也能夠是讓協程休眠的一種很好的手段。框架

delay 的源碼其實很簡單:ide

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

複製代碼

cont.context.delay.scheduleResumeAfterDelay 這個操做,你能夠類比 JavaScript 的 setTimeout,Android 的 handler.postDelay,本質上就是設置了一個延時回調,時間一到就調用 cont 的 resume 系列方法讓協程繼續執行。函數

剩下的最關鍵的就是 suspendCancellableCoroutine 了,這但是咱們的老朋友了,前面咱們用它實現了回調到協程的各類轉換 —— 原來 delay 也是基於它實現的,若是咱們再多看一些源碼,你就會發現相似的還有 joinawait 等等。post

2. 再來講說 suspendCancellableCoroutine

既然你們對於 suspendCancellableCoroutine 已經很熟悉了,那麼咱們乾脆直接召喚一個老朋友給你們:性能

private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
    cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
}

複製代碼

Job.join() 這個方法會首先檢查調用者 Job 的狀態是否已經完成,若是是,就直接返回並繼續執行後面的代碼而再也不掛起,不然就會走到這個 joinSuspend 的分支當中。咱們看到這裏只是註冊了一個完成時的回調,那麼傳說中的 suspendCancellableCoroutine 內部究竟作了什麼呢?學習

public suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit
): T =
    suspendCoroutineUninterceptedOrReturn { uCont ->
        val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
        block(cancellable)
        cancellable.getResult() // 這裏的類型是 Any?
    }
    
複製代碼

suspendCoroutineUninterceptedOrReturn 這個方法調用的源碼是看不到的,由於它根本沒有源碼:P 它的邏輯就是幫你們拿到 Continuation 實例,真的就只有這樣。不過這樣提及來仍是很抽象,由於有一處很是的可疑:suspendCoroutineUninterceptedOrReturn 的返回值類型是 T,而傳入的 lambda 的返回值類型是 Any?, 也就是咱們看到的 cancellable.getResult() 的類型是 Any?,這是爲何?優化

我記得在協程系列文章的開篇,我就提到過 suspend 函數的簽名,當時是以 await 爲例的,這個方法大體至關於:ui

fun await(continuation: Continuation<User>): Any {
    ...
}
複製代碼

suspend 一方面爲這個方法添加了一個 Continuation 的參數,另外一方面,原先的返回值類型 User 成了 Continuation 的泛型實參,而真正的返回值類型居然是 Any。固然,這裏由於定義的邏輯返回值類型 User 是不可空的,所以真實的返回值類型也用了 Any 來示意,若是泛型實參是個可空的類型,那麼真實的返回值類型也就是 Any? 了,這正與前面提到的 cancellable.getResult() 返回的這個 Any? 相對應。

若是你們去查 await 的源碼,你一樣會看到這個 getResult() 的調用。

簡單來講就是,對於 suspend 函數,不是必定要掛起的,能夠在須要的時候掛起,也就是要等待的協程尚未執行完的時候,等待協程執行完再繼續執行;而若是在開始 join 或者 await 或者其餘 suspend 函數,若是目標協程已經完成,那麼就不必等了,直接拿着結果走人便可。那麼這個神奇的邏輯就在於 cancellable.getResult() 究竟返回什麼了,且看:

internal fun getResult(): Any? {
    ...
    if (trySuspend()) return COROUTINE_SUSPENDED // ① 觸發掛起邏輯
    ...
    if (state is CompletedExceptionally)  // ② 異常當即拋出
        throw recoverStackTrace(state.cause, this) 
    return getSuccessfulResult(state) // ③ 正常結果當即返回
}

複製代碼

這段代碼 ① 處就是掛起邏輯了,表示這時候目標協程尚未執行完,須要等待結果,②③是協程已經執行完能夠直接拿到異常和正常結果的兩種狀況。②③好理解,關鍵是 ①,它要掛起,這返回的是個什麼東西?

public val COROUTINE_SUSPENDED: Any get() = CoroutineSingletons.COROUTINE_SUSPENDED

internal enum class CoroutineSingletons { COROUTINE_SUSPENDED, UNDECIDED, RESUMED }
複製代碼

這是 1.3 的實現,1.3 之前的實現更有趣,就是一個白板 Any。實際上是什麼不重要,關鍵是這個東西是一個單例,任什麼時候候協程見到它就知道本身該掛起了。

3. 深刻掛起操做

既然說到掛起,你們可能以爲仍是隻知其一;不知其二,仍是不知道掛起究竟怎麼作到的,怎麼辦?說真的這個掛起是個什麼操做其實一直沒有拿出來給你們看,不是咱們過小氣了,只是太早拿出來會比較嚇人。。

suspend fun hello() = suspendCoroutineUninterceptedOrReturn<Int>{
    continuation ->
    log(1)
    thread {
        Thread.sleep(1000)
        log(2)
        continuation.resume(1024)
    }
    log(3)
    COROUTINE_SUSPENDED
}

複製代碼

我寫了這麼一個 suspend 函數,在 suspendCoroutineUninterceptedOrReturn 當中直接返回了這個傳說中的白板 COROUTINE_SUSPENDED,正常來講咱們應該在一個協程當中調用這個方法對吧,但是我偏不,我寫一段 Java 代碼去調用這個方法,結果會怎樣呢?

public class CallCoroutine {
    public static void main(String... args) {
        Object value = SuspendTestKt.hello(new Continuation<Integer>() {
            @NotNull
            @Override
            public CoroutineContext getContext() {
                return EmptyCoroutineContext.INSTANCE;
            }

            @Override
            public void resumeWith(@NotNull Object o) { // ①
                if(o instanceof Integer){
                    handleResult(o);
                } else {
                    Throwable throwable = (Throwable) o;
                    throwable.printStackTrace();
                }
            }
        });

        if(value == IntrinsicsKt.getCOROUTINE_SUSPENDED()){ // ②
            LogKt.log("Suspended.");
        } else {
            handleResult(value);
        }
    }

    public static void handleResult(Object o){
        LogKt.log("The result is " + o);
    }
}

複製代碼

這段代碼看上去比較奇怪,可能會讓人困惑的有兩處:

① 處,咱們在 Kotlin 當中看到的 resumeWith 的參數類型是 Result,怎麼這兒成了 Object 了?由於 Result 是內聯類,編譯時會用它惟一的成員替換掉它,所以就替換成了 Object (在Kotlin 裏面是 Any?

② 處 IntrinsicsKt.getCOROUTINE_SUSPENDED() 就是 Kotlin 的 COROUTINE_SUSPENDED

剩下的其實並不難理解,運行結果天然就是以下所示了:

07:52:55:288 [main] 1
07:52:55:293 [main] 3
07:52:55:296 [main] Suspended.
07:52:56:298 [Thread-0] 2
07:52:56:306 [Thread-0] The result is 1024
複製代碼

其實這段 Java 代碼的調用方式與 Kotlin 下面的調用已經很接近了:

suspend fun main() {
    log(hello())
}
複製代碼

只不過咱們在 Kotlin 當中仍是不太容易拿到 hello 在掛起時的真正返回值,其餘的返回結果徹底相同。

12:44:08:290 [main] 1
12:44:08:292 [main] 3
12:44:09:296 [Thread-0] 2
12:44:09:296 [Thread-0] 1024
複製代碼

頗有可能你看到這裏都會以爲暈頭轉向,沒有關係,我如今已經開始嘗試揭示一些協程掛起的背後邏輯了,比起簡單的使用,概念的理解和接受須要有個小小的過程。

4. 深刻理解協程的狀態轉移

前面咱們已經對協程的原理作了一些揭示,顯然 Java 的代碼讓你們可以更容易理解,那麼接下來咱們再來看一個更復雜的例子:

suspend fun returnSuspended() = suspendCoroutineUninterceptedOrReturn<String>{
    continuation ->
    thread {
        Thread.sleep(1000)
        continuation.resume("Return suspended.")
    }
    COROUTINE_SUSPENDED
}

suspend fun returnImmediately() = suspendCoroutineUninterceptedOrReturn<String>{
    log(1)
    "Return immediately."
}

複製代碼

咱們首先定義兩個掛起函數,第一個會真正掛起,第二個則會直接返回結果,這相似於咱們前面討論 join 或者 await 的兩條路徑。咱們再用 Kotlin 給出一個調用它們的例子:

suspend fun main() {
    log(1)
    log(returnSuspended())
    log(2)
    delay(1000)
    log(3)
    log(returnImmediately())
    log(4)
}

複製代碼

運行結果以下:

08:09:37:090 [main] 1
08:09:38:096 [Thread-0] Return suspended.
08:09:38:096 [Thread-0] 2
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 3
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] Return immediately.
08:09:39:141 [kotlinx.coroutines.DefaultExecutor] 4
複製代碼

好,如今咱們要揭示這段協程代碼的真實面貌,爲了作到這一點,咱們用 Java 來仿寫一下這段邏輯:

注意,下面的代碼邏輯上並不能作到十分嚴謹,不該該出如今生產當中,僅供學習理解協程使用。

public class ContinuationImpl implements Continuation<Object> {

    private int label = 0;
    private final Continuation<Unit> completion;

    public ContinuationImpl(Continuation<Unit> completion) {
        this.completion = completion;
    }

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object o) {
        try {
            Object result = o;
            switch (label) {
                case 0: {
                    LogKt.log(1);
                    result = SuspendFunctionsKt.returnSuspended( this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 1: {
                    LogKt.log(result);
                    LogKt.log(2);
                    result = DelayKt.delay(1000, this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 2: {
                    LogKt.log(3);
                    result = SuspendFunctionsKt.returnImmediately( this);
                    label++;
                    if (isSuspended(result)) return;
                }
                case 3:{
                    LogKt.log(result);
                    LogKt.log(4);
                }
            }
            completion.resumeWith(Unit.INSTANCE);
        } catch (Exception e) {
            completion.resumeWith(e);
        }
    }

    private boolean isSuspended(Object result) {
        return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }
}

複製代碼

咱們定義了一個 Java 類 ContinuationImpl,它就是一個 Continuation 的實現。

實際上若是你願意,你還真得能夠在 Kotlin 的標準庫當中找到一個名叫 ContinuationImpl 的類,只不過,它的 resumeWith 最終調用到了 invokeSuspend,而這個 invokeSuspend 實際上就是咱們的協程體,一般也就是一個 Lambda 表達式 —— 咱們經過 launch啓動協程,傳入的那個 Lambda 表達式,實際上會被編譯成一個 SuspendLambda 的子類,而它又是 ContinuationImpl 的子類。

有了這個類咱們還須要準備一個 completion 用來接收結果,這個類仿照標準庫的 RunSuspend 類實現,若是你有閱讀前面的文章,那麼你應該知道 suspend main 的實現就是基於這個類:

public class RunSuspend implements Continuation<Unit> {

    private Object result;

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object result) {
        synchronized (this){
            this.result = result;
            notifyAll(); // 協程已經結束,通知下面的 wait() 方法中止阻塞
        }
    }

    public void await() throws Throwable {
        synchronized (this){
            while (true){
                Object result = this.result;
                if(result == null) wait(); // 調用了 Object.wait(),阻塞當前線程,在 notify 或者 notifyAll 調用時返回
                else if(result instanceof Throwable){
                    throw (Throwable) result;
                } else return;
            }
        }
    }
}

複製代碼

這段代碼的關鍵點在於 await() 方法,它在其中起了一個死循環,不過你們不要懼怕,這個死循環是個紙老虎,若是 resultnull,那麼當前線程會被當即阻塞,直到結果出現。具體的使用方法以下:

...
    public static void main(String... args) throws Throwable {
        RunSuspend runSuspend = new RunSuspend();
        ContinuationImpl table = new ContinuationImpl(runSuspend);
        table.resumeWith(Unit.INSTANCE);
        runSuspend.await();
    }
...

複製代碼

這寫法簡直就是 suspend main 的真實面貌了。

咱們看到,做爲 completion 傳入的 RunSuspend 實例的 resumeWith 其實是在 ContinuationImplresumeWtih 的最後纔會被調用,所以它的 await() 一旦進入阻塞態,直到 ContinuationImpl 的總體狀態流轉完畢纔會中止阻塞,此時進程也就運行完畢正常退出了。

因而這段代碼的運行結果以下:

08:36:51:305 [main] 1
08:36:52:315 [Thread-0] Return suspended.
08:36:52:315 [Thread-0] 2
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] 3
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] Return immediately.
08:36:53:362 [kotlinx.coroutines.DefaultExecutor] 4
複製代碼

咱們看到,這段普通的 Java 代碼與前面的 Kotlin 協程調用徹底同樣。那麼我這段 Java 代碼的編寫根據是什麼呢?就是 Kotlin 協程編譯以後產生的字節碼。固然,字節碼是比較抽象的,我這樣寫出來就是爲了讓你們更容易的理解協程是如何執行的,看到這裏,相信你們對於協程的本質有了進一步的認識:

  • 協程的掛起函數本質上就是一個回調,回調類型就是 Continuation
  • 協程體的執行就是一個狀態機,每一次遇到掛起函數,都是一次狀態轉移,就像咱們前面例子中的 label 不斷的自增來實現狀態流轉同樣

若是可以把這兩點認識清楚,那麼相信你在學習協程其餘概念的時候就都將再也不是問題了。若是想要進行線程調度,就按照咱們講到的調度器的作法,在 resumeWith 處執行線程切換就行了,其實很是容易理解的。官方的協程框架本質上就是在作這麼幾件事兒,若是你去看源碼,可能一時雲裏霧裏,主要是由於框架除了實現核心邏輯外還須要考慮跨平臺實現,還須要優化性能,但無論怎麼樣,這源碼橫豎看起來就是五個字:狀態機回調。

5. 小結

不一樣以往,咱們從這一篇開始毫無保留的爲你們嘗試揭示協程背後的邏輯,也許一時間可能有些難懂,不過沒關係,你可使用協程一段時間以後再來閱讀這些內容,相信必定會豁然開朗的。

固然,這一篇內容的安排更可能是爲後面的序列篇開路,Kotlin 的 Sequence 就是基於協程實現的,它的用法很簡單,幾乎與普通的 Iterable 沒什麼區別,所以序列篇咱們會重點關注它的內部實現原理,歡迎你們關注。


歡迎關注 Kotlin 中文社區!

中文官網:www.kotlincn.net/

中文官方博客:www.kotliner.cn/

公衆號:Kotlin

知乎專欄:Kotlin

CSDN:Kotlin中文社區

掘金:Kotlin中文社區

簡書:Kotlin中文社區

相關文章
相關標籤/搜索