kotlinx-coroutines-core 1.4.2版本出來了,沒有了以前的實驗方法警告,終於能夠能夠愉快的玩耍了,如今準備把有關flow的相關資料整理彙總一下java
A cold asynchronous data stream that sequentially emits values and completes normally or with an exception。
複製代碼
意思是:按順序發出值並正常完成或異常完成的Cold異步數據流。markdown
與rxjava做用相似,可能會在之後的開發中逐步代替rxjava,使整個開發生態更加趨向一體化異步
emptyFlow<String>()
複製代碼
flowOf(1, 2, 3)
// 1, 2, 3
flowOf(listOf(1,2,3))
// [1, 2, 3]
複製代碼
listOf(1, 2, 3).asFlow()
// 1, 2, 3
複製代碼
fun flowBuilderFunction(): Int {
return 10
}
::flowBuilderFunction.asFlow()
// 10
複製代碼
suspend fun flowBuilderFunction(): Int {
return 10
}
::flowBuilderFunction.asFlow()
// 10
複製代碼
LongRange(1, 5).asFlow().collect { value -> println(value) }
複製代碼
debounceasync
特性:函數
最後一個值不受影響,老是會被釋放emit。 [timeout]能夠傳毫秒,也能夠傳Durationui
flow {
emit(1)
delay(3000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.debounce(2000)
// 結果:1 4
// 解釋:
// 2和1的間隔大於2000,1被釋放
// 3和2的間隔小於2000, 2被忽略
// 4和3的間隔小於2000, 3被忽略
// 4是最後一個值不受timeout值的影響, 4被釋放
flow {
emit(1)
delay(3000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.debounce(2000.milliseconds)
// 結果:1 4
應用:可用於搜索框的反覆輸入內容篩選
複製代碼
distinctUntilChangedspa
1.若是生產的值和上個發送的值相同,值就會被過濾掉code
flow {
emit(1)
emit(1)
emit(2)
emit(2)
emit(3)
emit(4)
}.distinctUntilChanged()
// 結果:1 2 3 4
// 解釋:
// 第一個1被釋放
// 第二個1因爲和第一個1相同,被過濾掉
// 第一個2被釋放
// 第二個2因爲和第一個2相同,被過濾掉
// 第一個3被釋放
// 第一個4被釋放
複製代碼
private class Person(val age: Int, val name: String)
flow {
emit(Person(20, "張三"))
emit(Person(21, "李四"))
emit(Person(21, "王五"))
emit(Person(22, "趙六"))
}.distinctUntilChanged{old, new -> old.age == new.age }
.collect{ value -> println(value.name) }
// 結果:張三 李四 趙六
// 解釋:本例子定義若是年齡相同就認爲是相同的值,因此王五被過濾掉了
複製代碼
flow {
emit(Person(20, "張三"))
emit(Person(21, "李四"))
emit(Person(21, "王五"))
emit(Person(22, "趙六"))
}.distinctUntilChangedBy { person -> person.age }
// 結果:張三 李四 趙六
複製代碼
transformorm
對每一個值進行轉換協程
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.transform {
if (it % 2 == 0) {
emit(it * it)
}
}
// 結果:4 16
// 解釋:
// 1 不是偶數,被忽略
// 2 是偶數,2的平方4
// 3 不是偶數,被忽略
// 4 是偶數,4的平方16
複製代碼
onStart
第一個值被釋放以前被執行
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.onStart { emit(1000) }
// 結果:1000 1 2 3 4
// 解釋:
// 第一個值1被釋放的時候調用了emit(10 00), 因此1000在1以前被釋放
複製代碼
onCompletion
最後一個值釋放完成以後被執行
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.onCompletion { emit(1000) }
// 結果:1 2 3 4 1000
// 解釋:
// 第一個值4被釋放的時候調用了emit(100 0), 因此1000在4以後被釋放
複製代碼
drop
忽略最開始的[count]個值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.drop(2)
// 結果:3 4
// 解釋:
// 最開始釋放的兩個值(1,2)被忽略了
複製代碼
dropWhile
判斷第一個值若是知足(T) -> Boolean這個條件就忽略
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.dropWhile {
it % 2 == 0
}
// 結果:1 2 3 4
// 解釋:
// 第一個值不是偶數,因此1被釋放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.dropWhile {
it % 2 != 0
}
// 結果:2 3 4
// 解釋:
// 第一個值是偶數,因此1被忽略
複製代碼
take
只釋放前面[count]個值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.take(2)
// 結果:1 2
// 解釋:
// 前面兩個值被釋放
複製代碼
takeWhile
判斷第一個值若是知足(T) -> Boolean這個條件就釋放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.takeWhile { it%2 != 0 }
// 結果:1
// 解釋:
// 第一個值知足是奇數條件
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.takeWhile { it%2 == 0 }
// 結果:無
// 解釋:
// 第一個值不知足是奇數條件
複製代碼
flowOn
能夠切換CoroutineContext 說明:flowOn隻影響該運算符以前的CoroutineContext,對它以後的CoroutineContext沒有任何影響
buffer
將flow的多個任務分配到不一樣的協程中去執行,加快執行的速度。
conflate
若是值的生產速度大於值的消耗速度,就忽略掉中間將來得及處理的值,只處理最新的值。
val flow1 = flow {
delay(2000)
emit(1)
delay(2000)
emit(2)
delay(2000)
emit(3)
delay(2000)
emit(4)
}.conflate()
flow1.collect { value ->
println(value)
delay(5000)
}
// 結果: 1 3 4
// 解釋:
// 2000毫秒後生產了1這個值,交由collect 執行,花費了5000毫秒,當1這個值執行co llect完成後已經通過了7000毫秒。
// 這7000毫秒中,生產了2,可是collect還 沒執行完成又生產了3,因此7000毫秒之後 會直接執行3的collect方法,忽略了2這 個值
// collect執行完3後,還有一個4,繼續執 行。
複製代碼
將原始的Flow<T>經過[transform]轉換成Flow<Flow<T>>,而後將Flow<Flow<T>>釋放的Flow<T>其中釋放的值一個個釋放。
flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.flatMapConcat {
flow {
emit("$it 產生第一個flow值")
delay(2500)
emit("$it 產生第二個flow值")
}
}.collect { value ->
println(value)
}
// 結果
// I/System.out: 1 產生第一個flow值
// I/System.out: 1 產生第二個flow值
// I/System.out: 2 產生第一個flow值
// I/System.out: 2 產生第二個flow值
// I/System.out: 3 產生第一個flow值
// I/System.out: 3 產生第二個flow值
// I/System.out: 4 產生第一個flow值
// I/System.out: 4 產生第二個flow值
// 解釋:
// 原始Flow<Int>經過flatMapConcat被轉換成Flow<Flow<Int>>
// 原始Flow<Int>首先釋放1,接着Flow<Flow<Int>> 就會釋放 1產生第一個flow值 和 1產生第二個flow值 兩個值
// Flow<Int>釋放2,...
// Flow<Int>釋放3,...
// Flow<Int>釋放4,...
複製代碼
flattenConcat
和flatMapConcat相似,只是少了一步Map操做。
flow {
delay(1000)
emit(flow {
emit("1 產生第一個flow值")
delay(2000)
emit("1 產生第二個flow值") })
delay(1000)
emit(flow {
emit("2 產生第一個flow值")
delay(2000)
emit("3 產生第二個flow值") })
delay(1000)
emit(flow {
emit("3 產生第一個flow值")
delay(2000)
emit("3 產生第二個flow值") })
delay(1000)
emit(flow {
emit("4 產生第一個flow值")
delay(2500)
emit("4 產生第二個flow值") })
}.flattenConcat()
// 結果
// I/System.out: 1 產生第一個flow值
// I/System.out: 1 產生第二個flow值
// I/System.out: 2 產生第一個flow值
// I/System.out: 2 產生第二個flow值
// I/System.out: 3 產生第一個flow值
// I/System.out: 3 產生第二個flow值
// I/System.out: 4 產生第一個flow值
// I/System.out: 4 產生第二個flow值
複製代碼
flatMapMerge
將原始的Flow經過[transform]轉換成Flow<Flow>,而後將Flow<Flow>釋放的Flow其中釋放的值一個個釋放。 它與flatMapConcat的區別是:Flow<Flow>釋放的Flow其中釋放的值沒有順序性,誰先產生誰先釋放。
flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.flatMapMerge {
flow {
emit("$it 產生第一個flow值")
delay(2500)
emit("$it 產生第二個flow值")
}
}.collect { value ->
println(value)
}
複製代碼
merge
將Iterable<Flow>合併成一個Flow
val flow1 = listOf(
flow {
emit(1)
delay(500)
emit(2)
},
flow {
emit(3)
delay(500)
emit(4)
},
flow {
emit(5)
delay(500)
emit(6)
}
)
flow1.merge().collect { value -> println("$value") }
// 結果: 1 3 5 2 4 6
// 解釋:
// 按Iterable的順序和耗時順序依次釋放值
複製代碼
transformLatest
原始flow會觸發transformLatest轉換後的flow, 當原始flow有新的值釋放後,transformLatest轉換後的flow會被取消,接着觸發新的轉換後的flow
flatMapLatest
和transformLatest相似, 原始flow會觸發transformLatest轉換後的flow, 當原始flow有新的值釋放後,transformLatest轉換後的flow會被取消,接着觸發新的轉換後的flow
區別:flatMapLatest的transform轉換成的是Flow, transformLatest的transform轉換成的是Unit
mapLatest
和transformLatest相似, 原始flow會觸發transformLatest轉換後的flow, 當原始flow有新的值釋放後,transformLatest轉換後的flow會被取消,接着觸發新的轉換後的flow
區別:mapLatest的transform轉換成的是T,flatMapLatest的transform轉換成的是Flow,transformLatest的transform轉換成的是Unit
filter
經過predicate進行過濾,知足條件則被釋放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.filter { it % 2 == 0 }
// 結果: 2 4
// 解釋:
// 2和4知足it % 2 == 0,被釋放
複製代碼
filterNot
經過predicate進行過濾,不知足條件則被釋放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.filterNot { it % 2 == 0 }
// 結果: 1 3
// 解釋:
// 1和3不知足it % 2 == 0,被釋放
複製代碼
filterIsInstance
若是是某個數據類型則被釋放
flow {
emit(1)
emit("2")
emit("3")
emit(4)
}.filterIsInstance<String>()
// 結果: "2" "3"
// 解釋:
// "2" "3"是String類型,被釋放
複製代碼
filterNotNull
若是數據是非空,則被釋放
flow {
emit(1)
emit("2")
emit("3")
emit(null)
}.filterNotNull()
// 結果: 1 "2" "3"
複製代碼
map
將一個值轉換成另一個值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.map { it * it }
// 結果: 1 4 9 16
// 解釋:
// 將1,2,3,4轉換成對應的平方數
複製代碼
mapNotNull
將一個非空值轉換成另一個值
withIndex
將值封裝成IndexedValue對象
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.withIndex()
// 結果:
// I/System.out: IndexedValue(index=0, value=1)
// I/System.out: IndexedValue(index=1, value=2)
// I/System.out: IndexedValue(index=2, value=3)
// I/System.out: IndexedValue(index=3, value=4)
複製代碼
onEach
每一個值釋放的時候能夠執行的一段代碼
scan
有一個初始值,而後每一個值都和初始值進行運算,而後這個值做爲後一個值的初始值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.scan(100) { acc, value ->
acc * value
}
// 結果: 100 100 200 600 2400
// 解釋:
// 初始值 100
// 1 100 * 1 = 100
// 2 100 * 2 = 200
// 3 200 * 3 = 600
// 4 600 * 4 = 2400
複製代碼
runningReduce
和scan相似,可是沒有初始值,最開始是它自己
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.runningReduce { acc, value ->
acc * value
}
// 結果: 1 2 6 24
// 解釋:
// 1 1
// 2 1 * 2 = 2
// 3 2 * 3 = 6
// 4 6 * 4 = 24
複製代碼
zip
將兩個Flow在回調函數中進行處理返回一個新的值 R 當兩個flow的長度不等時只發送最短長度的事件
val nums = (1..4).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" }
.collect { println(it) }
// 結果:
1 -> one
2 -> two
3 -> three
複製代碼
combine
任意一個flow釋放值且都有釋放值後會調用combine後的代碼塊,且值爲每一個flow的最新值。 和zip的區別: 組合兩個流,在通過第一次發射之後,任意方有新數據來的時候就能夠發射,另外一方有多是已經發射過的數據
val flow1 = flowOf(1, 2, 3, 4).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(20) }
flow1.combine(flow2) { first, second ->
"$first$second"
}.collect { println("$it") }
// 結果:1a 2a 2b 3b 4b 4c 4d
// 解釋:
// 開始 --- flow1 釋放 1,flow2 釋放 a, 釋放1a
// 10毫秒 --- flow1 釋放 2,釋放2a
// 20毫秒 --- flow2 釋放 b,此時釋放2b
// 30毫秒 --- flow1 釋放 3,此時釋放3b
// 40毫秒 --- flow1 釋放 4,此時釋放4b
// 40毫秒 --- flow2 釋放 c,此時釋放4c
// 60毫秒 --- flow2 釋放 d,此時釋放4d
複製代碼
public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重試次數 predicate: suspend (cause: Throwable) -> Boolean = { true } ): Flow<T> 複製代碼
public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean ): Flow<T>
複製代碼
Collect相關的末端操做符
collect
接收值
launchIn
scope.launch { flow.collect() }的縮寫, 表明在某個協程上下文環境中去接收釋放的值
val flow1 = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}
flow1.onEach { println("$it") }
.launchIn(GlobalScope)
// 結果:1 2 3 4
複製代碼
collectIndexed
和withIndex對應的,接收封裝的IndexedValue
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.withIndex()
flow1.collectIndexed { index, value ->
println("index = $index, value = $value")
}
// 結果:
// I/System.out: index = 0, value = IndexedValue(index=0, value=1)
// I/System.out: index = 1, value = IndexedValue(index=1, value=2)
// I/System.out: index = 2, value = IndexedValue(index=2, value=3)
// I/System.out: index = 3, value = IndexedValue(index=3, value=4)
複製代碼
collectLatest
collectLatest與collect的區別是,若是有新的值釋放,上一個值的操做若是沒執行完則將會被取消
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
flow1.collectLatest {
println("正在計算收到的值 $it")
delay(1500)
println("收到的值 $it")
}
// 結果:
// I/System.out: 正在計算收到的值 1
// I/System.out: 正在計算收到的值 2
// I/System.out: 正在計算收到的值 3
// I/System.out: 收到的值 3
// I/System.out: 正在計算收到的值 4
// I/System.out: 收到的值 4
// 解釋:
// 1間隔1000毫秒後釋放2,2間隔1000毫秒後釋放3,這間隔小於須要接收的時間1500毫秒,因此當2和3 到來後,以前的操做被取消了。
// 3和4 之間的間隔夠長可以等待執行完畢,4是最後一個值也能執行
複製代碼
Collection相關的末端操做符
toList
將釋放的值轉換成List
flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.toList())
// 結果:[1, 2, 3, 4]
複製代碼
toSet
將釋放的值轉換成Set
flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.toSet())
// 結果:[1, 2, 3, 4]
複製代碼
Count相關的末端操做符
count
1.計算釋放值的個數
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.count())
// 結果:4
2.計算知足某一條件的釋放值的個數
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.count { it % 2 == 0 })
// 結果:2
// 解釋:
// 偶數有2個值 2 4
```
複製代碼
Reduce相關的末端操做符
reduce
和runningReduce相似,可是隻計算最後的結果。
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.reduce { acc, value -> acc * value })
// 結果:24
// 解釋:計算最後的結果,1 * 2 * 3 * 4 = 24
複製代碼
fold
和scan相似,有一個初始值,可是隻計算最後的結果。
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.fold(100) { acc, value -> acc * value })
// 結果:2400
// 解釋:計算最後的結果,100 * 1 * 2 * 3 * 4 = 2400
複製代碼
single
只接收一個值的Flow 注意:多於1個或者沒有值都會報錯
val flow1 = flow {
emit(1)
}
println(flow1.single())
// 結果:1
複製代碼
singleOrNull
接收一個值的Flow或者一個空值的Flow
first/firstOrNull
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.first())
// 結果:1
```
2. 接收第一個知足某個條件的值
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.first { it % 2 == 0})
// 結果:2
```
複製代碼
能夠經過 try catch 捕獲錯誤異常
try {
flow {
for (i in 1..3) {
emit(i)
}
}.collect {
println("接收值 $it")
check(it <= 1) { "$it 大於1" }
}
} catch (e: Throwable) {
println("收到了異常: $e")
}
// 結果:
// I/System.out: 接收值 1
// I/System.out: 接收值 2
// I/System.out: 收到了異常: java.lang.IllegalStateException: 2 大於1
// 解釋:
// 收到2的時候就拋出了異常,讓後flow被取消,異常被捕獲
複製代碼
經過catch函數
catch函數可以捕獲以前產生的異常,以後的異常沒法捕獲。
flow {
for (i in 1..3) {
emit(i)
}
}.map {
check(it <= 1) { "$it 大於1" }
it
}
.catch { e -> println("Caught $e") }
.collect()
// 結果:
// Caught java.lang.IllegalStateException: 2 大於1
複製代碼
CoroutineScope.cancel
GlobalScope.launch {
val flow1 = flow {
for(i in 1..4){
emit(i)
}
}
flow1.collect { value ->
println("$value")
if (value >= 3) {
cancel()
}
}
}
// 結果:1 2 3
複製代碼
流取消檢測
在協程處於繁忙循環的狀況下,必須明確檢測是否取消。 能夠添加 .onEach { currentCoroutineContext().ensureActive() }, 可是這裏提供了一個現成的 cancellable 操做符來執行此操做:
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
複製代碼