Kotlin Coroutines Flow 系列(一) Flow 基本使用

woman-in-blue-spaghetti-strap-dress-2266519.jpg

一. Kotlin Flow 介紹

Flow 庫是在 Kotlin Coroutines 1.3.2 發佈以後新增的庫。bash

官方文檔給予了一句話簡單的介紹:異步

Flow —  cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);async

Flow 從文檔的介紹來看,它有點相似 RxJava 的 Observable。由於 Observable 也有 Cold 、Hot 之分函數

二. Flow 基本使用

Flow 可以返回多個異步計算的值,例以下面的 flow builder :post

flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect{
            println(it)
        }
複製代碼

其中 Flow 接口,只有一個 collect 函數ui

public interface Flow<out T> {

    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}
複製代碼

若是熟悉 RxJava 的話,則能夠理解爲 collect() 對應subscribe(),而 emit() 對應onNext()spa

2.1 建立 flow

除了剛剛展現的 flow builder 能夠用於建立 flow,還有其餘的幾種方式:線程

flowOf()code

flowOf(1,2,3,4,5)
        .onEach {
            delay(100)
        }
        .collect{
            println(it)
        }
複製代碼

asFlow()cdn

listOf(1, 2, 3, 4, 5).asFlow()
        .onEach {
            delay(100)
        }.collect {
            println(it)
        }
複製代碼

channelFlow()

channelFlow {
        for (i in 1..5) {
            delay(100)
            send(i)
        }
    }.collect{
        println(it)
    }
複製代碼

最後的 channelFlow builder 跟 flow builder 是有必定差別的。

flow 是 Cold Stream。在沒有切換線程的狀況下,生產者和消費者是同步非阻塞的。 channel 是 Hot Stream。而 channelFlow 實現了生產者和消費者異步非阻塞模型。

下面的代碼,展現了使用 flow builder 的狀況,大體花費1秒:

fun main() = runBlocking {

    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect{
            delay(100)
            println(it)
        }
    }

    print("cost $time")
}
複製代碼

flow.png

使用 channelFlow builder 的狀況,大體花費700毫秒:

fun main() = runBlocking {

    val time = measureTimeMillis{
        channelFlow {
            for (i in 1..5) {
                delay(100)
                send(i)
            }
        }.collect{
            delay(100)
            println(it)
        }
    }

    print("cost $time")
}
複製代碼

channelFlow.png

固然,flow 若是切換線程的話,花費的時間也是大體700毫秒,跟使用 channelFlow builder 效果差很少。

fun main() = runBlocking {

    val time = measureTimeMillis{
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.flowOn(Dispatchers.IO)
            .collect {
                delay(100)
                println(it)
            }
    }

    print("cost $time")
}
複製代碼

2.2 切換線程

相比於 RxJava 須要使用 observeOn、subscribeOn 來切換線程,flow 會更加簡單。只需使用 flowOn,下面的例子中,展現了 flow builder 和 map 操做符都會受到 flowOn 的影響。

flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println(it)
        }
複製代碼

而 collect() 指定哪一個線程,則須要看整個 flow 處於哪一個 CoroutineScope 下。

例如,下面的代碼 collect() 則是在 main 線程:

fun main() = runBlocking {

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
        it * it
    }.flowOn(Dispatchers.IO)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}
複製代碼

執行結果:

main: 1
main: 4
main: 9
main: 16
main: 25
複製代碼

值得注意的地方,不要使用 withContext() 來切換 flow 的線程。

2.3 flow 取消

若是 flow 是在一個掛起函數內被掛起了,那麼 flow 是能夠被取消的,不然不能取消。

fun main() = runBlocking {

    withTimeoutOrNull(2500) {
        flow {
            for (i in 1..5) {
                delay(1000)
                emit(i)
            }
        }.collect {
            println(it)
        }
    }

    println("Done")
}
複製代碼

執行結果:

1
2
Done
複製代碼

2.4 Terminal flow operators

Flow 的 API 有點相似於 Java Stream 的 API。它也一樣擁有 Intermediate Operations、Terminal Operations。

Flow 的 Terminal 運算符能夠是 suspend 函數,如 collect、single、reduce、toList 等;也能夠是 launchIn 運算符,用於在指定 CoroutineScope 內使用 flow。

@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() // tail-call
}
複製代碼

整理一下 Flow 的 Terminal 運算符

  • collect
  • single/first
  • toList/toSet/toCollection
  • count
  • fold/reduce
  • launchIn/produceIn/broadcastIn

該系列的相關文章:

Kotlin Coroutines Flow 系列(二) Flow VS RxJava2

相關文章
相關標籤/搜索