kotlin協程-Android實戰

協程,英文名是 Coroutine, 本質上,協程是輕量級的線程, 它的調度切換是協做式的,能夠主動掛起和恢復java

retrofit2對協程的支持

先來看看咱們最經常使用的retrofit2,在使用協程和不實用協程的代碼區別在哪裏android

注意retrofit22.6.0纔開始支持協程,因此必定要將retrofit2升級到2.6.0及以上數據庫

先分別定義兩個api,一個是結合rxjava2的用法,一個結合協程的用法api

interface TestApi {
    @GET("api/4/news/latest")
    fun getLatestNews(): Flowable<LatestNews>
    
    @GET("api/4/news/latest")
    suspend fun getLatestNews2(): LatestNews
}

複製代碼

可見retrofit2支持用suspend 定義 getLatestNews2api爲一個掛起函數,便可在協程中使用這個apibash

再來看看怎麼使用兩個不一樣的api網絡

class CoroutineActivity : AppCompatActivity() {
	...
	// 這是一個咱們使用retrofit2 請求數據+切換線程最經常使用的方法
    fun requestData1() {
        testApi.getLatestNews()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : DisposableSubscriber<LatestNews>() {
                override fun onComplete() {}

                override fun onNext(t: LatestNews) {
                    tv_text.text = Gson().toJson(t)
                }

                override fun onError(t: Throwable?) {
                    tv_text.text = "error"
                }
            })
    }
    
    // 使用協程 請求+渲染數據
    fun requestData2() {
        GlobalScope.launch(Dispatchers.Main) {
            try {
                tv_text.text = Gson().toJson(testApi.getLatestNews2())
            } catch (e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}
複製代碼

rxjava2是使用回調的方式渲染數據,這個你們都知道併發

而協程須要先使用GlobalScope.launch啓動一個協程(啓動協程的方法不少,請自行查看官方文檔),並使用Dispatchers.Main指定協程調度器爲主線程(即ui線程), 而後經過 try catch分別處理正常和異常的狀況(暫時使用GlobalScope上下文啓動協程,下面會介紹一種專門再android中啓動協程的方法)異步

這樣看來是否是使用協程能夠簡化不少代碼,使代碼看起來更加優雅jvm

咱們再來看看多個請求併發和串行的狀況async

先多添加幾個api,方便操做

interface TestApi {
	@GET("api/3/news/latest")
    fun getLatestNews(): Flowable<LatestNews>

    @GET("api/3/news/{id}")
    fun getNewsDetail(@Path("id") id: Long): Flowable<News>


    @GET("api/4/news/latest")
    suspend fun getLatestNews2(): LatestNews

    @GET("api/3/news/{id}")
    suspend fun getNewsDetail2(@Path("id") id: Long): News
}

複製代碼

好比咱們先調用getLatestNews()方法請求一系列的新聞列表,而後在調用getNewsDetail請求第一個新聞的詳情,代碼以下

// 非協程用法
testApi.getLatestNews()
    .flatMap {
        testApi.getNewsDetail(it.stories!![0].id!!)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : DisposableSubscriber<News>() {
        override fun onComplete() {}

        override fun onNext(t: News) {
            tv_text.text = t.title
        }

        override fun onError(t: Throwable?) {
            tv_text.text = "error"
        }
    })

// 協程用法
GlobalScope.launch(Dispatchers.Main) {
    try {
        val lastedNews = testApi.getLatestNews2()
        val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
        tv_text.text = detail.title
    } catch(e: Exception) {
        tv_text.text = "error"
    }
}
複製代碼

再好比若是咱們想調用getNewsDetail同時請求多個新聞詳情數據

// 非協程用法
testApi.getLatestNews()
    .flatMap {
        Flowable.zip(
            testApi.getNewsDetail(it.stories!![0].id!!), 
            testApi.getNewsDetail(it.stories!![1].id!!), 
            BiFunction<News, News, List<News>> { news1, news2->
                listOf(news1, news2) 
            }
        )
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : DisposableSubscriber<List<News>>() {
        override fun onComplete() {}

        override fun onNext(t: List<News>) {
            tv_text.text = t[0].title + t[1].title
        }

        override fun onError(t: Throwable?) {
            tv_text.text = "error"
        }
    })

// 協程的用法
GlobalScope.launch(Dispatchers.Main) {
    try {
    	// 先請求新聞列表
        val lastedNews = testApi.getLatestNews2()
        // 再使用async 併發請求第一個和第二個新聞的詳情
        val detail1 = async { testApi.getNewsDetail2(lastedNews.stories!![0].id!!) }
        val detail2 = async { testApi.getNewsDetail2(lastedNews.stories!![1].id!!) }
        tv_text.text = detail1.await().title + detail2.await().title
    } catch(e: Exception) {
        tv_text.text = "error"
    }
}
複製代碼

可見相對於非協程的寫法(代碼中使用rxjava2),協程能讓你的代碼更加簡潔、優雅,能更加清晰的描述你第一步想作什麼、第二步想作什麼等等

room數據庫對協程的支持

room數據庫在2.1.0開始支持協程, 而且須要導入room-ktx依賴

implementation "androidx.room:room-ktx:2.1.0"
複製代碼

而後在Dao中使用suspend定義掛起函數

@Dao
abstract class UserDao {
    @Query("select * from tab_user")
    abstract suspend fun getAll(): List<User>
}

複製代碼

最後就像上面retrofit2那樣使用協程便可

class RoomActivity : AppCompatActivity() {
    private var adapter: RoomAdapter? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_room)
        ...
    }
    
    ...

    private fun loadUser() {
        GlobalScope.launch(Dispatchers.Main) {
            adapter!!.data = AppDataBase.getInstance().userDao().getAll()
        }
    }
    
}
        
