破解 Kotlin 協程(11) - Flow 篇

原文連接 BennyHuo 破解 Kotlin 協程(11)-Flow 篇git

Flow 就是 Kotlin 協程與響應式編程模型結合的產物,你會發現它與 RxJava 很是像,兩者之間也有相互轉換的 API,使用起來很是方便。github

隨着 RxJava 的流行,響應式編程模型逐步深刻人心。Flow 就是 Kotlin 協程與響應式編程模型結合的產物。編程

本文基於 Kotlinx.coroutines1.3.3,因爲部分功能尚處於實驗階段,後續也可能會發生細微的調整。緩存

認識 Flow

介紹 Flow 以前,咱們先來回顧下序列生成器:安全

代碼清單1: 序列生成器bash

val ints = sequence {
  (1..3).forEach { 
    yield(it)
  }  
}
複製代碼

每次訪問 ints 的下一個元素的時候它就執行內部的邏輯直到遇到 yield,若是我但願在元素之間加個延時呢?數據結構

代碼清單2:序列生成器中不能調用其餘掛起函數併發

val ints = sequence {
  (1..3).forEach { 
    yield(it)
    delay(1000) // ERROR!
  }  
}
複製代碼

RestrictsSuspension 註解的約束,delay 不能在 SequenceScope 的擴展成員當中被調用,於是不能在序列生成器的協程體內調用了。框架

假設序列生成器不受這個限制,調用 delay 會致使後續的執行流程的線程發生變化,外部的調用者發如今訪問 ints 的下一個元素的時候竟然還會有切換線程的反作用,這個是否是算一個「驚喜」呢?不只如此,我想經過指定調度器來限定序列建立所在的線程,一樣是不能夠的,咱們甚至沒有辦法爲它設置協程上下文。異步

既然序列生成器有這麼多限制,那咱們是時候須要認識一下 Flow 了。它的 API 與序列生成器極爲類似:

代碼清單3:建立 Flow

val intFlow = flow {
  (1..3).forEach { 
    emit(it)
    delay(100)
  }
}
複製代碼

新元素經過 emit 函數提供,Flow 的執行體內部也能夠調用其餘掛起函數,這樣咱們就能夠在每次提供一個新元素後再延時 100ms 了。

Flow 也能夠設定它運行時所使用的調度器:

intFlow.flowOn(Dispatchers.IO)
複製代碼

經過 flowOn 設置的調度器只對它以前的操做有影響,所以這裏意味着 intFlow 的構造邏輯會在 IO 調度器上執行。

最終消費 intFlow 須要調用 collect 函數,這個函數也是一個掛起函數,咱們啓動一個協程來消費 intFlow

代碼清單4: 消費 Flow

GlobalScope.launch(myDispatcher) {
  intFlow.flowOn(Dispatchers.IO)
    .collect { println(it) }
}.join()
複製代碼

爲了區分調度器,咱們爲協程設置了一個自定義的調度器,它會將協程調度到名叫 MyThread 的線程上,結果以下:

[MyThread] 1
[MyThread] 2
[MyThread] 3
複製代碼

對比 RxJava 的線程切換

RxJava 也是一個基於響應式編程模型的異步框架,它提供了兩個切換調度器的 API 分別是 subscribeOnobserveOn

代碼清單5:RxJava 的調度器切換

Observable.create<Int> {
  (1..3).forEach { e ->
    it.onNext(e)
  }
  it.onComplete()
}.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(myExecutor))
.subscribe {
  println(it)
}
複製代碼

其中 subscribeOn 指定的調度器影響前面的邏輯,observeOn 影響的是後面的邏輯,所以 it.onNext(e) 執行在它的 io 這個調度器上,而最後的 println(it) 執行在經過 myExecutor 建立出來的調度器上。

Flow 的調度器 API 中看似只有 flowOnsubscribeOn 對應,其實否則, collect 所在協程的調度器則與 observeOn 指定的調度器對應。

在 RxJava 的學習和使用過程當中, subscribeOnobserveOn 常常容易被混淆;而在 Flow 當中 collect 所在的協程天然就是觀察者,它想運行在什麼調度器上它本身指定便可,很是容易區分。

