協程 Flow 最佳實踐 | 基於 Android 開發者峯會應用

本文介紹了咱們在開發 2019 Android 開發者峯會 (ADS) 應用時總結整理的 Flow 最佳實踐 (應用源碼已開源),咱們將和你們共同探討應用中的每一個層級將如何處理數據流。html

ADS 應用的架構遵照 Android 官方的 推薦架構指南,咱們在其中引入了 Domain 層 (用以囊括各類 UseCases 類) 來幫助分離焦點,進而保持代碼的精簡、複用性、可測試性。java

2019 ADS 應用的架構

2019 ADS 應用的架構android

更多關於應用架構指南的分層設計 (Data 層、Domain 層、UI 層),請參考 示例應用 | Plaid 2.0 重構ios

如同許多 Android 應用同樣,ADS 應用從網絡或緩存懶加載數據。咱們發現,這種場景很是適合 Flow。掛起函數 (suspend functions) 更適合於一次性操做。爲了使用協程,咱們將重構分爲兩次 commit 提交: 第一次 遷移了一次性操做,第二次 將其遷移至數據流。git

在本文中,您將看到咱們把應用從 "在全部層級使用 LiveData",重構爲 "只在 View 和 ViewModel 間使用 LiveData 進行通信,並在應用的底層和 UserCase 層架構中使用協程"。github

優先使用 Flow 來暴露數據流 (而不是 Channel)

您有兩種方法在協程中處理數據流: 一種是 Flow API,另外一種是 Channel API。Channels 是一種同步原語,而 Flows 是爲數據流模型所設計的: 它是訂閱數據流的工廠。不過咱們可使用 Channels 來支持 Flows,這一點咱們稍後再說。數據庫

相較於 Channel,Flow 更靈活,並提供了更明確的約束和更多操做符。數組

因爲末端操做符 (terminal operator) 會觸發數據流的執行,同時會根據生產者一側流操做來決定是成功完成操做仍是拋出異常,所以 Flows 會自動地關閉數據流,您基本不會在生產者一側泄漏資源;而一旦 Channel 沒有正確關閉,生產者可能不會清理大型資源,所以 Channels 更容易形成資源泄漏。緩存

應用數據層負責提供數據,一般是從數據庫中讀取,或從網絡獲取數據,例如,示例 是一個數據源接口,它提供了一個用戶事件數據流:網絡

interface UserEventDataSource {
  fun getObservableUserEvent(userId: String): Flow<UserEventResult>
}

如何將 Flow 應用在您的 Android 應用架構中

1. UseCase 層和 Repository 層

介於 View/ViewModel 和數據源之間的層 (在咱們的例子中是 UseCase 和 Repository) 一般須要合併來自多個查詢的數據,或在 ViewModel 層使用以前轉化數據。就像 Kotlin sequences 同樣,Flow 支持大量操做符來轉換數據。目前已經有 大量的可用的操做符,同時您也能夠建立您本身的轉換器 (好比,使用 transform 操做符)。不過 Flow 在許多的操做符中暴露了 suspend lambda 表達式,所以在大多數狀況下沒有必要經過自定義轉換來完成複雜任務,能夠直接在 Flow 中調用掛起函數。

在 ADS 應用中,咱們想將 UserEventResult 和 Repository 層中的會話數據進行綁定。咱們利用 map 操做符來將一個 suspend lambda 表達式應用在從數據源接收到的每個 Flow 的值上:

/* Copyright 2019 Google LLC.  
   SPDX-License-Identifier: Apache-2.0 */