複製代碼

這裏指介紹room數據庫的協程用法,對於room數據庫的介紹和其餘用法請查看Android Jetpack ROOM數據庫用法介紹android Jetpack ROOM數據庫結合其它Library的使用介紹

協程在android裏的應用

上面的example都是使用GlobalScope上下文來啓動協程, 其實真正在android中通常不建議直接使用GlobalScope,由於使用GlobalScope.launch 時,咱們會建立一個頂層協程。雖然它很輕量,但它運行時仍會消耗一些內存資源,若是咱們忘記保持對新啓動的協程的引用,它還會繼續運行,因此咱們必須保持全部對GlobalScope.launch啓動協程的引用,而後在activity destory(或其它須要cancel)的時候cancel掉全部的協程,不然就會形成內存泄露等一系列問題

好比:

class CoroutineActivity : AppCompatActivity() {
    private lateinit var testApi: TestApi
    private var job1: Job? = null
    private var job2: Job? = null
    private var job3: Job? = null
    
    ...

    override fun onDestroy() {
        super.onDestroy()
        job1?.cancel()
        job2?.cancel()
        job3?.cancel()
    }
    ...
    
    // 啓動第一個頂級協程
    fun requestData1() {
        job1 = GlobalScope.launch(Dispatchers.Main) {
            try {
                val lastedNews = testApi.getLatestNews2()
                tv_text.text = lastedNews.stories!![0].title
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }

	// 啓動第二個頂級協程
    fun requestData2() {
        job2 = GlobalScope.launch(Dispatchers.Main) {
            try {
                val lastedNews = testApi.getLatestNews2()
                // 在協程內部啓動第三個頂級協程
                job3 = GlobalScope.launch(Dispatchers.Main) {
                    try {
                        val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
                        tv_text.text = detail.title
                    } catch (e: Exception) {
                        tv_text.text = "error"
                    }
                }
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}
複製代碼

可見若是使用GlobalScope啓動的協程越多,就必須定義越多的變量持有對啓動協程的引用,並在onDestroy的時候cancel掉全部協程

下面咱們就介紹MainScope代替GlobalScope的使用

class CoroutineActivity : AppCompatActivity() {
    private var mainScope = MainScope()
    private lateinit var testApi: TestApi

    ...

    override fun onDestroy() {
        super.onDestroy()
        // 只須要調用mainScope.cancel,就會cancel掉全部使用mainScope啓動的全部協程
        mainScope.cancel()
    }

    fun requestData1() {
        mainScope.launch {
            try {
                val lastedNews = testApi.getLatestNews2()
                tv_text.text = lastedNews.stories!![0].title
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }

    fun requestData2() {
        mainScope.launch {
            try {
                val lastedNews = testApi.getLatestNews2()
                val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
                tv_text.text = detail.title
            } catch (e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}

複製代碼

又或者是使用kotlin委託模式實現以下:

class CoroutineActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    private lateinit var testApi: TestApi

	...
	
    override fun onDestroy() {
        super.onDestroy()
        cancel()
    }

    fun requestData1() {
        launch {
            try {
                val lastedNews = testApi.getLatestNews2()
                tv_text.text = lastedNews.stories!![0].title
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }

    fun requestData2() {
        launch {
            try {
                val lastedNews = testApi.getLatestNews2()
                val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
                tv_text.text = detail.title
            } catch (e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}
複製代碼

同時咱們先來看看MainScope的定義

@Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

複製代碼

可見使用MainScope很是簡單,只須要在activity onDestroy中調用MainScopecancel方法便可,而不須要定義其它協程的引用, 而且MainScope的調度器是Dispatchers.Main, 因此也不須要手動指定Main調度器

Lifecycle對協程的支持

發現Lifecycle組件庫在2.2.0alpha版中已經有了對於協程的支持

須要添加lifecycle-runtime-ktx依賴(正式版出來以後,請使用正式版)

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha05"
複製代碼

lifecycle-runtime-ktx 中 給LifecycleOwner添加了 lifecycleScope擴展屬性(類於上面介紹的MainScope),用於方便的操做協程;

先看看源碼

val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope
    
    
val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                // SupervisorJob 指定協程做用域是單向傳遞
                // Dispatchers.Main.immediate 指定協程體 在主線程中執行
                // Dispatchers.Main.immediate 跟 Dispatchers.Main惟一的區別是,若是當前在主線程,這立馬執行協程體,而不是走Dispatcher分發流程
                SupervisorJob() + Dispatchers.Main.immediate
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }    
複製代碼

同時LifecycleCoroutineScope 還提供了綁定LifecycleOwner生命週期(通常是指activityfragment)的啓動協程的方法;以下:

abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
    internal abstract val lifecycle: Lifecycle

	// 當 activity 處於created的時候執行 協程體
    fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenCreated(block)
    }

	// 當 activity 處於start的時候執行 協程體
    fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenStarted(block)
    }

	// 當 activity 處於resume的時候執行 協程體
    fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenResumed(block)
    }
}
複製代碼

因爲上面啓動協程的方法綁定了activity生命週期,因此在activity destroy的時候,也實現了自動cancel掉協程

因此咱們 CoroutineActivity Demo的代碼能夠寫的更加簡單,以下:

class CoroutineActivity : AppCompatActivity() {
    private lateinit var testApi: TestApi

	...

    fun requestData1() {
        lifecycleScope.launchWhenResumed {
            try {
                val lastedNews = testApi.getLatestNews2()
                tv_text.text = lastedNews.stories!![0].title
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}

複製代碼

LiveData對協程的支持

同時Google也對LiveData提供了對協程的支持,不過須要添加lifecycle-livedata-ktx依賴

// 如今仍是`alpha`版,等正式版發佈之後,請替換成正式版
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha05"

複製代碼

lifecycle-livedata-ktx依賴添加了liveData頂級函數,返回CoroutineLiveData

源碼以下:

...
internal const val DEFAULT_TIMEOUT = 5000L
...
fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

複製代碼

CoroutineLiveData是在何時啓動協程並執行協程體的呢???

internal class CoroutineLiveData<T>(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    block: Block<T>
) : MediatorLiveData<T>() {
    private var blockRunner: BlockRunner<T>?
    private var emittedSource: EmittedSource? = null

    init {
        val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob)
        blockRunner = BlockRunner(
            liveData = this,
            block = block,
            timeoutInMs = timeoutInMs,
            scope = scope
        ) {
            blockRunner = null
        }
    }
	
	...
	
    // observer(觀察者)個數有0到1時執行
    // 即第一次調用observe或observeForever時執行
    override fun onActive() {
        super.onActive()
        // 啓動協程並執行協程體
        blockRunner?.maybeRun()
    }
	
    // observer(觀察者)個數有1到0時執行
    // 即調用removeObserver時觸發檢查並執行回調
    override fun onInactive() {
        super.onInactive()
        // 取消協程
        blockRunner?.cancel()
    }
}
複製代碼

可見CoroutineLiveData是在onActive()啓動協程,在onInactive()取消協程

因此使用LiveData對協程的支持, 那麼CoroutineActivity Demo的代碼寫法以下

class CoroutineActivity : AppCompatActivity() {
    private lateinit var testApi: TestApi
    
	...

    fun requestData1() {
        liveData {
            try {
                val lastedNews = testApi.getLatestNews2()
                emit(lastedNews.stories!![0].title!!)
            } catch(e: Exception) {
                emit("error")
            }
        }.observe(this, Observer {
            tv_text.text = it
        })
    }
}

複製代碼

上面咱們講了協程在android裏最經常使用的用法,下面將介紹協程的一些基本知識

協程上下文

協程上下文用CoroutineContext表示,kotlin中 比較經常使用的Job協程調度器(CoroutineDispatcher)協程攔截器(ContinuationInterceptor)等都是CoroutineContext的子類,即它們都是協程上下文

先看一下CoroutineContext 比較重要的plus方法,它是一個用operator修復的重載(+)號的操做符方法

@SinceKotlin("1.3")
public interface CoroutineContext {

    /**
     * Returns a context containing elements from this context and elements from  other [context].
     * The elements from this context with the same key as in the other one are dropped.
     */
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
}

複製代碼

好比上面說的MainScope定義就使用了+號操做符

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
複製代碼

若是你看啓動協程的源碼就會發現,在kotlin中 大量使用 + 號操做符,因此kotlin中大部分CoroutineContext對象都是CombinedContext對象

上面的example使用的launch方法啓動協程有三個參數, 分別是協程上下文協程啓動模式協程體

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext, // 協程上下文
    start: CoroutineStart = CoroutineStart.DEFAULT, // 協程啓動模式
    block: suspend CoroutineScope.() -> Unit // 協程體
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
複製代碼

協程啓動模式

  • DEFAULT

    當即執行協程體

    runBlocking {
        val job = GlobalScope.launch(start = CoroutineStart.DEFAULT) {
            println("1: " + Thread.currentThread().name)
        }
        // 不須要調用join方法
        // job.join()
    }
    複製代碼

    打印結果

    1: DefaultDispatcher-worker-1
    複製代碼

    CoroutineStart.DEFAULT啓動模式不須要手動調用joinstart等方法,而是在調用launch方法的時候就會自動執行協程體的代碼

  • LAZY

    只有在須要的狀況下才執行協程體

    runBlocking {
        val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
            println("1: " + Thread.currentThread().name)
        }
        // 必定調用join方法
        job.join()
    }
    複製代碼

    打印結果

    1: DefaultDispatcher-worker-1
    複製代碼

    CoroutineStart.LAZY啓動模式必定要手動調用joinstart等方法,否者協程體不會執行

  • ATOMIC

    當即執行協程體,但在開始運行以前沒法取消, 即開啓協程會無視cancelling狀態

    runBlocking {
        val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
            println("1: " + Thread.currentThread().name)
            delay(1000)
            println("2: " + Thread.currentThread().name)
        }
        job.cancel()
        delay(2000)
    }
    複製代碼

    打印結果

    1: DefaultDispatcher-worker-1
    複製代碼

    CoroutineStart. ATOMIC啓動模式的協程體 即便調了cancel方法 也必定會執行,由於開啓協程會無視cancelling狀態;上面的example只打印了一句話,是由於執行delay(1000)的時候 發現協程處於關閉狀態, 因此出現了JobCancellationException異常,致使下面的代碼沒有執行,若是 delay(1000) 這句代碼用 try catch 捕獲一下異常,就會繼續執行下面的代碼

  • UNDISPATCHED

    當即在當前線程執行協程體,直到第一個 suspend 調用 掛起以後的執行線程取決於上下文當中的調度器了

    runBlocking {
        println("0: " + Thread.currentThread().name)
        // 注意這裏沒有用GlobalScope.launch
        // 由於GlobalScope.launch啓動的是一個頂層協程, 沒法關聯當前協程的上下文(coroutineContext), 致使結果有誤差
        launch(context = Dispatchers.Default, start = CoroutineStart.UNDISPATCHED) {
            println("1: " + Thread.currentThread().name)
            delay(1000)
            println("2: " + Thread.currentThread().name)
        }
        delay(2000)
    }
    複製代碼

    打印結果

    0: main
    1: main
    2: DefaultDispatcher-worker-1
    複製代碼

    可見 0 和 1 的執行線程是同樣的,當執行完delay(1000), 後面的代碼執行線程取決於Dispatchers.Default調度器指定的線程,因此 2 在另外一個線程中執行

協程調度器

協程調度器 其實也是 協程上下文

協程調度器是用來指定協程代碼塊在哪一個線程中執行,kotlin提供了幾個默認的協程調度器,分別是DefaultMainUnconfined, 並針對jvm, kotlin提供了一個特有的IO調度器

  • Dispatchers.Default

    指定代碼塊在線程池中執行

    GlobalScope.launch(Dispatchers.Default) {
        println("1: " + Thread.currentThread().name)
        launch (Dispatchers.Default) {
            delay(1000) // 延遲1秒後,再繼續執行下面的代碼
            println("2: " + Thread.currentThread().name)
        }
        println("3: " + Thread.currentThread().name)
    }
    複製代碼

    打印結果以下

    1: DefaultDispatcher-worker-1
    3: DefaultDispatcher-worker-1
    2: DefaultDispatcher-worker-1
    複製代碼
  • Dispatchers.Main

    指定代碼塊在main線程中執行(針對Android就是ui線程)

    GlobalScope.launch(Dispatchers.Default) {
        println("1: " + Thread.currentThread().name)
        launch (Dispatchers.Main) {
            delay(1000) // 延遲1秒後,再繼續執行下面的代碼
            println("2: " + Thread.currentThread().name)
        }
        println("3: " + Thread.currentThread().name)
    }
    複製代碼

    打印結果以下:

    1: DefaultDispatcher-worker-1
    3: DefaultDispatcher-worker-1
    2: main
    複製代碼

    可見Dispatchers.Main就是指定協程代碼塊在main線程中執行

  • Dispatchers.Unconfined

    沒有指定協程代碼快在哪一個特定線程中執行,即當前在哪一個線程,代碼塊中接下來的代碼就在哪一個線程中執行(即一段協程代碼塊 因爲啓動了子協程 致使切換了線程, 那麼接下來的代碼塊也是在這個線程中執行)

    GlobalScope.launch(Dispatchers.Default) {
        println("1: " + Thread.currentThread().name)
        launch (Dispatchers.Unconfined) {
            println("2: " + Thread.currentThread().name)
            requestApi()  // delay(1000) 原本想用delay,可是使用requestApi 可能更加清晰
            println("3: " + Thread.currentThread().name)
        }
        println("4: " + Thread.currentThread().name)
    }
    
    // 定義一個掛起函數,在一個新的子線程中執行
    private suspend fun requestApi() = suspendCancellableCoroutine<String> {
        Thread {
            println("5: requestApi: " + Thread.currentThread().name)
            it.resume("success")
        }.start()
    }
    複製代碼

    打印結果以下:

    1: DefaultDispatcher-worker-1
    2: DefaultDispatcher-worker-1
    5: requestApi: Thread-3
    4: DefaultDispatcher-worker-1
    3: Thread-3
    複製代碼

    可見2 和 3的代碼 執行線程明顯不同;當執行到requestApi這句代碼的時候 會切換到子線程(即Thread-3)中執行代碼,而後接下來的協程代碼塊就會在Thread-3中執行

  • Dispatchers.IO

    它是基於 Default 調度器背後的線程池,並實現了獨立的隊列和限制,所以協程調度器從 Default 切換到 IO 並不會觸發線程切換

    GlobalScope.launch(Dispatchers.Default) {
        println("1: " + Thread.currentThread().name)
        launch (Dispatchers.IO) {
            println("2: " + Thread.currentThread().name)
            requestApi()  // delay(1000)
            println("3: " + Thread.currentThread().name)
        }
        println("4: " + Thread.currentThread().name)
    }
    複製代碼

    打印結果以下:

    1: DefaultDispatcher-worker-1
    4: DefaultDispatcher-worker-1
    2: DefaultDispatcher-worker-1
    5: requestApi: Thread-3
    3: DefaultDispatcher-worker-1
    複製代碼
  • 綁定到任意自定義線程的調度器(這種方式要謹慎使用)

    可使用kotlin自帶newSingleThreadContext方法或者使用ExecutorService的擴展方法asCoroutineDispatcher建立一個Dispatcher

    // 第一種方法
    val dispatcher = newSingleThreadContext("custom thread")
    // 第二種方法
    // val dispatcher = Executors.newSingleThreadExecutor{ r -> Thread(r, "custom thread") }.asCoroutineDispatcher()
    GlobalScope.launch(dispatcher) {
        println("1: " + Thread.currentThread().name)
        delay(1000)
        println("2: " + Thread.currentThread().name)
    }
    
    runBlocking {     
        delay(2000L)  
        // 必定要close,不然線程永遠都不會結束,很危險
        dispatcher.close()
    }
    複製代碼

    打印結果以下:

    1: custom thread
    2: custom thread
    複製代碼

    可見咱們能夠本身建立線程綁定到協程調度器上,可是這種方式不建議使用,由於一旦手動建立了線程 就須要手動close,不然線程就永遠也不會終止,這樣會很危險

協程做用域GlobalScope、coroutineScope、supervisorScope

協程做用域是一個很是重的東西

  • GlobeScope

    GlobeScope 啓動的協程會單獨啓動一個做用域,沒法繼承外面協程的做用域,其內部的子協程聽從默認的做用域規則

  • coroutineScope

    coroutineScope 啓動的協程會繼承父協程的做用域,其內部的取消操做是雙向傳播的,子協程未捕獲的異常也會向上傳遞給父協程

  • supervisorScope

    supervisorScope 啓動的協程會繼承父協程的做用域,他跟coroutineScope不同的點是 它是單向傳遞的,即內部的取消操做和異常傳遞 只能由父協程向子協程傳播,不能從子協程傳向父協程

    MainScope 就是使用的supervisorScope做用域,因此只須要子協程 出錯 或 cancel 並不會影響父協程,從而也不會影響兄弟協程

協程異常傳遞模式

協程的異常傳遞跟協程做用域有關,要麼跟coroutineScope同樣雙向傳遞,要麼跟supervisorScope同樣由父協程向子協程單向傳遞

針對supervisorScope的單向傳遞

runBlocking {
    println("1")
    supervisorScope {
        println("2")
        // 啓動一個子協程
        launch {
            1/0 // 故意讓子協程出現異常
        }
        delay(100)
        println("3")
    }
    println("4")
}
複製代碼

打印結果以下:

1
2
Exception in thread "main @coroutine#2" java.lang.ArithmeticException: / by zero
3
4

複製代碼

可見在supervisorScope做用域中啓動的子協程若是出現異常,並無致使父協程異常,而且父協程的代碼還能繼續往下執行

咱們再來驗證一下再supervisorScope做用域中父協程異常是否會傳遞給子協程

runBlocking {
    println("1")
    supervisorScope {
        println("2")
        // 啓動一個子協程
        launch {
            try {
                delay(1000)
                println("3")
            } catch (e: Exception) {
                println("error")
            }
        }
        delay(100)
        1/0 //父協程報錯
        println("3")
    }
}
複製代碼
1
2
error

java.lang.ArithmeticException: / by zero
複製代碼

可見在supervisorScope做用域中 父協程確實會將異常傳遞給子協程

針對coroutineScope的雙向傳遞

runBlocking {
    println("1")
    try {
        coroutineScope {
            println("2")
            // 啓動一個子協程
            launch {
                1/0 // 故意讓子協程出現異常
            }
            delay(100)
            println("3")
        }
    } catch (e: Exception) {
        println("error")
    }
}
複製代碼

打印結果以下:

1
2
error

複製代碼

可見在coroutineScope做用域中啓動的子協程若是出現異常,則會傳遞給父協程

咱們再來驗證一下再coroutineScope做用域中父協程異常是否會傳遞給子協程

runBlocking {
    println("1")
    coroutineScope {
        println("2")
        // 啓動一個
        launch {
            try {
                delay(1000)
                println("3")
            } catch (e: Exception) {
                println("error")
            }
        }
        delay(100)
        1/0
        println("3")
    }
}
複製代碼

打印結果以下:

1
2
error

java.lang.ArithmeticException: / by zero
複製代碼

可見在coroutineScope做用域中 父協程確實會將異常傳遞給子協程

協程取消

先看一段代碼

GlobalScope.launch {
    println("1")
    // 啓動一個子協程
    val job = launch {
        println("2")
        try {// 捕獲 協程cancel致使的異常,讓代碼繼續往下執行
            delay(1000)
        } catch (e: Exception) {
            println("error")
        }
        println("3")
        if (isActive) { // 若是協程cancel了,則isActive爲false
            println("4")
        }
        delay(1000) // 沒有捕獲異常,則終止代碼繼續往下執行
        println("5")
    }
    delay(100)
    job.cancel()
}
複製代碼

打印結果以下:

1
2
error
3
複製代碼

當先啓動協程,而後cancel,會出現以下幾種狀況:

  • 若是執行到協程體內的代碼依賴協程的cancel狀態(好比delay方法),則會拋出異常,若是捕獲了異常,則會繼續往下執行,若是沒有捕獲異常則終止往下繼續執行協程體
  • 若是協程體內的代碼不依賴協程的cancel狀態(即println方法),則會繼續往下執行

也就是說 協程的取消(cancel) 致使協程體終止運行的方式是 拋出異常,若是協程體的代碼不依賴協程的cancel狀態(即沒有報錯),則協程的取消 對協程體的執行通常沒什麼影響

好比:

GlobalScope.launch {
    val job = launch {
        println("==start==")
        var i = 0
        while (i <= 10) {
            Thread.sleep(100)
            println(i++)
        }
        println("==end==")
    }
    delay(100)
    job.cancel()
}
複製代碼

打印結果以下:

==start==
0
1
2
3
4
5
6
7
8
9
10
==end==
複製代碼

可見即便協程取消了,協程體仍是在繼續運行

若是想結束協程體的運行該怎麼辦呢??

這個時候可使用CoroutineScope的isActive字段判斷協程的狀態是否被取消了

GlobalScope.launch {
    val job = launch {
        println("==start==")
        var i = 0
        while (i <= 10 && isActive) {
            Thread.sleep(100)
            println(i++)
        }
        println("==end==")
    }
    delay(200)
    job.cancel()
}
複製代碼

打印結果

==start==
0
1
==end==
複製代碼

可見若是協程取消了,可使用isActive字段來判斷是否須要執行協程體的某段代碼

withContext

在執行協程體的時候,可使用withContext方便的切換代碼執行所運行線程;好比

GlobalScope.launch(Dispatchers.Default) {
	// 在Dispatchers.Default的線程池中執行
    println("1: " + Thread.currentThread().name)
    withContext(Dispatchers.Main) { // 切換到主線程執行
        println("2: " + Thread.currentThread().name)
    }
    // 在Dispatchers.Default的線程池中執行
    println("3: " + Thread.currentThread().name)
    val dispatcher = newSingleThreadContext("custom thread")
    withContext(dispatcher) { // 切換到自定義線程中執行
        println("4: " + Thread.currentThread().name)
    }
    dispatcher.close()
    // 在Dispatchers.Default的線程池中執行
    println("5: " + Thread.currentThread().name)
}
複製代碼

打印結果

1: DefaultDispatcher-worker-1
2: main
3: DefaultDispatcher-worker-2
4: custom thread
5: DefaultDispatcher-worker-2
複製代碼

可見咱們可使用withContext方便的切換代碼運行所在的線程

withContext還能夠配合NonCancellable上下文確保代碼塊不能被取消

GlobalScope.launch(Dispatchers.Default) {
    val job = launch {
        println("1: " + Thread.currentThread().name)
        try {
            delay(1000)
        } catch (e: Exception) {
            withContext(NonCancellable) { // 配合NonCancellable上下文確保協程體不能被取消
                println("error: " + e.message)
                delay(100) // 若是沒有用withContext(NonCancellable)包裹,則delay(100)會報錯, 致使下面的代碼不執行
                println("2: " + Thread.currentThread().name)
            }
        }
    }
    delay(100)
    job.cancel()
}
複製代碼

打印結果

1: DefaultDispatcher-worker-1
error: Job was cancelled
2: DefaultDispatcher-worker-1
複製代碼

結構化併發

什麼是結構化併發呢?

其實很簡單,即保證啓動的協程在同一做用域中(我的理解)

當咱們使用GlobalScope.launch啓動協程的時候會建立一個頂層協程,若是咱們每次都使用GlobalScope.launch啓動協程, 那麼就會建立不少個頂層協程,而且不會相互干擾,即即便一個協程出錯或的取消了,另外一個協程仍是會繼續運行,由於它們不是在同一個協程做用域中

GlobalScope.launch(Dispatchers.Default) {
    val a1 = GlobalScope.async { 這裏使用async啓動協程,沒有使用launch
        delay(1000)
        println("1: " + Thread.currentThread().name)
    }
    val a2 = GlobalScope.async {
        delay(100)
        1/0 // 故意報錯
        println("2: " + Thread.currentThread().name)
    }
    a1.await()
    a2.await() // a2.cancel() 也可使用cancel
}
複製代碼

打印結果以下

1: DefaultDispatcher-worker-1
Exception in thread "DefaultDispatcher-worker-1" java.lang.ArithmeticException: / by zero
複製代碼

可見a2報錯或cancel,並不會影響a1

這到底會引發什麼問題呢?

好比咱們在一個activity中一般會有多個併發網絡請求 請求數據(即會啓動多個協程),當其中一個網絡請求出錯時(即協程出錯),咱們但願關閉其它並行的網絡請求,而不處理(即但願關閉掉其它協程),可是結果並不是如此

再好比咱們在一個activity中一般會有許多個網絡請求(即會啓動許多個協程),若是咱們老是使用GlobalScope啓動協程,那麼必須保持每一個協程的引用,並在activity destroy時cancel掉全部協程,不然即便activity destroy,那麼協程裏的異步請求代碼仍是會繼續執行,這樣很容易出錯或內存泄漏

咱們該怎麼方便的解決這樣的問題呢?

其實咱們可使用結構化併發(即協程做用域)來解決這樣的問題,即保證啓動的多個協程在同一做用域中,若是cancel掉這個做用域上下文,那麼在這個做用域下啓動的全部子協程都會取消,同時還能夠配合coroutineScope、supervisorScope協程做用域 處理異常傳遞的問題

因此上面的代碼能夠這樣改

GlobalScope.launch(Dispatchers.Default) {
    val a1 = async {
        delay(1000)
        println("1: " + Thread.currentThread().name)
    }
    val a2 = async {
        delay(100)
        1/0 // 故意報錯
        println("2: " + Thread.currentThread().name)
    }
    a1.await()
    a2.await()
}
複製代碼

即把啓動 a1a2協程的GlobalScope去掉,保證a1a2在同一協程做用域中

協程掛起函數原理分析

咱們先來看一看retrofit兼容協程的實現源碼

suspend fun <T : Any> Call<T>.await(): T {
	// 使用suspendCancellableCoroutine定義掛起函數,參數是Continuation對象
  return suspendCancellableCoroutine { continuation ->
    continuation.invokeOnCancellation {
      cancel()
    }
    enqueue(object : Callback<T> {
      override fun onResponse(call: Call<T>, response: Response<T>) {
        if (response.isSuccessful) {
          val body = response.body()
          if (body == null) {
            val invocation = call.request().tag(Invocation::class.java)!!
            val method = invocation.method()
            val e = KotlinNullPointerException("Response from " +
                method.declaringClass.name +
                '.' +
                method.name +
                " was null but response body type was declared as non-null")
            // 若是結果異常,則調用Continuation 的 resumeWithException回調
            continuation.resumeWithException(e)
          } else {
          	// 若是結果正常,則調用Continuation 的 resume回調
            continuation.resume(body)
          }
        } else {
          // 若是結果異常,則調用Continuation 的 resumeWithException回調
          continuation.resumeWithException(HttpException(response))
        }
      }

      override fun onFailure(call: Call<T>, t: Throwable) {
        // 若是結果異常,則調用Continuation 的 resumeWithException回調
        continuation.resumeWithException(t)
      }
    })
  }
}
複製代碼

Continuation的源碼和擴展函數以下

@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

/**
 * Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the
 * last suspension point.
 */
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
複製代碼

可見協程掛起函數內部是使用回調將結果返回出去的,當有結果正常返回的時候,Continuation 調用 resume 返回結果,不然調用 resumeWithException 來拋出異常,這與 Callback 的模式如出一轍

而咱們寫協程代碼之因此能夠看起來是同步的,實際上是編譯器幫你作了不少事情(即你能夠當它是「語法糖」)

注意:使用AndroidStudio反編譯kotlin協程代碼的時候會致使ide嚴重卡頓,而且反編譯出來的java代碼有無數層的嵌套,不知道是沒法反編譯協程代碼,仍是AndroidStudio的bug, 致使沒法配合kotlin反編譯的java代碼來說解

協程的狀態轉移

上面已經對協程掛起函數原理作了一些解析,若是咱們使用了多個掛起函數 那它們是怎麼配合運行的呢?

注意: 下面的代碼是我copy的別人的代碼

suspend fun main() {
    log(1)
    // returnSuspended()是一個suspend函數
    log(returnSuspended())
    log(2)
    // delay也是一個suspend函數
    delay(1000)
    log(3)
    // returnImmediately也是一個suspend函數
    log(returnImmediately())
    log(4)
}

複製代碼

對應的java實現代碼邏輯以下(注意,下面的代碼邏輯上並不能作到十分嚴謹,僅供學習理解協程使用)

public class ContinuationImpl implements Continuation<Object> {
	 // label 狀態 默認爲 0
    private int label = 0;
    private final Continuation<Unit> completion;

    public ContinuationImpl(Continuation<Unit> completion) {
        this.completion = completion;
    }

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object o) {
        try {
            Object result = o;
            switch (label) {
                case 0: {
                    LogKt.log(1);
                    // 在SuspendFunctionsKt.returnSuspended內部以回調的方式 調用this的resumeWith方法
                    result = SuspendFunctionsKt.returnSuspended( this);
                    // label 狀態加 1
                    label++;
                    if (isSuspended(result)) return;
                }
                case 1: {
                    LogKt.log(result);
                    LogKt.log(2);
                    // 在DelayKt.delay內部以回調的方式 調用this的resumeWith方法
                    result = DelayKt.delay(1000, this);
                    // label 狀態加 1
                    label++;
                    if (isSuspended(result)) return;
                }
                case 2: {
                    LogKt.log(3);
                    // 在SuspendFunctionsKt.returnImmediately內部以回調的方式 調用this的resumeWith方法
                    result = SuspendFunctionsKt.returnImmediately( this);
                    // label 狀態加 1
                    label++;
                    if (isSuspended(result)) return;
                }
                case 3:{
                    LogKt.log(result);
                    LogKt.log(4);
                }
            }
            completion.resumeWith(Unit.INSTANCE);
        } catch (Exception e) {
            completion.resumeWith(e);
        }
    }

    private boolean isSuspended(Object result) {
        return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }
}
複製代碼

可見多個掛起函數之間的配合使用是使用label這個狀態字段不斷加1 而且 不斷調用resumeWith方法實現的

總結以下:

  • 協程的掛起函數本質上就是一個回調,回調類型就是 Continuation
  • 協程體的執行就是一個狀態機,每一次遇到掛起函數,都是一次狀態轉移,就像咱們前面例子中的 label 不斷的自增來實現狀態流轉同樣

最後 很是感謝破解 Kotlin 協程的博客,這是學習Coroutine很是好的文章,建議你們去看看

相關文章
相關標籤/搜索