Kotlin Coroutines Flow 系列(四) 線程操做

photo-of-woman-wearing-denim-jacket-2419423.jpg

七. Flow 線程操做

7.1 更爲簡化的線程切換

相對於 RxJava 多線程的學習曲線,Flow 對線程的切換友好地多。java

在以前的 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文中曾經介紹過 Flow 的切換線程,以及 flowOn 操做符。多線程

Flow 只需使用 flowOn 操做符,而沒必要像 RxJava 須要去深刻理解 observeOn、subscribeOn 之間的區別。併發

7.2 flowOn VS RxJava 的 observeOn

RxJava 的 observeOn 操做符,接收一個 Scheduler 參數,用來指定下游操做運行在特定的線程調度器 Scheduler 上。app

Flow 的 flowOn 操做符,接收一個 CoroutineContext 參數,影響的是上游的操做。ide

例如:函數

fun main() = runBlocking {

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
            it * it
        }.flowOn(Dispatchers.IO)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}
複製代碼

flow builder 和 map 操做符都會受到flowOn的影響,並使用 Dispatchers.io 線程池。post

再例如:學習

val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()

fun main() = runBlocking {

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.map {
            it * it
        }.flowOn(Dispatchers.IO)
        .map {
            it+1
        }
        .flowOn(customerDispatcher)
        .collect {
            println("${Thread.currentThread().name}: $it")
        }
}
複製代碼

flow builder 和兩個 map 操做符都會受到兩個flowOn的影響,其中 flow builder 和第一個 map 操做符跟上面的例子同樣,第二個 map 操做符會切換到指定的 customerDispatcher 線程池。ui

7.3 buffer 實現併發操做

Kotlin Coroutines Flow 系列(二) Flow VS RxJava2 一文中,曾介紹 buffer 操做符對應 RxJava Backpressure 中的 BUFFER 策略。spa

事實上 buffer 操做符也能夠併發地執行任務,它是除了使用 flowOn 操做符以外的另外一種方式,只是不能顯示地指定 Dispatchers。

例如:

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
        .buffer()
        .collect { value ->
            delay(300)
            println(value)
        }
    }
    println("Collected in $time ms")
}
複製代碼

執行結果:

1
2
3
4
5
Collected in 1676 ms
複製代碼

在上述例子中,全部的 delay 所花費的時間是2000ms。然而經過 buffer 操做符併發地執行 emit,再順序地執行 collect 函數後,所花費的時間在 1700ms 左右。

若是去掉 buffer 操做符。

fun main() = runBlocking {
    val time = measureTimeMillis {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }
        .collect { value ->
            delay(300)
            println(value)
        }
    }
    println("Collected in $time ms")
}
複製代碼

執行結果:

1
2
3
4
5
Collected in 2039 ms
複製代碼

所花費的時間比剛纔多了300多ms。

7.4 並行操做

在講解並行操做以前,先來了解一下併發和並行的區別。

併發(concurrency):是指一個處理器同時處理多個任務。 並行(parallelism):是多個處理器或者是多核的處理器同時處理多個不一樣的任務。並行是同時發生的多個併發事件,具備併發的含義,而併發則不必定是並行。

RxJava 能夠藉助 flatMap 操做符實現並行,亦能夠使用 ParallelFlowable 類實現並行操做。

下面,以 flatMap 操做符爲例實現 RxJava 的並行:

Observable.range(1,100)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        return Observable.just(integer)
                                .subscribeOn(Schedulers.io())
                                .map(new Function<Integer, String>() {

                                    @Override
                                    public String apply(Integer integer) throws Exception {
                                        return integer.toString();
                                    }
                                });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String str) throws Exception {

                        System.out.println(str);
                    }
                });
複製代碼

Flow 也有相應的操做符 flatMapMerge 能夠實現並行。

fun main() = runBlocking {

    val result = arrayListOf<Int>()
    for (index in 1..100){
        result.add(index)
    }

    result.asFlow()
        .flatMapMerge {
            flow {
                emit(it)
            }
            .flowOn(Dispatchers.IO)
        }
        .collect { println("$it") }
}
複製代碼

整體而言,Flow 相比於 RxJava 更加簡潔一些。

該系列的相關文章:

Kotlin Coroutines Flow 系列(一) Flow 基本使用

Kotlin Coroutines Flow 系列(二) Flow VS RxJava2

Kotlin Coroutines Flow 系列(三) 異常處理

相關文章
相關標籤/搜索