class DefaultSessionAndUserEventRepository(
    private val userEventDataSource: UserEventDataSource,
    private val sessionRepository: SessionRepository
) : SessionAndUserEventRepository {

    override fun getObservableUserEvent(
        userId: String?,
        eventId: SessionId
    ): Flow<Result<LoadUserSessionUseCaseResult>> {
        // 處理 userId

        // 監聽用戶事件,並將其與 Session 數據進行合併
        return userEventDataSource.getObservableUserEvent(userId, eventId).map { userEventResult ->
            val event = sessionRepository.getSession(eventId)

           // 將 Session 和用戶數據進行合併,並傳遞結果
            val userSession = UserSession(
                event,
                userEventResult.userEvent ?: createDefaultUserEvent(event)
            )
            Result.Success(LoadUserSessionUseCaseResult(userSession))
        }
    }
}

2. ViewModel

在利用 LiveData 執行 UI ↔ ViewModel 通訊時,ViewModel 層應該利用末端操做符來消費來自數據層的數據流 (好比: collectfirst 或者是 toList) 。

/* Copyright 2019 Google LLC.  
   SPDX-License-Identifier: Apache-2.0 */
// 真實代碼的簡化版
class SessionDetailViewModel(
    private val loadUserSessionUseCase: LoadUserSessionUseCase,
    ...
): ViewModel() {

    private fun listenForUserSessionChanges(sessionId: SessionId) {
        viewModelScope.launch {
            loadUserSessionUseCase(sessionId).collect { loadResult ->
            }
        }
    }
}

完整代碼能夠參考 這裏.

若是您須要將 Flow 轉化爲 LiveData,則可使用 AndroidX lifecycle library 提供的 Flow.asLiveData() 擴展函數 (extension function)。這個擴展函數很是便於使用,由於它共享了 Flow 的底層訂閱,同時根據觀察者的生命週期管理訂閱。此外,LiveData 能夠爲後續添加的觀察者提供最新的數據,其訂閱在配置發生變動的時候依舊可以生效。下面利用一段簡單的代碼來演示如何使用這個擴展函數:

class SimplifiedSessionDetailViewModel(
  private val loadUserSessionUseCase: LoadUserSessionUseCase,
  ...
): ViewModel() {
  val sessions = loadUserSessionUseCase(sessionId).asLiveData()
}

特別說明: 這段代碼不是 ADS 應用的,它只是用來演示如何使用 Flow.asLiveData()。

具體實現時,該在什麼時候使用 BroadcastChannel 或者 Flow

回到數據源的實現,要怎樣去實現以前暴露的 getObservableUserEvent 函數?咱們考慮了兩種實現: flow 構造器,或 BroadcastChannel 接口,這兩種實現應用於不一樣的場景。

1. 何時使用 Flow ?

Flow 是一種 "冷流"(Cold Stream)。"冷流" 是一種數據源,該類數據源的生產者會在每一個監聽者開始消費事件的時候執行,從而在每一個訂閱上建立新的數據流。一旦消費者中止監聽或者生產者的阻塞結束,數據流將會被自動關閉。

Flow 很是適合須要開始/中止數據的產生來匹配觀察者的場景。

您能夠利用 flow 構造器來發送有限個/無限個元素。

val oneElementFlow: Flow<Int> = flow {
  // 生產者代碼開始執行,流被打開
  emit(1)
 // 生產者代碼結束,流將被關閉
}
val unlimitedElementFlow: Flow<Int> = flow {
 // 生產者代碼開始執行,流被打開
  while(true) {
    // 執行計算
    emit(result)
    delay(100)
  }
 // 生產者代碼結束,流將被關閉
}

Flow 經過協程取消功能提供自動清理功能,所以傾向於執行一些重型任務。請注意,這裏提到的取消是有條件的,一個永不掛起的 Flow 是永不會被取消的: 在咱們的例子中,因爲 delay 是一個掛起函數,用於檢查取消狀態,當訂閱者中止監聽時,Flow 將會中止並清理資源。

2. 何時使用 BroadcastChannel

Channel 是一個用於協程間通訊的併發原語。BroadcastChannel 基於 Channel,並加入了多播功能。

可能在這樣一些場景裏,您可能會考慮在數據源層中使用 BroadcastChannel:

