本文介紹了咱們在開發 2019 Android 開發者峯會 (ADS) 應用時總結整理的 Flow 最佳實踐 (應用源碼已開源),咱們將和你們共同探討應用中的每一個層級將如何處理數據流。html
ADS 應用的架構遵照 Android 官方的 推薦架構指南,咱們在其中引入了 Domain 層 (用以囊括各類 UseCases 類) 來幫助分離焦點,進而保持代碼的精簡、複用性、可測試性。java
2019 ADS 應用的架構android
更多關於應用架構指南的分層設計 (Data 層、Domain 層、UI 層),請參考 示例應用 | Plaid 2.0 重構。ios
如同許多 Android 應用同樣,ADS 應用從網絡或緩存懶加載數據。咱們發現,這種場景很是適合 Flow。掛起函數 (suspend functions) 更適合於一次性操做。爲了使用協程,咱們將重構分爲兩次 commit 提交: 第一次 遷移了一次性操做,第二次 將其遷移至數據流。git
在本文中,您將看到咱們把應用從 "在全部層級使用 LiveData",重構爲 "只在 View 和 ViewModel 間使用 LiveData 進行通信,並在應用的底層和 UserCase 層架構中使用協程"。github
您有兩種方法在協程中處理數據流: 一種是 Flow API,另外一種是 Channel API。Channels 是一種同步原語,而 Flows 是爲數據流模型所設計的: 它是訂閱數據流的工廠。不過咱們可使用 Channels 來支持 Flows,這一點咱們稍後再說。數據庫
相較於 Channel,Flow 更靈活,並提供了更明確的約束和更多操做符。數組
因爲末端操做符 (terminal operator) 會觸發數據流的執行,同時會根據生產者一側流操做來決定是成功完成操做仍是拋出異常,所以 Flows 會自動地關閉數據流,您基本不會在生產者一側泄漏資源;而一旦 Channel 沒有正確關閉,生產者可能不會清理大型資源,所以 Channels 更容易形成資源泄漏。緩存
應用數據層負責提供數據,一般是從數據庫中讀取,或從網絡獲取數據,例如,示例 是一個數據源接口,它提供了一個用戶事件數據流:網絡
interface UserEventDataSource { fun getObservableUserEvent(userId: String): Flow<UserEventResult> }
1. UseCase 層和 Repository 層
介於 View/ViewModel 和數據源之間的層 (在咱們的例子中是 UseCase 和 Repository) 一般須要合併來自多個查詢的數據,或在 ViewModel 層使用以前轉化數據。就像 Kotlin sequences 同樣,Flow 支持大量操做符來轉換數據。目前已經有 大量的可用的操做符,同時您也能夠建立您本身的轉換器 (好比,使用 transform 操做符)。不過 Flow 在許多的操做符中暴露了 suspend lambda 表達式,所以在大多數狀況下沒有必要經過自定義轉換來完成複雜任務,能夠直接在 Flow 中調用掛起函數。
在 ADS 應用中,咱們想將 UserEventResult 和 Repository 層中的會話數據進行綁定。咱們利用 map 操做符來將一個 suspend lambda 表達式應用在從數據源接收到的每個 Flow 的值上:
/* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ class DefaultSessionAndUserEventRepository( private val userEventDataSource: UserEventDataSource, private val sessionRepository: SessionRepository ) : SessionAndUserEventRepository { override fun getObservableUserEvent( userId: String?, eventId: SessionId ): Flow<Result<LoadUserSessionUseCaseResult>> { // 處理 userId // 監聽用戶事件,並將其與 Session 數據進行合併 return userEventDataSource.getObservableUserEvent(userId, eventId).map { userEventResult -> val event = sessionRepository.getSession(eventId) // 將 Session 和用戶數據進行合併,並傳遞結果 val userSession = UserSession( event, userEventResult.userEvent ?: createDefaultUserEvent(event) ) Result.Success(LoadUserSessionUseCaseResult(userSession)) } } }
2. ViewModel
在利用 LiveData 執行 UI ↔ ViewModel 通訊時,ViewModel 層應該利用末端操做符來消費來自數據層的數據流 (好比: collect、first 或者是 toList) 。
/* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ // 真實代碼的簡化版 class SessionDetailViewModel( private val loadUserSessionUseCase: LoadUserSessionUseCase, ... ): ViewModel() { private fun listenForUserSessionChanges(sessionId: SessionId) { viewModelScope.launch { loadUserSessionUseCase(sessionId).collect { loadResult -> } } } }
完整代碼能夠參考 這裏.
若是您須要將 Flow 轉化爲 LiveData,則可使用 AndroidX lifecycle library 提供的 Flow.asLiveData() 擴展函數 (extension function)。這個擴展函數很是便於使用,由於它共享了 Flow 的底層訂閱,同時根據觀察者的生命週期管理訂閱。此外,LiveData 能夠爲後續添加的觀察者提供最新的數據,其訂閱在配置發生變動的時候依舊可以生效。下面利用一段簡單的代碼來演示如何使用這個擴展函數:
class SimplifiedSessionDetailViewModel( private val loadUserSessionUseCase: LoadUserSessionUseCase, ... ): ViewModel() { val sessions = loadUserSessionUseCase(sessionId).asLiveData() }
特別說明: 這段代碼不是 ADS 應用的,它只是用來演示如何使用 Flow.asLiveData()。
回到數據源的實現,要怎樣去實現以前暴露的 getObservableUserEvent 函數?咱們考慮了兩種實現: flow 構造器,或 BroadcastChannel 接口,這兩種實現應用於不一樣的場景。
1. 何時使用 Flow ?
Flow 是一種 "冷流"(Cold Stream)。"冷流" 是一種數據源,該類數據源的生產者會在每一個監聽者開始消費事件的時候執行,從而在每一個訂閱上建立新的數據流。一旦消費者中止監聽或者生產者的阻塞結束,數據流將會被自動關閉。
Flow 很是適合須要開始/中止數據的產生來匹配觀察者的場景。
您能夠利用 flow 構造器來發送有限個/無限個元素。
val oneElementFlow: Flow<Int> = flow { // 生產者代碼開始執行,流被打開 emit(1) // 生產者代碼結束,流將被關閉 } val unlimitedElementFlow: Flow<Int> = flow { // 生產者代碼開始執行,流被打開 while(true) { // 執行計算 emit(result) delay(100) } // 生產者代碼結束,流將被關閉 }
Flow 經過協程取消功能提供自動清理功能,所以傾向於執行一些重型任務。請注意,這裏提到的取消是有條件的,一個永不掛起的 Flow 是永不會被取消的: 在咱們的例子中,因爲 delay 是一個掛起函數,用於檢查取消狀態,當訂閱者中止監聽時,Flow 將會中止並清理資源。
2. 何時使用 BroadcastChannel
Channel 是一個用於協程間通訊的併發原語。BroadcastChannel 基於 Channel,並加入了多播功能。
可能在這樣一些場景裏,您可能會考慮在數據源層中使用 BroadcastChannel:
若是生產者和消費者的生命週期不一樣或者彼此徹底獨立運行時,請使用 BroadcastChannel。
若是您但願生產者有獨立的生命週期,同時向任何存在的監聽者發送當前數據的時候,BroadcastChannel API 很是適合這種場景。在這種狀況下,當新的監聽者開始消費事件時,生產者不須要每次都被執行。
您依然能夠向調用者提供 Flow,它們不須要知道具體的實現。您可使用 BroadcastChannel.asFlow() 這個擴展函數來將一個 BroadcastChannel 做爲一個 Flow 使用。
不過,關閉這個特殊的 Flow 不會取消訂閱。當使用 BroadcastChannel 的時候,您必須本身管理生命週期。BroadcastChannel 沒法感知到當前是否還存在監聽者,除非關閉或取消 BroadcastChannel,不然將會一直持有資源。請確保在不須要 BroadcastChannel 的時候將其關閉。同時請注意關閉後的 BroadcastChannel 沒法再次被使用,若是須要,您須要從新建立實例。
接下來,咱們將分享如何使用 BroadcastChannel API 的示例。
3. 特別說明
部分 Flow 和 Channel API 仍處於實驗階段,極可能會發生變更。在一些狀況下,您可能會正在使用 Channel,不過在將來可能會建議您使用 Flow。具體來說,StateFlow 和 Flow 的 share operator 方案可能在將來會減小 Channel 的使用。
將數據流中基於回調的 API 轉化爲協程
包含 Room 在內的不少庫已經支持將協程用於數據流操做。對於那些還不支持的庫,您能夠將任何基於回調的 API 轉換爲協程。
1. Flow 的實現
若是您想將一個基於回調的流 API 轉換爲使用 Flow,您可使用 channelFlow 函數 (固然也可使用 callbackFlow,它們都基於相同的實現)。channelFlow 將會建立一個 Flow 的實例,該實例中的元素將傳遞給一個 Channel。這樣能夠容許咱們在不一樣的上下文或併發中提供元素。
如下示例中,咱們想要把從回調中拿到的元素髮送到 Flow 中:
/* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ override fun getObservableUserEvent(userId: String, eventId: SessionId): Flow<UserEventResult> { // 1) 利用 channelFlow 建立一個 Flow return channelFlow<UserEventResult> { val eventDocument = firestore.collection(USERS_COLLECTION) .document(userId) .collection(EVENTS_COLLECTION) .document(eventId) // 1) 將回調註冊到 API 上 val subscription = eventDocument.addSnapshotListener { snapshot, _ -> val userEvent = if (snapshot.exists()) { parseUserEvent(snapshot) } else { null } // 2) 將數據發送到 Flow channel.offer(UserEventResult(userEvent)) } // 3) 請不要關閉數據流,在消費者關閉或者 API 調用 onCompleted/onError 函數以前,請保證數據流 // 一直處於打開狀態。 // 當數據流關閉後,請取消第三方庫的訂閱。 awaitClose { subscription.remove() } } }
詳細代碼能夠參考 這裏。
2. BroadcastChannel 實現
對於使用 Firestore 跟蹤用戶身份認證的數據流,咱們使用了 BroadcastChannel API,由於咱們但願註冊一個有獨立生命週期的 Authentication 監聽者,同時也但願能向全部正在監聽的對象廣播當前的結果。
轉化回調 API 爲 BroadcastChannel 相比轉化爲 Flow 要略複雜一點。您能夠建立一個類,並設置將實例化後的 BroadcastChannel 做爲變量保存。在初始化期間,註冊回調,像之前同樣將元素髮送到 BroadcastChannel:
/* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ class FirebaseAuthStateUserDataSource(...) : AuthStateUserDataSource { private val channel = ConflatedBroadcastChannel<Result<AuthenticatedUserInfo>>() private val listener: ((FirebaseAuth) -> Unit) = { auth -> // 數據處理邏輯 // 將當前的用戶 (數據) 發送給消費者 if (!channel.isClosedForSend) { channel.offer(Success(FirebaseUserInfo(auth.currentUser))) } else { unregisterListener() } } @Synchronized override fun getBasicUserInfo(): Flow<Result<AuthenticatedUserInfo>> { if (!isListening) { firebase.addAuthStateListener(listener) isListening = true } return channel.asFlow() } }
詳細代碼能夠參考 這裏
爲了測試 Flow 轉換 (就像咱們在 UseCase 和 Repository 層中所作的那樣),您能夠利用 flow 構造器返回一個假數據,例如:
/* Copyright 2019 Google LLC. SPDX-License-Identifier: Apache-2.0 */ object FakeUserEventDataSource : UserEventDataSource { override fun getObservableUserEvents(userId: String) = flow { emit(UserEventsResult(userEvents)) } } class DefaultSessionAndUserEventRepositoryTest { @Test fun observableUserEvents_areMappedCorrectly() = runBlockingTest { // 準備一個 repo val userEvents = repository .getObservableUserEvents("user", true).first() // 對接收到的用戶事件進行斷言 } }
爲了成功完成測試,一個比較好的作法是使用 take 操做符來從 Flow 中獲取一些數據,使用 toList 做爲末端操做符來從數組中獲取結果。示例以下:
class AnotherStreamDataSourceImplTest { @Test fun `Test happy path`() = runBlockingTest { //準備好 subject val result = subject.flow.take(1).toList() // 斷言結果和預期的一致 } }
take 操做符很是適合在獲取到數據後關閉 Flow。在測試完畢後不關閉 Flow 或 BroadcastChannel 將會致使內存泄漏以及測試結果不一致。
注意: 若是在數據源的實現是經過 BroadcastChannel 完成的,那麼上面的代碼還不夠。您須要本身管理數據源的生命週期,並確保 BroadcastChannel 在測試開始以前已經啓動,同時須要在測試結束後將其關閉,不然將會致使內存泄漏。你能夠 在這裏獲取更多信息。
協程測試的最佳實踐在這裏依然適用。若是您在測試代碼中建立新的協程,則可能想要在測試線程中執行它來確保測試得到執行。
您也能夠經過視頻回顧 2019 Android 開發者峯會演講 —— 在 Android 上測試協程 獲取更多相關信息。
2019 ADS 應用在 GitHub 開源,請點擊 這裏 在 GitHub 上查看更詳細的代碼實現。