【譯】kotlin 協程 Flow:給 RxJava 使用者的介紹

原文:Flow: an intro for an RxJava user
做者:Mohamed Ibrahim
譯者:Fly_with24html


RxJava 多是我使用的最重要的庫,Rx 一般是編寫代碼的另外一種範式,Kotlin 做爲一種新的編程語言,使它能夠輕鬆實現將協程驅動的 flow 實現爲本身的 Rx 實現。 我可能在 Hello Kotlin Coroutines 中介紹了協程,這對於理解 flow 頗有必要java

Kotlin 具備一組擴展,以方便使用集合。 但它不是響應式的git

listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi")
    .map { it.length }
    .filter { it > 4 }
    .forEach {
        println(it)
    }
複製代碼

在此示例中,若是您深刻研究 map 函數源代碼,您將發現這裏沒有魔法,它只是列表的循環,進行了一些轉換而後爲您提供了一個新列表。 過濾器也同樣。 這種機制稱爲 eager evaluation ,該函數在整個列表中進行操做並提供一個新列表。 可是若是咱們不須要建立這些臨時列表以節省一些內存,那咱們可使用 Sequencesgithub

listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi")
	// 使用 Sequence
    .asSequence()
    .map { it.length }
    .filter { it > 4 }
    .forEach {
        println(it)
    }
複製代碼

這裏的區別就是先調用 asSequence 方法,而後使用咱們的操做,再次 查看 map 方法後,咱們發現了一些不一樣之處,它只是 sequence 的修飾符,返回值類型也是 sequence 。 使用 sequence map 時,只能一項一項地進行操做。列表較大時,sequence 比普通集合要好得多。sequence 能夠同步完成其工做,有沒有辦法異步使用那些轉換運算符呢?答案是 flow編程

flow

若是咱們嘗試獲取列表並將其用做 flow ,並在流的末尾調用 collect {..},則會收到編譯錯誤。 因爲 flow 是基於協程構建的,所以默認狀況下它具備異步功能,所以您能夠在代碼中使用協程時使用它bash

collect {…} 運算符,您能夠將其想像爲 Rxjava 中的 subscribe異步

流也是 cold stream ,這意味着,直到您調用操做符(如 collect)後,flow 纔會被執行。 若是您重複調用 collect ,每次您將得到相同的結果編程語言

所以,Collections 擴展功能僅適用於小數據,sequence 能夠節省您沒必要要的工做(不建立臨時列表),而使用 flow,您能夠用協程的強大功能來編寫代碼。 所以,讓咱們學習如何構建它函數

構建 flow

咱們看到 asFlow 方法,它是 Collections 上的擴展函數,可將其轉換爲 flow,咱們查看一下源碼學習

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}
複製代碼

若是咱們要編寫前面的示例在數據源中添加一些邏輯,則只需使用 flow{…} 或者 flowof()

轉換操做符

flow 擁有一些列的用於轉換的運算符,例如 mapfiltergroupByscan 等等

在由 Coroutines 提供支持的 flow 中,您能夠天然地在您的操做符中使用異步代碼,假設咱們想要作一些耗時的操做,這裏使用延遲一秒鐘表示。 使用 RxJava 時,您可使用 flatmap

這裏想表達的是 flow 具備更簡單的設計,而且與以其陡峭的學習曲線而聞名的 RxJava 相比易於學習,我在此使用 flow 將它簡化一下

terminal 操做符

我已經提到 collect() 是 terminal operator,當您在調用它時獲得結果,在 RxJava 中,您能夠經過調用 subscribe() 來啓動它,或者使用阻塞的方式,調用 blockingGet

flow 中的 terminal operator 是須要做用域操做的掛起函數,其餘的 operator 例如

  • toList(),toSet -> 返回集合中的全部 item

  • first() -> 僅返回第一個發射

  • reduce(),fold() -> 使用特定操做獲取結果

發射數據

爲了發射數據,您須要使用一個掛起函數

//fire a coroutine
someScope.launch {
  //fire flow with a terminal operator
  flowSampleData().collect { }
}
複製代碼

上面的花括號讓人想起了回調,您可使用 launchIn 函數,處理結果可使用 onEach{...}

flowSampleData()
    .onEach {
     //handle emissions
    }
    .launchIn(someScope)
複製代碼

取消

每次設置 RxJava 訂閱時,咱們都必須取消這些訂閱以免內存泄漏或過時的任務在後臺運行,RxJava 提供對訂閱的引用(disposable)來取消訂閱,disposable().dispose() 。若是您在 CompositeDisposable 使用了多個對象,則調用 clear()dispose()

對於 flow 使用特定 scope 的協程則能夠無需進行額外的工做來達到此目的