冷數據流

一個 Flow 建立出來以後,不消費則不生產,屢次消費則屢次生產,生產和消費老是相對應的。

代碼清單6:Flow 能夠被重複消費

GlobalScope.launch(dispatcher) {
  intFlow.collect { println(it) }
  intFlow.collect { println(it) }
}.join()
複製代碼

intFlow 就是本節最開始咱們建立的 Flow,消費它會輸出 1,2,3,重複消費它會重複輸出 1,2,3。

這一點其實相似於咱們前面提到的 sequence 和 RxJava 例子,它們也都有本身的消費端。咱們建立一個序列而後去迭代它,每次迭代都會建立一個新的迭代器從頭開始迭代;RxJava 的 Observable 也是如此,每次調用它的 subscribe 都會從新消費一次。

所謂冷數據流,就是隻有消費時纔會生產的數據流,這一點與 Channel 正對應:Channel 的發送端並不依賴於接收端。

說明 RxJava 也存在熱數據流,能夠經過必定的手段實現冷熱數據流的轉化。不過相比之下,冷數據流的應用場景更爲豐富。

異常處理

Flow 的異常處理也比較直接,直接調用 catch 函數便可:

代碼清單7:捕獲 Flow 的異常

flow {
  emit(1)
  throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
  println("caught error: $t")
}
複製代碼

咱們在 Flow 的參數中拋了一個異常,在 catch 函數中就能夠直接捕獲到這個異常。若是沒有調用 catch 函數,未捕獲異常會在消費時拋出。請注意,catch 函數只能捕獲它的上游的異常。

若是咱們想要在流完成時執行邏輯,可使用 onCompletion

代碼清單8:訂閱流的完成

flow {
  emit(1)
  throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
  println("caught error: $t")
}.onCompletion { t: Throwable? ->
  println("finally.")
}
複製代碼

onCompletion 用起來比較相似於 try ... catch ... finally 中的 finally,不管前面是否存在異常,它都會被調用,參數 t 則是前面未捕獲的異常。

Flow 的設計初衷是但願確保流操做中異常透明。所以,如下寫法是違反 Flow 的設計原則的:

代碼清單9:命令式的異常處理(不推薦)

flow { 
  try {
    emit(1)
    throw ArithmeticException("Div 0")
  } catch (t: Throwable){
    println("caught error: $t")
  } finally {
    println("finally.")
  }
}
複製代碼

在流操做內部使用 try ... catch ... finally 這樣的寫法後續可能被禁用。

在 RxJava 當中還有 onErrorReturn 相似的操做:

代碼清單10:RxJava 從異常中恢復

val observable = Observable.create<Int> {
  ...
}.onErrorReturn {
  println(t)
  10
}
複製代碼

捕獲異常後,返回 10 做爲下一個值。

咱們在 Flow 當中也能夠模擬這樣的操做:

代碼清單11:Flow 從異常中恢復

flow {
  emit(1)
  throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
  println("caught error: $t")
  emit(10)
}
複製代碼

這裏咱們可使用 emit 從新生產新元素出來。細心的讀者必定會發現,emit 定義在 FlowCollector 當中,所以只要遇到 Receiver 爲 FlowCollector 的函數,咱們就能夠生產新元素。

說明 onCompletion 預計在協程框架的 1.4 版本中會被從新設計,以後它的做用相似於 RxJava 中 Subscriber 的 onComplete,即做爲整個 Flow 的完成回調使用,回調的參數也將包含整個 Flow 的未捕獲異常,參見 GitHub Issue:Breaking change: Experimental Flow.onCompletion contract for cause #1732

末端操做符

前面的例子當中,咱們用 collect 消費 Flow 的數據。collect 是最基本的末端操做符,功能與 RxJava 的 subscribe 相似。除了 collect 以外,還有其餘常見的末端操做符,大致分爲兩類:

  1. 集合類型轉換操做,包括 toListtoSet 等。
  2. 聚合操做,包括將 Flow 規約到單值的 reducefold 等操做,以及得到單個元素的操做包括 singlesingleOrNullfirst 等。

實際上,識別是否爲末端操做符,還有一個簡單方法,因爲 Flow 的消費端必定須要運行在協程當中,所以末端操做符都是掛起函數。

分離 flow 的消費和觸發

咱們除了能夠在 collect 處消費 Flow 的元素之外,還能夠經過 onEach 來作到這一點。這樣消費的具體操做就不須要與末端操做符放到一塊兒,collect 函數能夠放到其餘任意位置調用,例如:

代碼清單12:分離 Flow 的消費和觸發

fun createFlow() = flow<Int> {
    (1..3).forEach {
      emit(it)
      delay(100)
    }
  }.onEach { println(it) }

fun main(){
  GlobalScope.launch {
    createFlow().collect()
  }
}
複製代碼

由此,咱們又能夠衍生出一種新的消費 Flow 的寫法:

代碼清單13:使用協程做用域直接觸發 Flow

fun main(){
  createFlow().launchIn(GlobalScope)
}
複製代碼

其中 launchIn 函數只接收一個 CoroutineScope 類型的參數。

Flow 的取消

Flow 沒有提供取消操做,緣由很簡單:不須要。

咱們前面已經介紹了 Flow 的消費依賴於 collect 這樣的末端操做符,而它們又必須在協程當中調用,所以 Flow 的取消主要依賴於末端操做符所在的協程的狀態。

代碼清單14:Flow 的取消

val job = GlobalScope.launch {
  val intFlow = flow {
    (1..3).forEach {
      delay(1000)
      emit(it)
    }
  }

  intFlow.collect { println(it) }
}

delay(2500)
job.cancelAndJoin()
複製代碼

每隔 1000ms 生產一個元素,2500ms 之後協程被取消,所以最後一個元素生產前 Flow 就已經被取消,輸出爲:

1
▶ 1000ms later
2
複製代碼

如此看來,想要取消 Flow 只須要取消它所在的協程便可。

其餘 Flow 的建立方式

咱們已經知道了 flow { ... } 這種形式的建立方式,不過在這當中沒法隨意切換調度器,這是由於 emit 函數不是線程安全的:

代碼清單15:不能在 Flow 中直接切換調度器

flow { // BAD!!
  emit(1)
  withContext(Dispatchers.IO){
    emit(2)
  }
}
複製代碼

想要在生成元素時切換調度器,就必須使用 channelFlow 函數來建立 Flow:

channelFlow {
  send(1)
  withContext(Dispatchers.IO) {
    send(2)
  }
}
複製代碼

此外,咱們也能夠經過集合框架來建立 Flow:

listOf(1, 2, 3, 4).asFlow()
setOf(1, 2, 3, 4).asFlow()
flowOf(1, 2, 3, 4)
複製代碼

Flow 的背壓

只要是響應式編程,就必定會有背壓問題,咱們先來看看背壓到底是什麼。

背壓問題在生產者的生產速率高於消費者的處理速率的狀況下出現。爲了保證數據不丟失,咱們也會考慮添加緩存來緩解問題:

代碼清單16:爲 Flow 添加緩衝

flow {
  List(100) {
    emit(it)
  }
}.buffer()
複製代碼

咱們也能夠爲 buffer 指定一個容量。不過,若是咱們只是單純地添加緩存,而不是從根本上解決問題就始終會形成數據積壓。

問題產生的根本緣由是生產和消費速率的不匹配,除直接優化消費者的性能之外,咱們也能夠採起一些取捨的手段。

第一種是 conflate。與 ChannelConflate 模式一致,新數據會覆蓋老數據,例如:

代碼清單17:使用 conflate 解決背壓問題

flow {
  List(100) {
    emit(it)
  }
}.conflate()
.collect { value ->
  println("Collecting $value")
  delay(100) 
  println("$value collected")
}
複製代碼

咱們快速地發送了 100 個元素,最後接收到的只有兩個,固然這個結果每次都不必定同樣:

Collecting 1
1 collected
Collecting 99
99 collected
複製代碼

第二種是 collectLatest。顧名思義,只處理最新的數據,這看上去彷佛與 conflate 沒有區別,其實區別大了:它並不會直接用新數據覆蓋老數據,而是每個都會被處理,只不過若是前一個還沒被處理完後一個就來了的話,處理前一個數據的邏輯就會被取消。

