本文介紹了咱們在開發 2019 Android 開發者峯會 (ADS) 應用時總結整理的 Flow 最佳實踐 (應用源碼已開源),咱們將和你們共同探討應用中的每一個層級將如何處理數據流。html
ADS 應用的架構遵照 Android 官方的推薦架構指南,咱們在其中引入了 Domain 層 (用以囊括各類 UseCases 類) 來幫助分離焦點,進而保持代碼的精簡、複用性、可測試性。java
如同許多 Android 應用同樣,ADS 應用從網絡或緩存懶加載數據。咱們發現,這種場景很是適合 Flow。掛起函數 (suspend functions) 更適合於一次性操做。爲了使用協程,咱們將重構分爲兩次 commit 提交: 第一次遷移了一次性操做,第二次將其遷移至數據流。android
在本文中,您將看到咱們把應用從 "在全部層級使用 LiveData",重構爲 "只在 View 和 ViewModel 間使用 LiveData 進行通信,並在應用的底層和 UserCase 層架構中使用協程"。ios
您有兩種方法在協程中處理數據流: 一種是 Flow API,另外一種是 Channel API。Channels 是一種同步原語,而 Flows 是爲數據流模型所設計的: 它是訂閱數據流的工廠。不過咱們可使用 Channels 來支持 Flows,這一點咱們稍後再說。git
相較於 Channel,Flow 更靈活,並提供了更明確的約束和更多操做符。github
因爲末端操做符 (terminal operator) 會觸發數據流的執行,同時會根據生產者一側流操做來決定是成功完成操做仍是拋出異常,所以 Flows 會自動地關閉數據流,您基本不會在生產者一側泄漏資源;而一旦 Channel 沒有正確關閉,生產者可能不會清理大型資源,所以 Channels 更容易形成資源泄漏。數據庫
應用數據層負責提供數據,一般是從數據庫中讀取,或從網絡獲取數據,例如,示例是一個數據源接口,它提供了一個用戶事件數據流:數組
interface UserEventDataSource {
fun getObservableUserEvent(userId: String): Flow<UserEventResult>
}
複製代碼
1. UseCase 層和 Repository 層緩存
介於 View/ViewModel 和數據源之間的層 (在咱們的例子中是 UseCase 和 Repository) 一般須要合併來自多個查詢的數據,或在 ViewModel 層使用以前轉化數據。就像 Kotlin sequences 同樣,Flow 支持大量操做符來轉換數據。目前已經有大量的可用的操做符,同時您也能夠建立您本身的轉換器 (好比,使用 transform 操做符)。不過 Flow 在許多的操做符中暴露了 suspend lambda 表達式,所以在大多數狀況下沒有必要經過自定義轉換來完成複雜任務,能夠直接在 Flow 中調用掛起函數。bash
在 ADS 應用中,咱們想將 UserEventResult 和 Repository 層中的會話數據進行綁定。咱們利用 map 操做符來將一個 suspend lambda 表達式應用在從數據源接收到的每個 Flow 的值上:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
class DefaultSessionAndUserEventRepository(
private val userEventDataSource: UserEventDataSource,
private val sessionRepository: SessionRepository
) : SessionAndUserEventRepository {
override fun getObservableUserEvent(
userId: String?,
eventId: SessionId
): Flow<Result<LoadUserSessionUseCaseResult>> {
// 處理 userId
// 監聽用戶事件,並將其與 Session 數據進行合併
return userEventDataSource.getObservableUserEvent(userId, eventId).map { userEventResult ->
val event = sessionRepository.getSession(eventId)
// 將 Session 和用戶數據進行合併,並傳遞結果
val userSession = UserSession(
event,
userEventResult.userEvent ?: createDefaultUserEvent(event)
)
Result.Success(LoadUserSessionUseCaseResult(userSession))
}
}
}
複製代碼
2. ViewModel
在利用 LiveData 執行 UI ↔ ViewModel 通訊時,ViewModel 層應該利用末端操做符來消費來自數據層的數據流 (好比: collect、first 或者是 toList) 。
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
// 真實代碼的簡化版
class SessionDetailViewModel(
private val loadUserSessionUseCase: LoadUserSessionUseCase,
...
): ViewModel() {
private fun listenForUserSessionChanges(sessionId: SessionId) {
viewModelScope.launch {
loadUserSessionUseCase(sessionId).collect { loadResult ->
}
}
}
}
複製代碼
完整代碼能夠參考這裏: github.com/google/iosc…
若是您須要將 Flow 轉化爲 LiveData,則可使用 AndroidX lifecycle library 提供的 Flow.asLiveData() 擴展函數 (extension function)。這個擴展函數很是便於使用,由於它共享了 Flow 的底層訂閱,同時根據觀察者的生命週期管理訂閱。此外,LiveData 能夠爲後續添加的觀察者提供最新的數據,其訂閱在配置發生變動的時候依舊可以生效。下面利用一段簡單的代碼來演示如何使用這個擴展函數:
class SimplifiedSessionDetailViewModel(
private val loadUserSessionUseCase: LoadUserSessionUseCase,
...
): ViewModel() {
val sessions = loadUserSessionUseCase(sessionId).asLiveData()
}
複製代碼
特別說明: 這段代碼不是 ADS 應用的,它只是用來演示如何使用 Flow.asLiveData()。
回到數據源的實現,要怎樣去實現以前暴露的 getObservableUserEvent 函數?咱們考慮了兩種實現: flow 構造器,或 BroadcastChannel 接口,這兩種實現應用於不一樣的場景。
1. 何時使用 Flow ?
Flow 是一種 "冷流"(Cold Stream)。"冷流" 是一種數據源,該類數據源的生產者會在每一個監聽者開始消費事件的時候執行,從而在每一個訂閱上建立新的數據流。一旦消費者中止監聽或者生產者的阻塞結束,數據流將會被自動關閉。
Flow 很是適合須要開始/中止數據的產生來匹配觀察者的場景。
您能夠利用 flow 構造器來發送有限個/無限個元素。
val oneElementFlow: Flow<Int> = flow {
// 生產者代碼開始執行,流被打開
emit(1)
// 生產者代碼結束,流將被關閉
}
val unlimitedElementFlow: Flow<Int> = flow {
// 生產者代碼開始執行,流被打開
while(true) {
// 執行計算
emit(result)
delay(100)
}
// 生產者代碼結束,流將被關閉
}
複製代碼
Flow 經過協程取消功能提供自動清理功能,所以傾向於執行一些重型任務。請注意,這裏提到的取消是有條件的,一個永不掛起的 Flow 是永不會被取消的: 在咱們的例子中,因爲 delay 是一個掛起函數,用於檢查取消狀態,當訂閱者中止監聽時,Flow 將會中止並清理資源。
2. 何時使用 BroadcastChannel
Channel 是一個用於協程間通訊的併發原語。BroadcastChannel 基於 Channel,並加入了多播功能。
可能在這樣一些場景裏,您可能會考慮在數據源層中使用 BroadcastChannel:
若是生產者和消費者的生命週期不一樣或者彼此徹底獨立運行時,請使用 BroadcastChannel。
若是您但願生產者有獨立的生命週期,同時向任何存在的監聽者發送當前數據的時候,BroadcastChannel API 很是適合這種場景。在這種狀況下,當新的監聽者開始消費事件時,生產者不須要每次都被執行。
您依然能夠向調用者提供 Flow,它們不須要知道具體的實現。您可使用 BroadcastChannel.asFlow() 這個擴展函數來將一個 BroadcastChannel 做爲一個 Flow 使用。
不過,關閉這個特殊的 Flow 不會取消訂閱。當使用 BroadcastChannel 的時候,您必須本身管理生命週期。BroadcastChannel 沒法感知到當前是否還存在監聽者,除非關閉或取消 BroadcastChannel,不然將會一直持有資源。請確保在不須要 BroadcastChannel 的時候將其關閉。同時請注意關閉後的 BroadcastChannel 沒法再次被使用,若是須要,您須要從新建立實例。
接下來,咱們將分享如何使用 BroadcastChannel API 的示例。
3. 特別說明
部分 Flow 和 Channel API 仍處於實驗階段,極可能會發生變更。在一些狀況下,您可能會正在使用 Channel,不過在將來可能會建議您使用 Flow。具體來說,StateFlow 和 Flow 的 share operator 方案可能在將來會減小 Channel 的使用。
包含 Room 在內的不少庫已經支持將協程用於數據流操做。對於那些還不支持的庫,您能夠將任何基於回調的 API 轉換爲協程。
1. Flow 的實現
若是您想將一個基於回調的流 API 轉換爲使用 Flow,您可使用 channelFlow 函數 (固然也可使用 callbackFlow,它們都基於相同的實現)。channelFlow 將會建立一個 Flow 的實例,該實例中的元素將傳遞給一個 Channel。這樣能夠容許咱們在不一樣的上下文或併發中提供元素。
如下示例中,咱們想要把從回調中拿到的元素髮送到 Flow 中:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
override fun getObservableUserEvent(userId: String, eventId: SessionId): Flow<UserEventResult> {
// 1) 利用 channelFlow 建立一個 Flow
return channelFlow<UserEventResult> {
val eventDocument = firestore.collection(USERS_COLLECTION)
.document(userId)
.collection(EVENTS_COLLECTION)
.document(eventId)
// 1) 將回調註冊到 API 上
val subscription = eventDocument.addSnapshotListener { snapshot, _ ->
val userEvent = if (snapshot.exists()) {
parseUserEvent(snapshot)
} else { null }
// 2) 將數據發送到 Flow
channel.offer(UserEventResult(userEvent))
}
// 3) 請不要關閉數據流,在消費者關閉或者 API 調用 onCompleted/onError 函數以前,請保證數據流
// 一直處於打開狀態。
// 當數據流關閉後,請取消第三方庫的訂閱。
awaitClose { subscription.remove() }
}
}
複製代碼
詳細代碼能夠參考這裏:
2. BroadcastChannel 實現
對於使用 Firestore 跟蹤用戶身份認證的數據流,咱們使用了 BroadcastChannel API,由於咱們但願註冊一個有獨立生命週期的 Authentication 監聽者,同時也但願能向全部正在監聽的對象廣播當前的結果。
轉化回調 API 爲 BroadcastChannel 相比轉化爲 Flow 要略複雜一點。您能夠建立一個類,並設置將實例化後的 BroadcastChannel 做爲變量保存。在初始化期間,註冊回調,像之前同樣將元素髮送到 BroadcastChannel:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
class FirebaseAuthStateUserDataSource(...) : AuthStateUserDataSource {
private val channel = ConflatedBroadcastChannel<Result<AuthenticatedUserInfo>>()
private val listener: ((FirebaseAuth) -> Unit) = { auth ->
// 數據處理邏輯
// 將當前的用戶 (數據) 發送給消費者
if (!channel.isClosedForSend) {
channel.offer(Success(FirebaseUserInfo(auth.currentUser)))
} else {
unregisterListener()
}
}
@Synchronized
override fun getBasicUserInfo(): Flow<Result<AuthenticatedUserInfo>> {
if (!isListening) {
firebase.addAuthStateListener(listener)
isListening = true
}
return channel.asFlow()
}
}
複製代碼
爲了測試 Flow 轉換 (就像咱們在 UseCase 和 Repository 層中所作的那樣),您能夠利用 flow 構造器返回一個假數據,例如:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
object FakeUserEventDataSource : UserEventDataSource {
override fun getObservableUserEvents(userId: String) = flow {
emit(UserEventsResult(userEvents))
}
}
class DefaultSessionAndUserEventRepositoryTest {
@Test
fun observableUserEvents_areMappedCorrectly() = runBlockingTest {
// 準備一個 repo
val userEvents = repository
.getObservableUserEvents("user", true).first()
// 對接收到的用戶事件進行斷言
}
}
複製代碼
爲了成功完成測試,一個比較好的作法是使用 take 操做符來從 Flow 中獲取一些數據,使用 toList 做爲末端操做符來從數組中獲取結果。示例以下:
class AnotherStreamDataSourceImplTest {
@Test
fun `Test happy path`() = runBlockingTest {
//準備好 subject
val result = subject.flow.take(1).toList()
// 斷言結果和預期的一致
}
}
複製代碼
take 操做符很是適合在獲取到數據後關閉 Flow。在測試完畢後不關閉 Flow 或 BroadcastChannel 將會致使內存泄漏以及測試結果不一致。
注意: 若是在數據源的實現是經過 BroadcastChannel 完成的,那麼上面的代碼還不夠。您須要本身管理數據源的生命週期,並確保 BroadcastChannel 在測試開始以前已經啓動,同時須要在測試結束後將其關閉,不然將會致使內存泄漏。
協程測試的最佳實踐在這裏依然適用。若是您在測試代碼中建立新的協程,則可能想要在測試線程中執行它來確保測試得到執行。
您也能夠經過視頻回顧 2019 Android 開發者峯會演講 —— 在 Android 上測試協程:
點擊查看視頻:v.qq.com/x/page/d303…
2019 ADS 應用在 GitHub 開源,請訪問下方連接在 GitHub 上查看更詳細的代碼實現: github.com/google/iosc…