原文:Flow: an intro for an RxJava user
做者:Mohamed Ibrahim
譯者:Fly_with24html
RxJava
多是我使用的最重要的庫,Rx 一般是編寫代碼的另外一種範式,Kotlin
做爲一種新的編程語言,使它能夠輕鬆實現將協程驅動的 flow 實現爲本身的 Rx 實現。 我可能在 Hello Kotlin Coroutines 中介紹了協程,這對於理解 flow 頗有必要java
Kotlin
具備一組擴展,以方便使用集合。 但它不是響應式的git
listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi")
.map { it.length }
.filter { it > 4 }
.forEach {
println(it)
}
複製代碼
在此示例中,若是您深刻研究 map 函數源代碼,您將發現這裏沒有魔法,它只是列表的循環,進行了一些轉換而後爲您提供了一個新列表。 過濾器也同樣。 這種機制稱爲 eager evaluation
,該函數在整個列表中進行操做並提供一個新列表。 可是若是咱們不須要建立這些臨時列表以節省一些內存,那咱們可使用 Sequencesgithub
listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi")
// 使用 Sequence
.asSequence()
.map { it.length }
.filter { it > 4 }
.forEach {
println(it)
}
複製代碼
這裏的區別就是先調用 asSequence
方法,而後使用咱們的操做,再次 查看 map 方法後,咱們發現了一些不一樣之處,它只是 sequence 的修飾符,返回值類型也是 sequence
。 使用 sequence map
時,只能一項一項地進行操做。列表較大時,sequence
比普通集合要好得多。sequence
能夠同步完成其工做,有沒有辦法異步使用那些轉換運算符呢?答案是 flow編程
若是咱們嘗試獲取列表並將其用做 flow ,並在流的末尾調用 collect {..}
,則會收到編譯錯誤。 因爲 flow 是基於協程構建的,所以默認狀況下它具備異步功能,所以您能夠在代碼中使用協程時使用它bash
collect {…}
運算符,您能夠將其想像爲 Rxjava
中的 subscribe
異步
流也是 cold stream
,這意味着,直到您調用操做符(如 collect)後,flow 纔會被執行。 若是您重複調用 collect ,每次您將得到相同的結果編程語言
所以,Collections 擴展功能僅適用於小數據,sequence
能夠節省您沒必要要的工做(不建立臨時列表),而使用 flow,您能夠用協程的強大功能來編寫代碼。 所以,讓咱們學習如何構建它函數
咱們看到 asFlow
方法,它是 Collections 上的擴展函數,可將其轉換爲 flow,咱們查看一下源碼學習
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
複製代碼
若是咱們要編寫前面的示例在數據源中添加一些邏輯,則只需使用 flow{…}
或者 flowof()
flow 擁有一些列的用於轉換的運算符,例如 map
, filter
, groupBy
,scan
等等
在由 Coroutines 提供支持的 flow
中,您能夠天然地在您的操做符中使用異步代碼,假設咱們想要作一些耗時的操做,這裏使用延遲一秒鐘表示。 使用 RxJava
時,您可使用 flatmap
這裏想表達的是 flow 具備更簡單的設計,而且與以其陡峭的學習曲線而聞名的 RxJava
相比易於學習,我在此使用 flow 將它簡化一下
我已經提到 collect()
是 terminal operator,當您在調用它時獲得結果,在 RxJava
中,您能夠經過調用 subscribe()
來啓動它,或者使用阻塞的方式,調用 blockingGet
flow 中的 terminal operator 是須要做用域操做的掛起函數,其餘的 operator 例如
toList(),toSet -> 返回集合中的全部 item
first() -> 僅返回第一個發射
reduce(),fold() -> 使用特定操做獲取結果
爲了發射數據,您須要使用一個掛起函數
//fire a coroutine
someScope.launch {
//fire flow with a terminal operator
flowSampleData().collect { }
}
複製代碼
上面的花括號讓人想起了回調,您可使用 launchIn
函數,處理結果可使用 onEach{...}
flowSampleData()
.onEach {
//handle emissions
}
.launchIn(someScope)
複製代碼
每次設置 RxJava
訂閱時,咱們都必須取消這些訂閱以免內存泄漏或過時的任務在後臺運行,RxJava
提供對訂閱的引用(disposable
)來取消訂閱,disposable().dispose()
。若是您在 CompositeDisposable
使用了多個對象,則調用 clear()
或 dispose()
對於 flow 使用特定 scope 的協程則能夠無需進行額外的工做來達到此目的
RxJava
最有用的功能之一就是處理錯誤的方式,您可使用此 onError()
函數捕獲工做流中的任何錯誤。 flow 有一個相似的稱爲 catch {…}
,若是不使用 catch {…}
,則您的代碼可能會引起異常或應用崩潰。 您就能夠選擇使用常規 try catch 或使用 atch {…} 以聲明方式進行編碼
讓咱們模擬一個錯誤
private fun flowOfAnimeCharacters() = flow {
emit("Madara")
emit("Kakashi")
// 拋出異常
throw IllegalStateException()
emit("Jiraya")
emit("Itachi")
emit("Naruto")
}
複製代碼
使用
runBlocking {
flowOfAnimeCharacters()
.map { stringToLength(it) }
.filter { it > 4 }
.collect {
println(it)
}
}
複製代碼
若是咱們運行此代碼,它將引起異常,而且如咱們所說,您有兩個選項能夠處理錯誤,即常規 try-catch
和 catch {…}
。 這是兩種狀況下的修改代碼
// 使用 try-catch
runBlocking {
try {
flowOfAnimeCharacters()
.map { stringToLength(it) }
.filter { it > 4 }
.collect {
println(it)
}
} catch (e: Exception) {
println(e.stackTrace)
} finally {
println("Beat it")
}
}
複製代碼
// 使用 catch{}
runBlocking {
flowOfAnimeCharacters()
.map { stringToLength(it) }
.filter { it > 4 }
// catch
.catch { println(it) }
.collect {
println(it)
}
}
複製代碼
使用 catch{}
須要注意的是 catch{}
操做符的放置順序,它要放置在 terminal operator 以前,這樣您才能夠捕獲想要的異常
若是錯誤中斷了流,而且咱們打算使用完整備份或默認數據恢復流,在 Rxjava
中使用 onErrorResumeNext()
或 onErrorReturn()
,在 flow 中,咱們仍是使用 catch {…}
,但咱們在其中調用了 emit()
來逐個生成備份,甚至咱們可使用 emitAll()
引入一個全新的 flow,例如若是中途出現了異常,咱們須要「 Minato」 和 「 Hashirama」
runBlocking {
flowOfAnimeCharacters()
.catch {
emitAll(flowOf("Minato", "Hashirama"))
}
.collect {
println(it)
}
}
複製代碼
那麼獲得的結果是
Madara
Kakashi
Minato
Hashirama
複製代碼
默認狀況下,flow 數據源將在調用者上下文中運行,若是要更改它,例如,要使 flow 在 IO 而不是 Main 上運行,則使用 flowOn()
,並更改上游的上下文,上游是調用 flowOn
以前的所有操做符。 這是一個很好的文檔示例
這裏的 flowOn()
充當 RxJava
中的兩個角色 [subscribeOn() — observeOn()]
,您能夠編寫流而後肯定將在哪一個上下文中進行操做
當 flow 完成發射時,您可能須要執行一些操做,onCompletion {…}
能夠解決這一問題,而且它肯定 flow 是正常完成仍是異常完成
已知數據源以下
private fun flowOfAnimeCharacters() = flow {
emit("Madara")
emit("Kakashi")
throw IllegalStateException()
emit("Jiraya")
emit("Itachi")
emit("Naruto")
}
複製代碼
catch {…}
的工做就是捕獲 IllegalStateException()
並從新開始新流程,這使咱們從源頭上留下「 Madara」,「 Kakashi」,在後面留下「 Minato」,「 Hashirama」。 可是 onCompletion {…}
會顯示錯誤嗎?
答案是否認的,catch 捕獲了全部錯誤,接下來是全新的事情,請記住 onCompletion {…}
和 catch {…}
只是中介程序運算符。 它們的順序很重要
您可使用 Flow builders 構建 flow,其中最基本的是 flow{…}
。 若是要開始該 flow,請調用諸如 collect {…}
之類的 terminal operator,而且因爲 terminal operator 是掛起函數,所以須要使用協程構建器 launch {…}
的做用域,或者若是您想要以優雅的風格進行操做, 您能夠結合使用 launchIn()
和 onEach {…}
。 使用 catch {…}
捕獲上游錯誤,並根據須要提供回退流程。 onCompletion {..}
將在上游完成全部發射以後或發生錯誤時觸發。 默認狀況下,全部這些方法都適用於調用程序協程上下文,若是要更改上游上下文,請使用flowOn()
我是 Fly_with24