Flow 庫是在 Kotlin Coroutines 1.3.2 發佈以後新增的庫。bash
官方文檔給予了一句話簡單的介紹:異步
Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);async
Flow 從文檔的介紹來看,它有點相似 RxJava 的 Observable。由於 Observable 也有 Cold 、Hot 之分。函數
Flow 可以返回多個異步計算的值,例以下面的 flow builder :post
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect{
println(it)
}
複製代碼
其中 Flow 接口,只有一個 collect 函數ui
public interface Flow<out T> {
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
}
複製代碼
若是熟悉 RxJava 的話,則能夠理解爲 collect() 對應subscribe()
,而 emit() 對應onNext()
。spa
除了剛剛展現的 flow builder 能夠用於建立 flow,還有其餘的幾種方式:線程
flowOf()code
flowOf(1,2,3,4,5)
.onEach {
delay(100)
}
.collect{
println(it)
}
複製代碼
asFlow()cdn
listOf(1, 2, 3, 4, 5).asFlow()
.onEach {
delay(100)
}.collect {
println(it)
}
複製代碼
channelFlow()
channelFlow {
for (i in 1..5) {
delay(100)
send(i)
}
}.collect{
println(it)
}
複製代碼
最後的 channelFlow builder 跟 flow builder 是有必定差別的。
flow 是 Cold Stream。在沒有切換線程的狀況下,生產者和消費者是同步非阻塞的。 channel 是 Hot Stream。而 channelFlow 實現了生產者和消費者異步非阻塞模型。
下面的代碼,展現了使用 flow builder 的狀況,大體花費1秒:
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect{
delay(100)
println(it)
}
}
print("cost $time")
}
複製代碼
使用 channelFlow builder 的狀況,大體花費700毫秒:
fun main() = runBlocking {
val time = measureTimeMillis{
channelFlow {
for (i in 1..5) {
delay(100)
send(i)
}
}.collect{
delay(100)
println(it)
}
}
print("cost $time")
}
複製代碼
固然,flow 若是切換線程的話,花費的時間也是大體700毫秒,跟使用 channelFlow builder 效果差很少。
fun main() = runBlocking {
val time = measureTimeMillis{
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.IO)
.collect {
delay(100)
println(it)
}
}
print("cost $time")
}
複製代碼
相比於 RxJava 須要使用 observeOn、subscribeOn 來切換線程,flow 會更加簡單。只需使用 flowOn
,下面的例子中,展現了 flow builder 和 map 操做符都會受到 flowOn 的影響。
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println(it)
}
複製代碼
而 collect() 指定哪一個線程,則須要看整個 flow 處於哪一個 CoroutineScope 下。
例如,下面的代碼 collect() 則是在 main 線程:
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
複製代碼
執行結果:
main: 1
main: 4
main: 9
main: 16
main: 25
複製代碼
值得注意的地方,不要使用 withContext() 來切換 flow 的線程。
若是 flow 是在一個掛起函數內被掛起了,那麼 flow 是能夠被取消的,不然不能取消。
fun main() = runBlocking {
withTimeoutOrNull(2500) {
flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}.collect {
println(it)
}
}
println("Done")
}
複製代碼
執行結果:
1
2
Done
複製代碼
Flow 的 API 有點相似於 Java Stream 的 API。它也一樣擁有 Intermediate Operations、Terminal Operations。
Flow 的 Terminal 運算符能夠是 suspend 函數,如 collect、single、reduce、toList 等;也能夠是 launchIn 運算符,用於在指定 CoroutineScope 內使用 flow。
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
複製代碼
整理一下 Flow 的 Terminal 運算符
該系列的相關文章: