Kotlin coroutines meeting Architecture components

前言

Kotlin coroutines 爲咱們提供了一種編寫良好異步代碼的簡易 API。在 Kotlin coroutines 中,咱們能夠自定義 CoroutineScope,用來管理咱們啓動的 coroutines 的運行位置。須要注意的是,每一個 coroutine 都須要運行在 CoroutineScope 中。 Architecture components 爲咱們提供了在各組件中使用 coroutine 的官方支持。html

Add KTX dependencies

這些 coroutine scopes 做爲 Architecture components 的擴展包提供給開發者使用,它們位於 KTX extensions。經過單獨添加如下依賴,咱們就可使用它們。java

  • ViewModelScope:androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-beta01 或更高
  • LifecycleScope:androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-alpha01 或更高
  • LiveData:androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha01 或更高

最新版本可在 Google Maven 中找到。android

Lifecycle-aware coroutine scopes

Architecture components 爲咱們提供了 ViewModelScopeLifecycleScope 用於管理咱們在 ViewModel 和 Lifecycle 中啓動的 Coroutinesgit

ViewModelScope

Usage

ViewModelScope 是 ViewModel 的一個擴展屬性,所以咱們能夠在每一個 ViewModel 的子類中使用它。每一個在 ViewModelScope 中啓動的 coroutine 都會在 ViewModel 銷燬(ViewModel#onCleared())的時候自動取消(cancel)。若是咱們只須要在 ViewModel 存活(active)時作一些邏輯處理,使用 Coroutines 是一個好的選擇。舉個栗子,假如咱們在 ViewModel 中爲 View 層計算一些數據而後將結果顯示到 UI 上,咱們應該限定這些計算工做在 ViewModel 的生命週期內執行。這樣當咱們的 ViewModel 銷燬的時候,這些計算工做也會自動取消,避免資源浪費和內存泄露。 咱們可使用 ViewModel 的擴展屬性 -- ViewModelScope,來限定 Coroutines 的運行範圍。使用方式以下:github

class MyViewModel: ViewModel {
    init {
        // 在 ViewModelScope 中啓動一個 coroutine
        viewModelScope.launch {
            // 這個 coroutine 會在 ViewModel 銷燬時被自動取消。
        }
    }
}
複製代碼

Source code

下面咱們來看下源碼是如何實現的。web

ViewModel.kt

// 用於存放 viewModelScope 的 key
private const val JOB_KEY = "androidx.lifecycle.ViewModelCoroutineScope.JOB_KEY"

/** * [CoroutineScope] tied to this [ViewModel]. * This scope will be canceled when ViewModel will be cleared, i.e [ViewModel.onCleared] is called * * This scope is bound to [Dispatchers.Main] */
val ViewModel.viewModelScope: CoroutineScope
        get() {
            // 首先從緩存中取 CoroutineScope,若非第一次調用 viewModelScope,則會直接返回
            val scope: CoroutineScope? = this.getTag(JOB_KEY)
            if (scope != null) {
                return scope
            }
            // 若首次調用,則新建一個 CloseableCoroutineScope,並存在 ViewModel 中。
            // 這個 CloseableCoroutineScope 與主線程綁定。
            return setTagIfAbsent(JOB_KEY,
                CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main))
        }

internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
    // 實現 CoroutineScope 接口,設置 CoroutineContext
    override val coroutineContext: CoroutineContext = context

    // 實現 Closeable 接口
    override fun close() {
        // 取消該 scope 中啓動的全部 coroutines
        coroutineContext.cancel()
    }
}
複製代碼

ViewModel.java

下面咱們帶着如下疑問,來看下 ViewModel 的源碼。ViewModel 的源碼也就幾十行。數據庫

  • getTag 是如何實現的?
  • setTagIfAbsent 是如何實現的?
  • 爲何 viewModelScope 中啓動的線程會在 ViewModel 銷燬時被自動取消。
public abstract class ViewModel {
    // Can't use ConcurrentHashMap, because it can lose values on old apis (see b/37042460)
    @Nullable
    private final Map<String, Object> mBagOfTags = new HashMap<>();
    private volatile boolean mCleared = false;

    /** * This method will be called when this ViewModel is no longer used and will be destroyed. * <p> * It is useful when ViewModel observes some data and you need to clear this subscription to * prevent a leak of this ViewModel. */
    @SuppressWarnings("WeakerAccess")
    protected void onCleared() {
    }

    @MainThread
    final void clear() {
        mCleared = true;
        // Since clear() is final, this method is still called on mock objects
        // and in those cases, mBagOfTags is null. It'll always be empty though
        // because setTagIfAbsent and getTag are not final so we can skip
        // clearing it
        if (mBagOfTags != null) {
            synchronized (mBagOfTags) {
                for (Object value : mBagOfTags.values()) {
                    // see comment for the similar call in setTagIfAbsent
                    closeWithRuntimeException(value);
                }
            }
        }
        onCleared();
    }

    /** * Sets a tag associated with this viewmodel and a key. * If the given {@code newValue} is {@link Closeable}, * it will be closed once {@link #clear()}. * <p> * If a value was already set for the given key, this calls do nothing and * returns currently associated value, the given {@code newValue} would be ignored * <p> * If the ViewModel was already cleared then close() would be called on the returned object if * it implements {@link Closeable}. The same object may receive multiple close calls, so method * should be idempotent. */
    <T> T setTagIfAbsent(String key, T newValue) {
        T previous;
        synchronized (mBagOfTags) {
            //noinspection unchecked
            previous = (T) mBagOfTags.get(key);
            if (previous == null) {
                mBagOfTags.put(key, newValue);
            }
        }
        T result = previous == null ? newValue : previous;
        if (mCleared) {
            // It is possible that we'll call close() multiple times on the same object, but
            // Closeable interface requires close method to be idempotent:
            // "if the stream is already closed then invoking this method has no effect." (c)
            closeWithRuntimeException(result);
        }
        return result;
    }

    /** * Returns the tag associated with this viewmodel and the specified key. */
    @SuppressWarnings("TypeParameterUnusedInFormals")
    <T> T getTag(String key) {
        //noinspection unchecked
        synchronized (mBagOfTags) {
            return (T) mBagOfTags.get(key);
        }
    }

    private static void closeWithRuntimeException(Object obj) {
        if (obj instanceof Closeable) {
            try {
                ((Closeable) obj).close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
複製代碼

能夠看到:api

  • mBagOfTags 這個成員變量是一個 HashMap,用於存放鍵值對,getTag(String key) 就是從 mBagOfTags 取出該 key 對應的值,並作了強制類型轉換。
  • setTagIfAbsent(String key, T newValue) 就是將該 newValue 存儲到 mBagOfTags 中,以便下次取出使用。須要注意的是,若是想要給已經存在的 key 設置一個新值(newValue),將不會生效,新值會被忽略,而後返回已經存在的舊值(previous)。而且,若是 ViewModel#clear() 已經被系統調用(好比它的 Activity/Fragment 已經銷燬)時(mCleared = true),新存儲的值會調用 closeWithRuntimeException(Object obj)
  • ViewModel#clear() 中,會遍歷 mBagOfTags,而後調用 closeWithRuntimeException(Object obj)
  • closeWithRuntimeException(Object obj) 方法中,若是這個 obj 是實現了 Closeable 接口的類的對象,就會調用它的 close 方法。

回到這個問題:爲何 viewModelScope 中啓動的線程會在 ViewModel 銷燬時被自動取消? 如今就能夠有答案了:由於 ViewModel 的擴展屬性 viewModelScope 是一個實現了 Closeable 接口的 CloseableCoroutineScope,而且存放在了 ViewModel 的 mBagOfTags 中。因爲 ViewModel#clear() 時會將 mBagOfTags 中全部實現了 Closeable 接口的類的對象關閉(close),因此會回調 CloseableCoroutineScope#close() 方法,並此方法內,取消了全部該 CoroutineScope 中的全部 Coroutines。緩存

Test

TestCoroutineDispatcher

ViewModel.kt 源碼可知 viewModelScope 是運行在主線程中的(CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main))。Dispatchers.Main 在 Android 平臺是指 UI 線程,它的實現依賴 Looper.getMainLooper()。所以咱們沒法使用 Unit tests 測試它們。 幸運的是,官方爲咱們提供了測試依賴能夠替換 Dispatchers.Main 的實現。bash

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutines_version'
複製代碼

咱們可使用該庫提供的 Dispatchers.setMain(dispatcher: CoroutineDispatcher) 來重寫它,而且它也爲咱們提供了一個默認實現:TestCoroutineDispatcherTestCoroutineDispatcher 是一個 CoroutineDispatcher 的實現類,咱們可使用它控制 Coroutines 的執行,好比 pause/resume 或控制它的虛擬時鐘。該類是在 Kotlin Coroutines v1.2.1 新增的,目前仍是一個實驗性 API。能夠查看官方文檔

咱們不該在單元測試時使用 Dispatchers.Unconfined 來替換 Dispatchers.Main,它不會是咱們預想的那樣驗證結果或耗時。因爲單元測試應該運行在一個隔離環境內,不受其它因素的影響,因此每次執行完一個測試,咱們要恢復初始狀態,能夠調用 Dispatchers.resetMain() 重置。 固然,咱們能夠自定義 Rule 來避免樣板代買:

@ExperimentalCoroutinesApi
class CoroutinesTestRule(
        val testDispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher()
) : TestWatcher() {

    override fun starting(description: Description?) {
        super.starting(description)
        Dispatchers.setMain(testDispatcher)
    }

    override fun finished(description: Description?) {
        super.finished(description)
        Dispatchers.resetMain()
        testDispatcher.cleanupTestCoroutines()
    }
}
複製代碼

而後咱們就能夠在測試類中使用這個 Rule:

class MainViewModelUnitTest {

    @get:Rule
    var coroutinesTestRule = CoroutinesTestRule()

    @Test
    fun test() {
        ...
    }
}
複製代碼

使用 Mockito 測試 Coroutines

咱們通常使用 Mockitoverify 方法驗證對象的方法是否調用,但這並非一個完美的方式。咱們最好驗證咱們的邏輯代碼是否正確,好比某個元素是否存在。 在驗證某個對象的方法是否調用前,咱們要確保全部已啓動的 Coroutines 都執行完畢。舉個例子:

class MainViewModel(private val dependency: Any): ViewModel {

  fun sampleMethod() {
    viewModelScope.launch {
      val hashCode = dependency.hashCode()
      // TODO: do something with hashCode
  }
}

class MainViewModelUnitTest {

  // Mockito setup goes here
  ...

  @get:Rule
  var coroutinesTestRule = CoroutinesTestRule()

  @Test
  fun test() = coroutinesTestRule.testDispatcher.runBlockingTest {
    val subject = MainViewModel(mockObject)
    subject.sampleMethod()
    // Checks mockObject called the hashCode method that is expected from the coroutine created in sampleMethod
    verify(mockObject).hashCode()
  }
}
複製代碼

MainViewModelUnitTest 測試類中,咱們使用了 TestCoroutineDispatcher 提供的 runBlockingTest 函數。因爲 TestCoroutineDispatcher 重寫了 Dispatchers.Main,因此 MainViewModel 中的 Coroutines 將會在這個 Dispatcher 中運行。runBlockingTest 函數能夠保證全部測試代碼中的 Coroutines 都會同步執行。所以 verify 方法也將會在全部 Coroutines 運行完後纔會執行驗證行爲。

LifecycleScope

Usage

LifecycleScope 是 Lifecyle 的一個擴展屬性,所以咱們能夠在任何能夠拿到 Lifecyle 的地方(通常是 Activity/Fragment)使用它。每一個在 LifecycleScope 中啓動的 Coroutine 都會在 Lifecycle 銷燬的時候自動取消(cancel)。咱們能夠經過 lifecycle.coroutineScopelifecycleOwner.lifecycleScope 使用 Lifecycle 的 LifecycleScope。 下面這個例子,示範了怎麼使用 lifecycleOwner.lifecycleScope 來異步建立預計算的文本。

class MyFragment: Fragment() {
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        viewLifecycleOwner.lifecycleScope.launch {
            val params = TextViewCompat.getTextMetricsParams(textView)
            val precomputedText = withContext(Dispatchers.Default) {
                PrecomputedTextCompat.create(longTextContent, params)
            }
            TextViewCompat.setPrecomputedText(textView, precomputedText)
        }
    }
}
複製代碼

Soure code

Lifecyle.kt

/** * [CoroutineScope] tied to this [Lifecycle]. * * This scope will be cancelled when the [Lifecycle] is destroyed. * * This scope is bound to [Dispatchers.Main] */
val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }

/** * [CoroutineScope] tied to a [Lifecycle] and [Dispatchers.Main] * * This scope will be cancelled when the [Lifecycle] is destroyed. * * This scope provides specialised versions of `launch`: [launchWhenCreated], [launchWhenStarted], * [launchWhenResumed] */
abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
    internal abstract val lifecycle: Lifecycle

    /** * Launches and runs the given block when the [Lifecycle] controlling this * [LifecycleCoroutineScope] is at least in [Lifecycle.State.CREATED] state. * * The returned [Job] will be cancelled when the [Lifecycle] is destroyed. * @see Lifecycle.whenCreated * @see Lifecycle.coroutineScope */
    fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenCreated(block)
    }