若是生產者和消費者的生命週期不一樣或者彼此徹底獨立運行時,請使用 BroadcastChannel。

若是您但願生產者有獨立的生命週期,同時向任何存在的監聽者發送當前數據的時候,BroadcastChannel API 很是適合這種場景。在這種狀況下,當新的監聽者開始消費事件時,生產者不須要每次都被執行。

您依然能夠向調用者提供 Flow,它們不須要知道具體的實現。您可使用 BroadcastChannel.asFlow() 這個擴展函數來將一個 BroadcastChannel 做爲一個 Flow 使用。

不過,關閉這個特殊的 Flow 不會取消訂閱。當使用 BroadcastChannel 的時候,您必須本身管理生命週期。BroadcastChannel 沒法感知到當前是否還存在監聽者,除非關閉或取消 BroadcastChannel,不然將會一直持有資源。請確保在不須要 BroadcastChannel 的時候將其關閉。同時請注意關閉後的 BroadcastChannel 沒法再次被使用,若是須要,您須要從新建立實例。

接下來,咱們將分享如何使用 BroadcastChannel API 的示例。

3. 特別說明

部分 Flow 和 Channel API 仍處於實驗階段,極可能會發生變更。在一些狀況下,您可能會正在使用 Channel,不過在將來可能會建議您使用 Flow。具體來說,StateFlow 和 Flow 的 share operator 方案可能在將來會減小 Channel 的使用。

將數據流中基於回調的 API 轉化爲協程

包含 Room 在內的不少庫已經支持將協程用於數據流操做。對於那些還不支持的庫,您能夠將任何基於回調的 API 轉換爲協程。

1. Flow 的實現

若是您想將一個基於回調的流 API 轉換爲使用 Flow,您可使用 channelFlow 函數 (固然也可使用 callbackFlow,它們都基於相同的實現)。channelFlow 將會建立一個 Flow 的實例,該實例中的元素將傳遞給一個 Channel。這樣能夠容許咱們在不一樣的上下文或併發中提供元素。

如下示例中,咱們想要把從回調中拿到的元素髮送到 Flow 中:

  1. 利用 channelFlow 構造器建立一個能夠把回調註冊到第三方庫的流;
  2. 將從回調接收到的全部數據傳遞給 Flow;
  3. 當訂閱者中止監聽,咱們利用掛 起函數 "awaitClose" 來解除 API 的訂閱。
/* Copyright 2019 Google LLC.  
   SPDX-License-Identifier: Apache-2.0 */
override fun getObservableUserEvent(userId: String, eventId: SessionId): Flow<UserEventResult> {
     // 1) 利用 channelFlow 建立一個 Flow
    return channelFlow<UserEventResult> {

        val eventDocument = firestore.collection(USERS_COLLECTION)
            .document(userId)
            .collection(EVENTS_COLLECTION)
            .document(eventId)
    
       // 1) 將回調註冊到 API 上
        val subscription = eventDocument.addSnapshotListener { snapshot, _ ->
            val userEvent = if (snapshot.exists()) {
                parseUserEvent(snapshot)
            } else { null }
            
            // 2) 將數據發送到 Flow
            channel.offer(UserEventResult(userEvent))
        }
        // 3) 請不要關閉數據流,在消費者關閉或者 API 調用 onCompleted/onError 函數以前,請保證數據流
       // 一直處於打開狀態。
        // 當數據流關閉後,請取消第三方庫的訂閱。
        awaitClose { subscription.remove() }
    }
}

詳細代碼能夠參考 這裏

2. BroadcastChannel 實現

對於使用 Firestore 跟蹤用戶身份認證的數據流,咱們使用了 BroadcastChannel API,由於咱們但願註冊一個有獨立生命週期的 Authentication 監聽者,同時也但願能向全部正在監聽的對象廣播當前的結果。

