Kotlin Coroutine 探索之旅

協程

你們若是已經使用Kotlin語言進行開發,對協程這個概念應該不會很陌生。雖然在網上有不少Kotlin協程相關的文章,但當我開始準備使用的時候,仍是有以下幾個疑慮。html

  1. 協程到底可以解決什麼樣的問題?
  2. 協程和咱們經常使用的Executor、RxJava有什麼區別?
  3. 項目上使用有什麼風險嗎?

接下來就帶着這幾個問題一塊兒來了解一下Kotlin的協程。java

如何使用

關於協程,我在網上看到最多的說法是協程是輕量級的線程。那麼協程首先應該解決的問題就是程序中咱們經常遇到的 「異步」 的問題。咱們看看官網介紹的幾個使用例子。android

依賴

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.3'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.3'
複製代碼

入門

import kotlinx.coroutines.*

fun main() {
    GlobalScope.launch { // 在後臺啓動一個新的協程並繼續
        delay(1000L)
        println("World!")
    }
    println("Hello,") // 主線程中的代碼會當即執行
    runBlocking {     // 可是這個表達式阻塞了主線程
        delay(2000L)  // ……咱們延遲 2 秒來保證 JVM 的存活
    } 
}
複製代碼

掛起函數

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // 假設咱們在這裏作了一些有用的事
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // 假設咱們在這裏也作了一些有用的事
    return 29
}

val time = measureTimeMillis {
    val one = doSomethingUsefulOne()
    val two = doSomethingUsefulTwo()
    println("The answer is ${one + two}")
}
println("Completed in $time ms")

複製代碼

結果:git

The answer is 42
Completed in 2015 ms
複製代碼

使用 async 併發

val time = measureTimeMillis {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")
複製代碼

結果:github

The answer is 42
Completed in 1017 ms
複製代碼

單元測試

class MyTest {
    @Test
    fun testMySuspendingFunction() = runBlocking<Unit> {
        // 這裏咱們可使用任何喜歡的斷言風格來使用掛起函數
    }
}
複製代碼

更新詳細的使用可參考官網示例bash

爲什麼使用

既然已經有這麼多異步處理的框架,那咱們爲什麼還要使用協程。這裏舉個例子,看看對同個需求,不一樣異步框架的處理方式。微信

如今有一個產品需求,生成一個二維碼在頁面展現給用戶。咱們來對比看看不一樣的作法。併發

Thread

Thread(Runnable {
        try {
            val qrCode: Bitmap =
            CodeCreator.createQRCode(this@ShareActivity, SHARE_QR_CODE)
            runOnUiThread { 
                img_qr_code.setImageBitmap(qrCode)
                }
            } catch (e: WriterException) {
                e.printStackTrace()
            }
        }).start()
    }
複製代碼

Executors

Executors.newSingleThreadExecutor().execute {
        try {
            val qrCode: Bitmap =
            CodeCreator.createQRCode(this@ShareActivity, SHARE_QR_CODE)
            runOnUiThread {
                img_qr_code.setImageBitmap(qrCode)
            }
        } catch (e: WriterException) {
            e.printStackTrace()
        }
    }
複製代碼

RxJava

Observable.just(SHARE_QR_CODE)
        .map(new Function<String, Bitmap>() {
            @Override
            public Bitmap apply(String s) throws Exception {
                return CodeCreator.createQRCode(ShareActivity.this, s);
            }
        })
        .subscribe(new Consumer<Bitmap>() {
            @Override
            public void accept(Bitmap bitmap) throws Exception {
                img_qr_code.setImageBitmap(bitmap);
            }
        });
複製代碼

Koroutine

val job = GlobalScope.launch(Dispatchers.IO) {
            val bitmap = CodeCreator.createQRCode(ShareActivity.this, SHARE_QR_CODE)
            launch(Dispatchers.Main) {
                img_qr_code.setImageBitmap(bitmap)
            }
        }
}
複製代碼