    /** * Launches and runs the given block when the [Lifecycle] controlling this * [LifecycleCoroutineScope] is at least in [Lifecycle.State.STARTED] state. * * The returned [Job] will be cancelled when the [Lifecycle] is destroyed. * @see Lifecycle.whenStarted * @see Lifecycle.coroutineScope */

    fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenStarted(block)
    }

    /** * Launches and runs the given block when the [Lifecycle] controlling this * [LifecycleCoroutineScope] is at least in [Lifecycle.State.RESUMED] state. * * The returned [Job] will be cancelled when the [Lifecycle] is destroyed. * @see Lifecycle.whenResumed * @see Lifecycle.coroutineScope */
    fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenResumed(block)
    }
}

internal class LifecycleCoroutineScopeImpl(
    override val lifecycle: Lifecycle,
    override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
    init {
        // in case we are initialized on a non-main thread, make a best effort check before
        // we return the scope. This is not sync but if developer is launching on a non-main
        // dispatcher, they cannot be 100% sure anyways.
        if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
            coroutineContext.cancel()
        }
    }

    fun register() {
        // TODO use Main.Immediate once it is graduated out of experimental.
        launch(Dispatchers.Main) {
            if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
                lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
            } else {
                coroutineContext.cancel()
            }
        }
    }

    override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
            lifecycle.removeObserver(this)
            coroutineContext.cancel()
        }
    }
}
複製代碼

LifecycleOwner.kt

/** * [CoroutineScope] tied to this [LifecycleOwner]'s [Lifecycle]. * * This scope will be cancelled when the [Lifecycle] is destroyed. * * This scope is bound to [Dispatchers.Main]. */
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope

複製代碼

Suspend Lifecycle-aware coroutines

儘管 CoroutineScope 爲咱們提供了一種合適的方式來自動取消耗時操做,但在有些狀況下,Lifecycle 不在一個肯定的狀態時,也就是說是一個暫態,隨時可能會轉移到其餘狀態時,咱們可能想掛起(suspend)咱們的方法執行。舉個栗子,咱們必須在 Lifecycle 的狀態大於等於 STARTED 時,纔可使用 FragmentTransaction。對於這些狀況,Lifecycle 也提供了對應的擴展方法:lifecycle.whenCreatedlifecycle.whenStartedlifecycle.whenResumed。若是 Lifecycle 沒有到達指望的最小狀態時,運行在這些方法內的全部 Coroutines 都會被掛起。 下面這個例子展現了至少須要 Lifecycle 達到 STARTED 狀態時,纔會執行 lifecycle.whenStarted 裏面的代碼。

class MyFragment: Fragment {
    init { // Notice that we can safely launch in the constructor of the Fragment.
        lifecycleScope.launch {
            whenStarted {
                // The block inside will run only when Lifecycle is at least STARTED.
                // It will start executing when fragment is started and
                // can call other suspend methods.
                loadingView.visibility = View.VISIBLE
                val canAccess = withContext(Dispatchers.IO) {
                    checkUserAccess()
                }

                // When checkUserAccess returns, the next line is automatically
                // suspended if the Lifecycle is not *at least* STARTED.
                // We could safely run fragment transactions because we know the
                // code won't run unless the lifecycle is at least STARTED.
                loadingView.visibility = View.GONE
                if (canAccess == false) {
                    findNavController().popBackStack()
                } else {
                    showContent()
                }
            }

            // This line runs only after the whenStarted block above has completed.

        }
    }
}
複製代碼

Lifecyle 銷燬時,使用這些 when 方法啓動的 Coroutines 將會被自動取消。下面的例子,一旦 Lifecyle 的狀態變爲 DESTROYED, finally 代碼塊會當即執行。