錯誤處理

RxJava 最有用的功能之一就是處理錯誤的方式,您可使用此 onError() 函數捕獲工做流中的任何錯誤。 flow 有一個相似的稱爲 catch {…} ,若是不使用 catch {…} ,則您的代碼可能會引起異常或應用崩潰。 您就能夠選擇使用常規 try catch 或使用 atch {…} 以聲明方式進行編碼

讓咱們模擬一個錯誤

private fun flowOfAnimeCharacters() = flow {
    emit("Madara")
    emit("Kakashi")
    // 拋出異常
    throw IllegalStateException()
    emit("Jiraya")
    emit("Itachi")
    emit("Naruto")
}
複製代碼

使用

runBlocking {
    flowOfAnimeCharacters()
        .map { stringToLength(it) }
        .filter { it > 4 }
        .collect {
            println(it)
        }
}
複製代碼

若是咱們運行此代碼,它將引起異常,而且如咱們所說,您有兩個選項能夠處理錯誤,即常規 try-catchcatch {…}。 這是兩種狀況下的修改代碼

// 使用 try-catch
runBlocking {
    try {
        flowOfAnimeCharacters()
            .map { stringToLength(it) }
            .filter { it > 4 }
            .collect {
                println(it)
            }
    } catch (e: Exception) {
        println(e.stackTrace)
    } finally {
        println("Beat it")
    }
}
複製代碼
// 使用 catch{}
runBlocking {
    flowOfAnimeCharacters()
        .map { stringToLength(it) }
        .filter { it > 4 }
         // catch
        .catch { println(it) }
        .collect {
            println(it)
        }
}
複製代碼

使用 catch{} 須要注意的是 catch{} 操做符的放置順序,它要放置在 terminal operator 以前,這樣您才能夠捕獲想要的異常

恢復

若是錯誤中斷了流,而且咱們打算使用完整備份或默認數據恢復流,在 Rxjava 中使用 onErrorResumeNext()onErrorReturn() ,在 flow 中,咱們仍是使用 catch {…},但咱們在其中調用了 emit() 來逐個生成備份,甚至咱們可使用 emitAll() 引入一個全新的 flow,例如若是中途出現了異常,咱們須要「 Minato」 和 「 Hashirama」

runBlocking {
    flowOfAnimeCharacters()
        .catch {
            emitAll(flowOf("Minato", "Hashirama"))
        }
        .collect {
            println(it)
        }
}
複製代碼

那麼獲得的結果是

Madara
Kakashi
Minato
Hashirama
複製代碼

flowOn()

默認狀況下,flow 數據源將在調用者上下文中運行,若是要更改它,例如,要使 flow 在 IO 而不是 Main 上運行,則使用 flowOn(),並更改上游的上下文,上游是調用 flowOn 以前的所有操做符。 這是一個很好的文檔示例

這裏的 flowOn() 充當 RxJava 中的兩個角色 [subscribeOn() — observeOn()],您能夠編寫流而後肯定將在哪一個上下文中進行操做

完成

當 flow 完成發射時,您可能須要執行一些操做,onCompletion {…} 能夠解決這一問題,而且它肯定 flow 是正常完成仍是異常完成

已知數據源以下

private fun flowOfAnimeCharacters() = flow {
    emit("Madara")
    emit("Kakashi")
    throw IllegalStateException()
    emit("Jiraya")
    emit("Itachi")
    emit("Naruto")
}

複製代碼

catch {…} 的工做就是捕獲 IllegalStateException() 並從新開始新流程,這使咱們從源頭上留下「 Madara」,「 Kakashi」,在後面留下「 Minato」,「 Hashirama」。 可是 onCompletion {…} 會顯示錯誤嗎?

答案是否認的,catch 捕獲了全部錯誤,接下來是全新的事情,請記住 onCompletion {…}catch {…} 只是中介程序運算符。 它們的順序很重要

總結

您可使用 Flow builders 構建 flow,其中最基本的是 flow{…}。 若是要開始該 flow,請調用諸如 collect {…} 之類的 terminal operator,而且因爲 terminal operator 是掛起函數,所以須要使用協程構建器 launch {…} 的做用域,或者若是您想要以優雅的風格進行操做, 您能夠結合使用 launchIn()onEach {…}。 使用 catch {…} 捕獲上游錯誤,並根據須要提供回退流程。 onCompletion {..} 將在上游完成全部發射以後或發生錯誤時觸發。 默認狀況下,全部這些方法都適用於調用程序協程上下文,若是要更改上游上下文,請使用flowOn()

關於我

我是 Fly_with24

相關文章
相關標籤/搜索