原文連接 BennyHuo 破解 Kotlin 協程(11)-Flow 篇git
Flow
就是 Kotlin 協程與響應式編程模型結合的產物,你會發現它與 RxJava 很是像,兩者之間也有相互轉換的 API,使用起來很是方便。github
隨着 RxJava 的流行,響應式編程模型逐步深刻人心。Flow
就是 Kotlin 協程與響應式編程模型結合的產物。編程
本文基於 Kotlinx.coroutines1.3.3,因爲部分功能尚處於實驗階段,後續也可能會發生細微的調整。緩存
介紹 Flow
以前,咱們先來回顧下序列生成器:安全
代碼清單1: 序列生成器bash
val ints = sequence {
(1..3).forEach {
yield(it)
}
}
複製代碼
每次訪問 ints
的下一個元素的時候它就執行內部的邏輯直到遇到 yield
,若是我但願在元素之間加個延時呢?數據結構
代碼清單2:序列生成器中不能調用其餘掛起函數併發
val ints = sequence {
(1..3).forEach {
yield(it)
delay(1000) // ERROR!
}
}
複製代碼
受 RestrictsSuspension
註解的約束,delay
不能在 SequenceScope
的擴展成員當中被調用,於是不能在序列生成器的協程體內調用了。框架
假設序列生成器不受這個限制,調用 delay
會致使後續的執行流程的線程發生變化,外部的調用者發如今訪問 ints
的下一個元素的時候竟然還會有切換線程的反作用,這個是否是算一個「驚喜」呢?不只如此,我想經過指定調度器來限定序列建立所在的線程,一樣是不能夠的,咱們甚至沒有辦法爲它設置協程上下文。異步
既然序列生成器有這麼多限制,那咱們是時候須要認識一下 Flow
了。它的 API 與序列生成器極爲類似:
代碼清單3:建立 Flow
val intFlow = flow {
(1..3).forEach {
emit(it)
delay(100)
}
}
複製代碼
新元素經過 emit
函數提供,Flow 的執行體內部也能夠調用其餘掛起函數,這樣咱們就能夠在每次提供一個新元素後再延時 100ms 了。
Flow 也能夠設定它運行時所使用的調度器:
intFlow.flowOn(Dispatchers.IO)
複製代碼
經過 flowOn
設置的調度器只對它以前的操做有影響,所以這裏意味着 intFlow 的構造邏輯會在 IO
調度器上執行。
最終消費 intFlow
須要調用 collect
函數,這個函數也是一個掛起函數,咱們啓動一個協程來消費 intFlow
:
代碼清單4: 消費 Flow
GlobalScope.launch(myDispatcher) {
intFlow.flowOn(Dispatchers.IO)
.collect { println(it) }
}.join()
複製代碼
爲了區分調度器,咱們爲協程設置了一個自定義的調度器,它會將協程調度到名叫 MyThread
的線程上,結果以下:
[MyThread] 1
[MyThread] 2
[MyThread] 3
複製代碼
RxJava 也是一個基於響應式編程模型的異步框架,它提供了兩個切換調度器的 API 分別是 subscribeOn
和 observeOn
:
代碼清單5:RxJava 的調度器切換
Observable.create<Int> {
(1..3).forEach { e ->
it.onNext(e)
}
it.onComplete()
}.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(myExecutor))
.subscribe {
println(it)
}
複製代碼
其中 subscribeOn
指定的調度器影響前面的邏輯,observeOn
影響的是後面的邏輯,所以 it.onNext(e)
執行在它的 io
這個調度器上,而最後的 println(it)
執行在經過 myExecutor
建立出來的調度器上。
Flow 的調度器 API 中看似只有 flowOn
與 subscribeOn
對應,其實否則, collect
所在協程的調度器則與 observeOn
指定的調度器對應。
在 RxJava 的學習和使用過程當中, subscribeOn
和 observeOn
常常容易被混淆;而在 Flow 當中 collect
所在的協程天然就是觀察者,它想運行在什麼調度器上它本身指定便可,很是容易區分。
一個 Flow 建立出來以後,不消費則不生產,屢次消費則屢次生產,生產和消費老是相對應的。
代碼清單6:Flow 能夠被重複消費
GlobalScope.launch(dispatcher) {
intFlow.collect { println(it) }
intFlow.collect { println(it) }
}.join()
複製代碼
intFlow
就是本節最開始咱們建立的 Flow,消費它會輸出 1,2,3,重複消費它會重複輸出 1,2,3。
這一點其實相似於咱們前面提到的 sequence
和 RxJava 例子,它們也都有本身的消費端。咱們建立一個序列而後去迭代它,每次迭代都會建立一個新的迭代器從頭開始迭代;RxJava 的 Observable
也是如此,每次調用它的 subscribe
都會從新消費一次。
所謂冷數據流,就是隻有消費時纔會生產的數據流,這一點與 Channel
正對應:Channel
的發送端並不依賴於接收端。
說明 RxJava 也存在熱數據流,能夠經過必定的手段實現冷熱數據流的轉化。不過相比之下,冷數據流的應用場景更爲豐富。
Flow 的異常處理也比較直接,直接調用 catch
函數便可:
代碼清單7:捕獲 Flow 的異常
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}
複製代碼
咱們在 Flow 的參數中拋了一個異常,在 catch
函數中就能夠直接捕獲到這個異常。若是沒有調用 catch
函數,未捕獲異常會在消費時拋出。請注意,catch
函數只能捕獲它的上游的異常。
若是咱們想要在流完成時執行邏輯,可使用 onCompletion
:
代碼清單8:訂閱流的完成
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}.onCompletion { t: Throwable? ->
println("finally.")
}
複製代碼
onCompletion
用起來比較相似於 try ... catch ... finally
中的 finally
,不管前面是否存在異常,它都會被調用,參數 t 則是前面未捕獲的異常。
Flow 的設計初衷是但願確保流操做中異常透明。所以,如下寫法是違反 Flow 的設計原則的:
代碼清單9:命令式的異常處理(不推薦)
flow {
try {
emit(1)
throw ArithmeticException("Div 0")
} catch (t: Throwable){
println("caught error: $t")
} finally {
println("finally.")
}
}
複製代碼
在流操做內部使用 try ... catch ... finally
這樣的寫法後續可能被禁用。
在 RxJava 當中還有 onErrorReturn
相似的操做:
代碼清單10:RxJava 從異常中恢復
val observable = Observable.create<Int> {
...
}.onErrorReturn {
println(t)
10
}
複製代碼
捕獲異常後,返回 10 做爲下一個值。
咱們在 Flow 當中也能夠模擬這樣的操做:
代碼清單11:Flow 從異常中恢復
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
emit(10)
}
複製代碼
這裏咱們可使用 emit
從新生產新元素出來。細心的讀者必定會發現,emit
定義在 FlowCollector
當中,所以只要遇到 Receiver 爲 FlowCollector
的函數,咱們就能夠生產新元素。
說明 onCompletion 預計在協程框架的 1.4 版本中會被從新設計,以後它的做用相似於 RxJava 中 Subscriber 的 onComplete,即做爲整個 Flow 的完成回調使用,回調的參數也將包含整個 Flow 的未捕獲異常,參見 GitHub Issue:Breaking change: Experimental Flow.onCompletion contract for cause #1732。
前面的例子當中,咱們用 collect
消費 Flow 的數據。collect
是最基本的末端操做符,功能與 RxJava 的 subscribe
相似。除了 collect
以外,還有其餘常見的末端操做符,大致分爲兩類:
toList
、toSet
等。reduce
、fold
等操做,以及得到單個元素的操做包括 single
、singleOrNull
、first
等。實際上,識別是否爲末端操做符,還有一個簡單方法,因爲 Flow 的消費端必定須要運行在協程當中,所以末端操做符都是掛起函數。
咱們除了能夠在 collect
處消費 Flow 的元素之外,還能夠經過 onEach
來作到這一點。這樣消費的具體操做就不須要與末端操做符放到一塊兒,collect
函數能夠放到其餘任意位置調用,例如:
代碼清單12:分離 Flow 的消費和觸發
fun createFlow() = flow<Int> {
(1..3).forEach {
emit(it)
delay(100)
}
}.onEach { println(it) }
fun main(){
GlobalScope.launch {
createFlow().collect()
}
}
複製代碼
由此,咱們又能夠衍生出一種新的消費 Flow 的寫法:
代碼清單13:使用協程做用域直接觸發 Flow
fun main(){
createFlow().launchIn(GlobalScope)
}
複製代碼
其中 launchIn
函數只接收一個 CoroutineScope
類型的參數。
Flow 沒有提供取消操做,緣由很簡單:不須要。
咱們前面已經介紹了 Flow 的消費依賴於 collect
這樣的末端操做符,而它們又必須在協程當中調用,所以 Flow 的取消主要依賴於末端操做符所在的協程的狀態。
代碼清單14:Flow 的取消
val job = GlobalScope.launch {
val intFlow = flow {
(1..3).forEach {
delay(1000)
emit(it)
}
}
intFlow.collect { println(it) }
}
delay(2500)
job.cancelAndJoin()
複製代碼
每隔 1000ms 生產一個元素,2500ms 之後協程被取消,所以最後一個元素生產前 Flow 就已經被取消,輸出爲:
1
▶ 1000ms later
2
複製代碼
如此看來,想要取消 Flow 只須要取消它所在的協程便可。
咱們已經知道了 flow { ... }
這種形式的建立方式,不過在這當中沒法隨意切換調度器,這是由於 emit
函數不是線程安全的:
代碼清單15:不能在 Flow 中直接切換調度器
flow { // BAD!!
emit(1)
withContext(Dispatchers.IO){
emit(2)
}
}
複製代碼
想要在生成元素時切換調度器,就必須使用 channelFlow
函數來建立 Flow:
channelFlow {
send(1)
withContext(Dispatchers.IO) {
send(2)
}
}
複製代碼
此外,咱們也能夠經過集合框架來建立 Flow:
listOf(1, 2, 3, 4).asFlow()
setOf(1, 2, 3, 4).asFlow()
flowOf(1, 2, 3, 4)
複製代碼
只要是響應式編程,就必定會有背壓問題,咱們先來看看背壓到底是什麼。
背壓問題在生產者的生產速率高於消費者的處理速率的狀況下出現。爲了保證數據不丟失,咱們也會考慮添加緩存來緩解問題:
代碼清單16:爲 Flow 添加緩衝
flow {
List(100) {
emit(it)
}
}.buffer()
複製代碼
咱們也能夠爲 buffer
指定一個容量。不過,若是咱們只是單純地添加緩存,而不是從根本上解決問題就始終會形成數據積壓。
問題產生的根本緣由是生產和消費速率的不匹配,除直接優化消費者的性能之外,咱們也能夠採起一些取捨的手段。
第一種是 conflate
。與 Channel
的 Conflate
模式一致,新數據會覆蓋老數據,例如:
代碼清單17:使用 conflate 解決背壓問題
flow {
List(100) {
emit(it)
}
}.conflate()
.collect { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
複製代碼
咱們快速地發送了 100 個元素,最後接收到的只有兩個,固然這個結果每次都不必定同樣:
Collecting 1
1 collected
Collecting 99
99 collected
複製代碼
第二種是 collectLatest
。顧名思義,只處理最新的數據,這看上去彷佛與 conflate
沒有區別,其實區別大了:它並不會直接用新數據覆蓋老數據,而是每個都會被處理,只不過若是前一個還沒被處理完後一個就來了的話,處理前一個數據的邏輯就會被取消。
仍是前面的例子,咱們稍做修改:
代碼清單18:使用 collectLatest 解決背壓問題
flow {
List(100) {
emit(it)
}
}.collectLatest { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
複製代碼
運行結果以下:
Collecting 0
Collecting 1
...
Collecting 97
Collecting 98
Collecting 99
▶ 100ms later
99 collected
複製代碼
前面的 Collecting
輸出了 0 ~ 99 的全部結果,而 collected
卻只有 99,由於後面的數據到達時,處理上一個數據的操做正好被掛起了(請注意delay(100)
)。
除 collectLatest
以外還有 mapLatest
、flatMapLatest
等等,都是這個做用。
咱們已經對集合框架的變換很是熟悉了,Flow
看上去極其相似於這樣的數據結構,這一點與 RxJava 的 Observable
的表現也基本一致。
例如咱們可使用 map 來變換 Flow 的數據:
代碼清單19:Flow 的元素變換
flow {
List(5){ emit(it) }
}.map {
it * 2
}
複製代碼
也能夠映射成其餘 Flow:
代碼清單20:Flow 的嵌套
flow {
List(5){ emit(it) }
}.map {
flow { List(it) { emit(it) } }
}
複製代碼
這實際上獲得的是一個數據類型爲 Flow
的 Flow
,若是但願將它們拼接起來,可使用 flattenConcat
:
代碼清單21:拼接 Flow
flow {
List(5){ emit(it) }
}.map {
flow { List(it) { emit(it) } }
}.flattenConcat()
.collect { println(it) }
複製代碼
拼接的操做中 flattenConcat
是按順序拼接的,結果的順序仍然是生產時的順序;還有一個是 flattenMerge
,它會併發拼接,所以結果不會保證順序。
多數狀況下,咱們能夠經過構造合適的 Flow 來實現多路複用的效果。
上一篇文章破解 Kotlin 協程(10) - Select 篇中對 await 的複用咱們能夠用 Flow 實現以下:
代碼清單22:使用 Flow 實現對 await 的多路複用
coroutineScope {
val login = "..."
listOf(::getUserFromApi, ::getUserFromLocal) ... ①
.map { function ->
function.call(login) ... ②
}
.map { deferred ->
flow { emit(deferred.await()) } ... ③
}
.merge() ... ④
.onEach { user ->
println("Result: $user")
}.launchIn(this)
}
複製代碼
這其中,① 處用建立了兩個函數引用組成的 List;② 處調用它們獲得 deferred;③ 處比較關鍵,對於每個 deferred 咱們建立一個單獨的 Flow,並在 Flow 內部發送 deferred.await() 返回的結果,即返回的 User 對象;如今咱們有了兩個 Flow 實例,咱們須要將它們整合成一個 Flow 進行處理,調用 merge 函數便可。
圖1:使用 merge 合併 Flow
一樣的,對 Channel 的讀取複用的場景也可使用 Flow 來完成。對照破解 Kotlin 協程(10) - Select 篇,咱們給出 Flow 的實現版本:
代碼清單23:使用 Flow 實現對 Channel 的複用
val channels = List(10) { Channel<Int>() }
...
val result = channels.map {
it.consumeAsFlow()
}
.merge()
.first()
複製代碼
這比 select
的版本看上去要更簡潔明瞭,每一個 Channel 都經過 consumeAsFlow
函數被映射成 Flow,再 merge 成一個 Flow,取第一個元素。
Flow
是協程當中比較重要的異步工具,它的用法與其餘相似的響應式編程框架很是相近,你們能夠採起類比的學習方式去了解它的功能。
中文官網:www.kotlincn.net/
中文官方博客:www.kotliner.cn/
公衆號:Kotlin
知乎專欄:Kotlin
CSDN:Kotlin中文社區
掘金:Kotlin中文社區
簡書:Kotlin中文社區