class MyFragment: Fragment {
    init {
        lifecycleScope.launchWhenStarted {
            try {
                // Call some suspend functions.
            } finally {
                // This line might execute after Lifecycle is DESTROYED.
                if (lifecycle.state >= STARTED) {
                    // Here, since we've checked, it is safe to run any
                    // Fragment transactions.
                }
            }
        }
    }
}
複製代碼

注意:儘管這些 Lifecycle 的擴展屬性或擴展方法爲咱們提供了很大的便利,但在 Lifecyle 生命週期內,咱們最好在能保證消息有效的狀況下使用它們(好比上面的 precomputed text)。 另外須要注意的是,Activity 重啓(restart) 時,這些 Coroutines 不會被重啓(restart)。

Use coroutines with LiveData

咱們在使用 LiveData 時,常常須要異步獲取數據而後設置給 LiveData。好比,咱們可能須要讀取用戶設置,而後展現到 UI 上。這時,咱們可使用 liveData 擴展構造函數,在該函數內能夠調用 suspend 函數獲取數據並將結果傳遞給 LiveData。 以下所示,loadUser() 是一個 suspend 函數,經過查詢數據庫返回一個 User 對象。使用 liveData 擴展構造函數,咱們能夠異步調用 loadUser(),而後使用 emit() 方法改變 LiveDatavalue

val user: LiveData<User> = liveData {
    val data = database.loadUser() // loadUser is a suspend function.
    emit(data)
}
複製代碼

liveData 擴展構造函數爲 Coroutines 和 LiveData 提供告終構化併發(Structured concurrency)的支持。livedata 包含的代碼塊會在 LiveData 變爲 active 時自動執行,並在 LiveData 變爲 inactive 時在一個可配置的時間後自動取消。若該代碼塊在執行完以前就被取消了,那麼在 LiveData 再次 active 時,代碼塊也會從新執行。但若它已經執行完畢,則不會從新執行。而且該代碼塊只會在被自動取消的狀況下才會在 LiveData 再次 active 時從新執行。 若是該代碼塊因爲其餘緣由(好比拋出 CancelationExeption)被取消了,它也不會從新從新執行。 咱們能夠在 livedata 擴展構造函數內發射多個值。每次發射(調用 emit())都會執行主線程的 suspend 函數,直到該值被設置給 LiveData

val user: LiveData<Result> = liveData {
    emit(Result.loading())
    try {
        emit(Result.success(fetchUser())
    } catch(ioException: Exception) {
        emit(Result.error(ioException))
    }
}
複製代碼

咱們也可使用在 Transformations 提供的操做符內使用 liveData,以下所示:

class MyViewModel: ViewModel() {
    private val userId: LiveData<String> = MutableLiveData()
    val user = userId.switchMap { id ->
        liveData(context = viewModelScope.coroutineContext + Dispatchers.IO) {
            emit(database.loadUserById(id))
        }
    }
}
複製代碼

咱們也能夠在任什麼時候候使用 emitSource() 函數發射多個值給 LiveData。但要注意:每次調用 emit()emitSource() 都會清除以前添加的值。 可見源碼(LiveDataScope)。

class UserDao: Dao {
    @Query("SELECT * FROM User WHERE id = :id")
    fun getUser(id: String): LiveData<User>
}

class MyRepository {
    fun getUser(id: String) = liveData<User> {
        val disposable = emitSource(
            userDao.getUser(id).map {
                Result.loading(it)
            }
        )
        try {
            val user = webservice.fetchUser(id)
            // Stop the previous emission to avoid dispatching the updated user
            // as `loading`.
            disposable.dispose()
            // Update the database.
            userDao.insert(user)
            // Re-establish the emission with success type.
            emitSource(
                userDao.getUser(id).map {
                    Result.success(it)
                }
            )
        } catch(exception: IOException) {
            // Any call to `emit` disposes the previous one automatically so we don't
            // need to dispose it here as we didn't get an updated value.
            emitSource(
                userDao.getUser(id).map {
                    Result.error(exception, it)
                }
            )
        }
    }
}
複製代碼

Reference

聯繫

我是 xiaobailong24,您能夠經過如下平臺找到我:

相關文章
相關標籤/搜索