轉化回調 API 爲 BroadcastChannel 相比轉化爲 Flow 要略複雜一點。您能夠建立一個類,並設置將實例化後的 BroadcastChannel 做爲變量保存。在初始化期間,註冊回調,像之前同樣將元素髮送到 BroadcastChannel:

/* Copyright 2019 Google LLC.  
   SPDX-License-Identifier: Apache-2.0 */
class FirebaseAuthStateUserDataSource(...) : AuthStateUserDataSource {

    private val channel = ConflatedBroadcastChannel<Result<AuthenticatedUserInfo>>()

    private val listener: ((FirebaseAuth) -> Unit) = { auth ->
       // 數據處理邏輯
       
       // 將當前的用戶 (數據) 發送給消費者
        if (!channel.isClosedForSend) {
            channel.offer(Success(FirebaseUserInfo(auth.currentUser)))
        } else {
            unregisterListener()
        }
    }

    @Synchronized
    override fun getBasicUserInfo(): Flow<Result<AuthenticatedUserInfo>> {
        if (!isListening) {
            firebase.addAuthStateListener(listener)
            isListening = true
        }
        return channel.asFlow()
    }
}

詳細代碼能夠參考 這裏

測試小建議

爲了測試 Flow 轉換 (就像咱們在 UseCase 和 Repository 層中所作的那樣),您能夠利用 flow 構造器返回一個假數據,例如:

/* Copyright 2019 Google LLC.
   SPDX-License-Identifier: Apache-2.0 */
object FakeUserEventDataSource : UserEventDataSource {
  override fun getObservableUserEvents(userId: String) = flow {
    emit(UserEventsResult(userEvents))
  }
}
class DefaultSessionAndUserEventRepositoryTest {
  @Test
  fun observableUserEvents_areMappedCorrectly() = runBlockingTest {
   // 準備一個 repo
    val userEvents = repository
          .getObservableUserEvents("user", true).first()
   // 對接收到的用戶事件進行斷言
  }
}

爲了成功完成測試,一個比較好的作法是使用 take 操做符來從 Flow 中獲取一些數據,使用 toList 做爲末端操做符來從數組中獲取結果。示例以下:

class AnotherStreamDataSourceImplTest {
  @Test
  fun `Test happy path`() = runBlockingTest {
   //準備好 subject
    val result = subject.flow.take(1).toList()
   // 斷言結果和預期的一致
  }
}

take 操做符很是適合在獲取到數據後關閉 Flow。在測試完畢後不關閉 Flow 或 BroadcastChannel 將會致使內存泄漏以及測試結果不一致。

注意: 若是在數據源的實現是經過 BroadcastChannel 完成的,那麼上面的代碼還不夠。您須要本身管理數據源的生命週期,並確保 BroadcastChannel 在測試開始以前已經啓動,同時須要在測試結束後將其關閉,不然將會致使內存泄漏。你能夠 在這裏獲取更多信息

協程測試的最佳實踐在這裏依然適用。若是您在測試代碼中建立新的協程,則可能想要在測試線程中執行它來確保測試得到執行。

您也能夠經過視頻回顧 2019 Android 開發者峯會演講 —— 在 Android 上測試協程 獲取更多相關信息。

總結

  • 由於 Flow 所提供的更加明確的約束和各類操做符,咱們更建議向消費者暴露 Flow 而不是 Channel;
  • 使用 Flow 時,生產者會在每次有新的監聽者時被執行,同時數據流的生命週期將會被自動處理;
  • 使用 BroadcastChannel 時,您能夠共享生產者,但須要本身管理它的生命週期;
  • 請考慮將基於回調的 API 轉化爲協程,以便在您的應用中更好、更慣用地集成 API;
  • 使用 taketoList 操做符能夠簡化 Flow 的相關代碼測試。

2019 ADS 應用在 GitHub 開源,請點擊 這裏 在 GitHub 上查看更詳細的代碼實現。

相關文章
相關標籤/搜索