仍是前面的例子,咱們稍做修改:

代碼清單18:使用 collectLatest 解決背壓問題

flow {
  List(100) {
    emit(it)
  }
}.collectLatest { value ->
  println("Collecting $value")
  delay(100)
  println("$value collected")
}
複製代碼

運行結果以下:

Collecting 0
Collecting 1
...
Collecting 97
Collecting 98
Collecting 99
▶ 100ms later
99 collected
複製代碼

前面的 Collecting 輸出了 0 ~ 99 的全部結果,而 collected 卻只有 99,由於後面的數據到達時,處理上一個數據的操做正好被掛起了(請注意delay(100))。

collectLatest 以外還有 mapLatestflatMapLatest 等等,都是這個做用。

Flow 的變換

咱們已經對集合框架的變換很是熟悉了,Flow 看上去極其相似於這樣的數據結構,這一點與 RxJava 的 Observable 的表現也基本一致。

例如咱們可使用 map 來變換 Flow 的數據:

代碼清單19:Flow 的元素變換

flow {
  List(5){ emit(it) } 
}.map { 
  it * 2
}
複製代碼

也能夠映射成其餘 Flow:

代碼清單20:Flow 的嵌套

flow {
  List(5){ emit(it) } 
}.map {
  flow { List(it) { emit(it) } }
}
複製代碼

這實際上獲得的是一個數據類型爲 FlowFlow,若是但願將它們拼接起來,可使用 flattenConcat

代碼清單21:拼接 Flow

flow {
  List(5){ emit(it) } 
}.map {
  flow { List(it) { emit(it) } }
}.flattenConcat()
  .collect { println(it) }
複製代碼

拼接的操做中 flattenConcat 是按順序拼接的,結果的順序仍然是生產時的順序;還有一個是 flattenMerge,它會併發拼接,所以結果不會保證順序。

使用 Flow 實現多路複用

多數狀況下,咱們能夠經過構造合適的 Flow 來實現多路複用的效果。

上一篇文章破解 Kotlin 協程(10) - Select 篇中對 await 的複用咱們能夠用 Flow 實現以下:

代碼清單22:使用 Flow 實現對 await 的多路複用

coroutineScope {
  val login = "..."
  listOf(::getUserFromApi, ::getUserFromLocal) ... ①
    .map { function ->
      function.call(login) ... ②
    }
    .map { deferred ->
      flow { emit(deferred.await()) } ... ③
    }
    .merge() ... ④
    .onEach { user ->
      println("Result: $user")
    }.launchIn(this)
}
複製代碼

這其中,① 處用建立了兩個函數引用組成的 List;② 處調用它們獲得 deferred;③ 處比較關鍵,對於每個 deferred 咱們建立一個單獨的 Flow,並在 Flow 內部發送 deferred.await() 返回的結果,即返回的 User 對象;如今咱們有了兩個 Flow 實例,咱們須要將它們整合成一個 Flow 進行處理,調用 merge 函數便可。

圖1:使用 merge 合併 Flow

一樣的,對 Channel 的讀取複用的場景也可使用 Flow 來完成。對照破解 Kotlin 協程(10) - Select 篇,咱們給出 Flow 的實現版本:

代碼清單23:使用 Flow 實現對 Channel 的複用

val channels = List(10) { Channel<Int>() }
...
val result = channels.map {
    it.consumeAsFlow()
  }
  .merge()
  .first()
複製代碼

這比 select 的版本看上去要更簡潔明瞭,每一個 Channel 都經過 consumeAsFlow 函數被映射成 Flow,再 merge 成一個 Flow,取第一個元素。

小結

Flow 是協程當中比較重要的異步工具,它的用法與其餘相似的響應式編程框架很是相近,你們能夠採起類比的學習方式去了解它的功能。

歡迎關注 Kotlin 中文社區!

中文官網:www.kotlincn.net/

中文官方博客:www.kotliner.cn/

公衆號:Kotlin

知乎專欄:Kotlin

CSDN:Kotlin中文社區

掘金:Kotlin中文社區

簡書:Kotlin中文社區

相關文章
相關標籤/搜索