一篇文章帶你完全搞懂Kotlin的協程

產生背景

爲了解決異步線程產生的回調地獄java

//傳統回調方式
api.login(phone,psd).enquene(new Callback<User>(){
  public void onSuccess(User user){
    api.submitAddress(address).enquene(new Callback<Result>(){
      public void onSuccess(Result result){
        ...
      }
    });
  }
});
複製代碼
//使用協程後
val user=api.login(phone,psd)
api.submitAddress(address)
...
複製代碼

協程是什麼

本質上,協程是輕量級的線程。android

協程關鍵名詞

val job = GlobalScope.launch {
    delay(1000)
    println("World World!")
}
複製代碼
  • CoroutineScope(做用範圍)

    控制協程代碼塊執行的線程,生命週期等,包括GlobeScope、lifecycleScope、viewModelScope以及其餘自定義的CoroutineScopeapi

    GlobeScope:全局範圍,不會自動結束執行markdown

    lifecycleScope:生命週期範圍,用於activity等有生命週期的組件,在DESTROYED的時候會自動結束,需額外引入框架

    viewModelScope:viewModel範圍,用於ViewModel中,在ViewModel被回收時會自動結束,需額外引入異步

  • Job(做業)

    協程的計量單位,至關於一次工做任務,launch方法默認返回一個新的Jobasync

  • suspend(掛起)

    做用於方法上,表明該方法是耗時任務,例如上面的delay方法ide

public suspend fun delay(timeMillis: Long) {
    ...
}
複製代碼

協程的引入

主框架($coroutines_version替換爲最新版本,如1.3.9,下同)函數

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutines_version"
複製代碼

lifecycleScope(可選,版本2.2.0)oop

implementation 'androidx.activity:activity-ktx:$lifecycle_scope_version'
複製代碼

viewModelScope(可選,版本2.3.0-beta01)

implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$coroutines_viewmodel_version"
複製代碼

簡單使用

先舉個簡單例子

lifecycleScope.launch { 
    delay(2000)
    tvTest.text="Test"
}
複製代碼

上面這個例子實現的功能是等待2秒,而後修改id爲tvTest的TextView控件的text值爲Test

自定義延遲返回方法

在kotlin裏面,對於須要延遲才能返回結果的方法,須要用suspend標明

lifecycleScope.launch {
    val text=getText()
    tvTest.text = text
}
複製代碼
suspend fun getText():String{
    delay(2000)
    return "getText"
}
複製代碼

若是在其餘線程,須要使用Continuation進行線程切換,可以使用suspendCancellableCoroutine 或 suspendCoroutine包裹(前者可取消,至關於後者的擴展),成功調用it.resume(),失敗調用it.resumeWithException(Exception()),拋出異常

suspend fun getTextInOtherThread() = suspendCancellableCoroutine<String> {
    thread {
        Thread.sleep(2000)
        it.resume("getText")
    }
}
複製代碼

異常捕獲

協程裏面的失敗均可以經過異常捕獲,來統一處理特殊狀況

lifecycleScope.launch {
    try {
        val text=getText()
        tvTest.text = text
    } catch (e:Exception){
        e.printStackTrace()
    }
}
複製代碼

取消功能

下面執行了兩個job,第一個是原始的,第二個是在1秒後取消第一個job,這會致使tvText的文本並不會改變

val job = lifecycleScope.launch {
    try {
        val text=getText()
        tvTest.text = text
    } catch (e:Exception){
        e.printStackTrace()
    }
}
lifecycleScope.launch {
    delay(1000)
    job.cancel()
}
複製代碼

設置超時

這個至關於系統封裝了自動取消功能,對應函數withTimeout

lifecycleScope.launch {
    try {
        withTimeout(1000) {
            val text = getText()
            tvTest.text = text
        }
    } catch (e:Exception){
        e.printStackTrace()
    }
}
複製代碼

帶返回值的Job

與launch相似的還有一個async方法,它會返回一個Deferred對象,屬於Job的擴展類,Deferred能夠獲取返回的結果,具體使用以下

lifecycleScope.launch {
    val one= async {
        delay(1000)
        return@async 1
    }
    val two= async {
        delay(2000)
        return@async 2
    }
    Log.i("scope test",(one.await()+two.await()).toString())
}
複製代碼

高級進階

自定義CoroutineScope

先看CoroutineScope源碼

public interface CoroutineScope {
    public val coroutineContext: CoroutineContext
}
複製代碼