經過這個例子,能夠看出使用協程的很是方便解決 "異步回調" 問題。 相比傳統的Thread及Excutors,RxJava將嵌套回調轉換成鏈式調用的形式,提升了代碼可讀性。協程直接將鏈式調用轉換成了協程內的順序調用,"代碼更加精簡"app

性能

官網上對於協程的有一句介紹。框架

本質上,協程是輕量級的線程

那麼協程的執行效率到底怎麼樣呢?下面咱們採用官網的示例在相同的環境和設備下作下對比。

啓動了 1000個協程,而且爲每一個協程都輸出一個點

Coroutine

var startTime = System.currentTimeMillis()
            repeat(times) { i -> // 啓動大量的協程
                GlobalScope.launch(Dispatchers.IO) {
                    Log.d(this@MainActivity.toString(), "$i=.")
                }

            }
            var endTime = System.currentTimeMillis() - startTime;
            Log.d(this@MainActivity.toString(), "endTime=$endTime")
            
複製代碼

執行結果:endTime=239 ms

Thread

var startTime = System.currentTimeMillis()
            repeat(times) { i ->// 啓動大量的線程
                Thread(Runnable {
                    Log.d(this@MainActivity.toString(), "$i=.")
                }).start()
            }
            var endTime = System.currentTimeMillis() - startTime;
複製代碼

執行結果:endTime=3161 ms

Excutors

var startTime = System.currentTimeMillis()
            var executors = Executors.newCachedThreadPool()
            repeat(times) { i -> // 使用線程池
                executors.execute {
                    Log.d(this@MainActivity.toString(), "$i=.")
                }
            }
            var endTime = System.currentTimeMillis() - startTime;
            Log.d(this@MainActivity.toString(), "endTime=$endTime")
複製代碼

執行結果:endTime=143 ms

rxjava

var startTime = System.currentTimeMillis()
            repeat(times) { i -> // 啓動Rxjava
                Observable.just("").subscribeOn(Schedulers.io())
                        .subscribe {
                            Log.d(this@MainActivity.toString(), "$i=.")
                        }
            }
            var endTime = System.currentTimeMillis() - startTime;
            Log.d(this@MainActivity.toString(), "endTime=$endTime")
複製代碼

執行結果:endTime=241 ms

源碼工程:CorountineTest

Profiler

利用AS自帶的Profiler對運行時的CPU狀態進行檢測,咱們能夠看到Thread對CPU的消耗比較大,Koroutine、Executor、RxJava的消耗基本差很少。

總結

從執行時間和Profiler上看,Coroutine比使用Thread性能提高了一個量級,但與Excutor和RxJava性能是在一個量級上。

注意這裏的例子爲了簡便,由於異步執行的時間基本和repeat的時間差很少,咱們沒有等全部異步執行完再打印時間,這裏不追求精確的時間,只爲作量級上的對比。

實現機制

協程底層異步實現機制

咱們先來看一段簡單的Kotlin程序。

GlobalScope.launch(Dispatchers.IO) {
            print("hello world")
        }
複製代碼

咱們接着看下launch的實現代碼。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
複製代碼

這裏注意,咱們經過追蹤最後的繼承關係發現,DefaultScheduler.IO最後也是一個CoroutineContext。

接着發現繼續看coroutine.start的實現,以下:

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        start(block, receiver, this)
    }
複製代碼

接着繼續看CoroutineStart的start策略,以下:

@InternalCoroutinesApi
    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
        when (this) {
            CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
            CoroutineStart.ATOMIC -> block.startCoroutine(completion)
            CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            CoroutineStart.LAZY -> Unit // will start lazily
        }
複製代碼

繼續看startCoroutineCancellable方法,以下:

@InternalCoroutinesApi
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
複製代碼

繼續看resumeCancellableWith方法實現:

@InternalCoroutinesApi
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}
複製代碼

最後發現調用的resumeCancellableWith方法實現以下:

inline fun resumeCancellableWith(result: Result<T>) {
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
複製代碼

這裏關鍵的觸發方法在這個地方

dispatcher.dispatch(context, this)

複製代碼

咱們看 DefaultScheduler.IO最後的dispatch方法:

override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            DefaultExecutor.dispatch(context, block)
        }
複製代碼

這裏咱們最終發現是調用了CoroutineScheduler的dispatch方法,繼續看CoroutineScheduler的實現發現,CoroutineScheduler繼承了Executor。

經過dispatch的調用最後能夠發現CoroutineScheduler其實就是對Worker的調度,咱們看看Worker的定義。

internal inner class Worker private constructor() : Thread()
複製代碼

經過這裏咱們發現另一個老朋友Thread,因此到這裏也符合上面性能驗證的測試結果。

到這裏咱們也有結論了,協程異步實現機制本質也就是自定義的線程池。

非阻塞式掛起 suspend

suspend有什麼做用,如何作到異步不用回調?下面先定義一個最簡單的suspend方法。

suspend fun hello(){
        delay(100)
        print("hello world")
    }
複製代碼

經過Kotlin Bytecode轉換爲java 代碼以下:

@Nullable
   public final Object hello(@NotNull Continuation $completion) {
      Object $continuation;
      label20: {
         if ($completion instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)$completion;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

         $continuation = new ContinuationImpl($completion) {
            // $FF: synthetic field
            Object result;
            int label;
            Object L$0;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return Test.this.hello(this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         ((<undefinedtype>)$continuation).L$0 = this;
         ((<undefinedtype>)$continuation).label = 1;
         if (DelayKt.delay(100L, (Continuation)$continuation) == var6) {
            return var6;
         }
         break;
      case 1:
         Test var7 = (Test)((<undefinedtype>)$continuation).L$0;
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      String var2 = "hello world";
      boolean var3 = false;
      System.out.print(var2);
      return Unit.INSTANCE;
   }
複製代碼

這裏首先咱們發現方法的參數多了一個Continuation completion而且內部回定義一個 Object continuation,看看Continuation的定義。

@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
複製代碼

這是一個回調接口,裏面有一個關鍵的方法爲resumeWith。 這個方法的具體調用經過上面的協程調用流程能夠知道 ,在DispatchedContinuation的resumeCancellableWith會觸發。

public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}
複製代碼

那麼resumeWith裏面作了那些事情?咱們看下具體的實如今ContinuationImpl的父類BaseContinuationImpl中。

public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

複製代碼

首先咱們發現這裏實際上是一個遞歸的循環,而且會調用invokeSuspend方法觸發實際的調用,等待返回結果。經過上面的分析能夠看出2點。

  1. 非阻塞是由於自己啓動一個協程也是使用線程池異步執行,因此不會阻塞
  2. 協程並非沒有回調,而是將回調的接口(Continuation)及調度代碼在編譯器生成,不用本身編寫。
  3. resumeWith是一個循環及遞歸,因此會將協程內定義的表達式順序串聯調用。達到掛起及恢復的鏈式調用。

總結

  1. 協程到底可以解決什麼樣的問題?
  • 解決異步回調嵌套
  • 解決異步任務之間協做
  1. 協程和咱們經常使用的Executor、RxJava有什麼區別?
  • 從任務調度上看,本質都是線程池的封裝
  1. 項目上使用有什麼風險嗎?
  • 從性能上看與線程池與RxJava在一個量級
  • 目前已經是穩定版本1.3.3,開源項目使用多
  • 代碼使用簡便,可維護性高
  • 開源生態支持良好,方便使用(Retrofit、Jitpack已支持)
  • 團隊學習及舊項目改造須要投入必定成本

參考資料

www.kotlincn.net

關於

歡迎關注個人我的公衆號

微信搜索:一碼一浮生,或者搜索公衆號ID:life2code

相關文章
相關標籤/搜索