CoroutineScope中主要包含一個coroutineContext對象,咱們要自定義只需實現coroutineContext的get方法

class TestScope() : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = TODO("Not yet implemented")
}
複製代碼

要建立coroutineContext,得要先知道CoroutineContext是什麼,咱們再看CoroutineContext源碼

/** * Persistent context for the coroutine. It is an indexed set of [Element] instances. * An indexed set is a mix between a set and a map. * Every element in this set has a unique [Key]. */
public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    public operator fun plus(context: CoroutineContext): CoroutineContext = 
        ...
    public fun minusKey(key: Key<*>): CoroutineContext
  
    public interface Key<E : Element>
    public interface Element : CoroutineContext {
        ...
    }
}
複製代碼

經過註釋說明,咱們知道它本質就是一個包含Element的集合,只是不像set和map集合同樣,它本身實現了獲取(get),摺疊(fold,添加和替換的組合),相減(minusKey,移除),對象組合(plus,如val coroutineContext=coroutineContext1+coroutineContext2)

它的主要內容是Element,而Element的實現有

  • Job 任務
  • ContinuationInterceptor 攔截器
  • AbstractCoroutineContextElement
  • CoroutineExceptionHandler
  • ThreadContextElement
  • DownstreamExceptionElement
  • ....

能夠看到不少地方都有實現Element,它主要目的是限制範圍以及異常的處理。這裏咱們先了解兩個重要的Element,一個是Job,一個是CoroutineDispatcher

Job
  • Job:子Job取消,會致使父job和其餘子job被取消;父job取消,全部子job被取消
  • SupervisorJob:父job取消,全部子job被取消
CoroutineDispatcher
  • Dispatchers.Main:主線程執行
  • Dispatchers.IO:IO線程執行

咱們模擬一個相似lifecycleScope的自定義TestScope

class TestScope() : CoroutineScope {
    override val coroutineContext: CoroutineContext
        get() = SupervisorJob() +Dispatchers.Main
}
複製代碼

這裏咱們定義了一個總流程線SupervisorJob()以及具體執行環境Dispatchers.Main(Android主線程),假如咱們想替換掉activity的lifecycleScope,就須要在activity中建立實例

val testScope=TestScope()
複製代碼

而後在activity銷燬的時候取消掉全部job

override fun onDestroy() {
    testScope.cancel()
    super.onDestroy()
}
複製代碼

其餘使用方式同lifecycleScope,如

testScope.launch{
    val text = getText()
    tvTest.text = text
}
複製代碼

深刻理解Job

CoroutineScope中包含一個主Job,以後調用的launch或其餘方法建立的job都屬於CoroutineScope的子Job,每一個job都有屬於本身的狀態,其中包括isActive、isCompleted、isCancelled,以及一些基礎操做start()、cancel()、join(),具體的轉換流程以下

job狀態圖.png

咱們先從建立job開始,當調用launch的時候默認有三個參數CoroutineContext、CoroutineStart以及代碼塊參數。

  • context:CoroutineContext的對象,默認爲CoroutineStart.DEFAULT,會與CoroutineScope的context進行摺疊
  • start:CoroutineStart的對象,默認爲CoroutineStart.DEFAULT,表明當即執行,同時還有CoroutineStart.LAZY,表明非當即執行,必須調用job的start()纔會開始執行
val job2= lifecycleScope.launch(start =  CoroutineStart.LAZY) {
    delay(2000)
    Log.i("scope test","lazy")
}
job2.start()
複製代碼

當使用這種模式建立時默認就是new狀態,此時isActive,isCompleted,isCancelled都爲false,當調用start後,轉換爲active狀態,其中只有isActive爲true,若是它的任務完成了則會進入Completing狀態,此時爲等待子job完成,這種狀態下仍是隻有isActive爲true,若是全部子job也完成了則會進入Completed狀態,只有isCompleted爲true。若是在active或Completing狀態下出現取消或異常,則會進入Cancelling狀態,若是須要取消父job和其餘子job則會等待它們取消完成,此時只有isCancelled爲true,取消完成後最終進入Cancelled狀態,isCancelled和isCompleted都爲true

State isActive isCompleted isCancelled
New FALSE FALSE FALSE
Active TRUE FALSE FALSE
Completing TRUE FALSE FALSE
Cancelling FALSE FALSE TRUE
Cancelled FALSE TRUE TRUE
Completed FALSE TRUE FALSE

不一樣job交互需使用join()與cancelAndJoin()

  • join():將當前job添加到其餘協程任務裏面
  • cancelAndJoin():取消操做,只是添加進去後再取消
val job1= GlobleScope.launch(start =  CoroutineStart.LAZY) {
    delay(2000)
    Log.i("scope test","job1")
}
lifecycleScope.launch {
    job1.join()
    delay(2000)
    Log.i("scope test","job2")
}
複製代碼

深刻理解suspend

suspend做爲kotlin新增的方法修飾詞,最終實現仍是java,咱們先看它們的差別性

suspend fun test1(){}
fun test2(){}
複製代碼

對應java代碼

public final Object test1(@NotNull Continuation $completion) {
  return Unit.INSTANCE;
}
public final void test2() {
}
複製代碼

對應字節碼

public final test1(Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
  ...
   L0
    LINENUMBER 6 L0
    GETSTATIC kotlin/Unit.INSTANCE : Lkotlin/Unit;
    ARETURN
   L1
    LOCALVARIABLE this Lcom/lieni/android_c/ui/test/TestActivity; L0 L1 0
    LOCALVARIABLE $completion Lkotlin/coroutines/Continuation; L0 L1 1
    MAXSTACK = 1
    MAXLOCALS = 2

public final test2()V L0 LINENUMBER 9 L0 RETURN L1 LOCALVARIABLE this Lcom/lieni/android_c/ui/test/TestActivity; L0 L1 0
    MAXSTACK = 0
    MAXLOCALS = 1

複製代碼

能夠看到,加了suspend的方法其實和普通方法同樣,只是傳入時多了個Continuation對象,並返回了Unit.INSTANCE對象

Continuation是一個接口,包含了context對象和resumeWith方法

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}
複製代碼

而Continuation的具體實如今BaseContinuationImpl中

internal abstract class BaseContinuationImpl(...) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        ...
        while (true) {
            ...
            with(current) {
              	val outcome = invokeSuspend(param)
                ...
                releaseIntercepted() 
                if (completion is BaseContinuationImpl) {
                    ...
                } else {
                    ...
                    return
                }
            }
        }
    }
    ...
}
複製代碼

當咱們調用resumeWith時,它會一直執行一個循環,調用invokeSuspend(param)和releaseIntercepted() ,直到最頂層completion執行完成後返回,而且釋放協程的interceptor

最終的釋放在ContinuationImpl中實現

internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {
    ...
    protected override fun releaseIntercepted() {
        val intercepted = intercepted
        if (intercepted != null && intercepted !== this) {
            context[ContinuationInterceptor]!!.releaseInterceptedContinuation(intercepted)
        }
        this.intercepted = CompletedContinuation 
    }
}
複製代碼

經過這裏知釋放最終經過CoroutineContext中爲ContinuationInterceptor的Element來實現

而暫停也是同理,繼續看suspendCoroutine

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        ...
    }
複製代碼

默認會調用Continuation的intercepted()方法

internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {
    ...
    public fun intercepted(): Continuation<Any?> =intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
}
複製代碼

能夠看到暫停最終也是經過CoroutineContext中爲ContinuationInterceptor的Element來實現

流程總結(線程切換)

  • 建立新的Continuation
  • 調用CoroutineScope中的context的ContinuationInterceptor的interceptContinuation方法暫停父任務
  • 執行子任務(若是指定了線程,則在新線程執行,並傳入Continuation對象)
  • 執行完畢後用戶調用Continuation的resume或者resumeWith返回結果
  • 調用CoroutineScope中的context的ContinuationInterceptor的releaseInterceptedContinuation方法恢復父任務

阻塞與非阻塞

CoroutineScope默認是不會阻塞當前線程的,若是須要阻塞可使用runBlocking,若是在主線程執行下面代碼,會出現2s白屏

runBlocking { 
    delay(2000)
    Log.i("scope test","runBlocking is completed")
}
複製代碼

阻塞原理:執行runBlocking默認會建立BlockingCoroutine,而BlockingCoroutine中會一直執行一個循環,直到當前Job爲isCompleted狀態纔會跳出循環

public fun <T> runBlocking(...): T {
    ...
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
    return coroutine.joinBlocking()
}
複製代碼
private class BlockingCoroutine<T>(...) : AbstractCoroutine<T>(parentContext, true) {
    ...
    fun joinBlocking(): T {
      ...
      while (true) {
        ...
        if (isCompleted) break
        ...
      }    
      ...
    }
}
複製代碼
相關文章
相關標籤/搜索