相對於 RxJava 多線程的學習曲線,Flow 對線程的切換友好地多。java
在以前的 Kotlin Coroutines Flow 系列(一) Flow 基本使用 一文中曾經介紹過 Flow 的切換線程,以及 flowOn 操做符。多線程
Flow 只需使用 flowOn 操做符,而沒必要像 RxJava 須要去深刻理解 observeOn、subscribeOn 之間的區別。併發
RxJava 的 observeOn 操做符,接收一個 Scheduler 參數,用來指定下游操做運行在特定的線程調度器 Scheduler 上。app
Flow 的 flowOn 操做符,接收一個 CoroutineContext 參數,影響的是上游的操做。ide
例如:函數
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
複製代碼
flow builder 和 map 操做符都會受到flowOn
的影響,並使用 Dispatchers.io 線程池。post
再例如:學習
val customerDispatcher = Executors.newFixedThreadPool(5).asCoroutineDispatcher()
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.map {
it+1
}
.flowOn(customerDispatcher)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
複製代碼
flow builder 和兩個 map 操做符都會受到兩個flowOn
的影響,其中 flow builder 和第一個 map 操做符跟上面的例子同樣,第二個 map 操做符會切換到指定的 customerDispatcher 線程池。ui
在 Kotlin Coroutines Flow 系列(二) Flow VS RxJava2 一文中,曾介紹 buffer 操做符對應 RxJava Backpressure 中的 BUFFER 策略。spa
事實上 buffer 操做符也能夠併發地執行任務,它是除了使用 flowOn 操做符以外的另外一種方式,只是不能顯示地指定 Dispatchers。
例如:
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
.buffer()
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
複製代碼
執行結果:
1
2
3
4
5
Collected in 1676 ms
複製代碼
在上述例子中,全部的 delay 所花費的時間是2000ms。然而經過 buffer 操做符併發
地執行 emit,再順序地執行 collect 函數後,所花費的時間在 1700ms 左右。
若是去掉 buffer 操做符。
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
.collect { value ->
delay(300)
println(value)
}
}
println("Collected in $time ms")
}
複製代碼
執行結果:
1
2
3
4
5
Collected in 2039 ms
複製代碼
所花費的時間比剛纔多了300多ms。
在講解並行操做以前,先來了解一下併發和並行的區別。
併發(concurrency):是指一個處理器同時處理多個任務。 並行(parallelism):是多個處理器或者是多核的處理器同時處理多個不一樣的任務。並行是同時發生的多個併發事件,具備併發的含義,而併發則不必定是並行。
RxJava 能夠藉助 flatMap 操做符實現並行,亦能夠使用 ParallelFlowable 類實現並行操做。
下面,以 flatMap 操做符爲例實現 RxJava 的並行:
Observable.range(1,100)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
return Observable.just(integer)
.subscribeOn(Schedulers.io())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return integer.toString();
}
});
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String str) throws Exception {
System.out.println(str);
}
});
複製代碼
Flow 也有相應的操做符 flatMapMerge 能夠實現並行。
fun main() = runBlocking {
val result = arrayListOf<Int>()
for (index in 1..100){
result.add(index)
}
result.asFlow()
.flatMapMerge {
flow {
emit(it)
}
.flowOn(Dispatchers.IO)
}
.collect { println("$it") }
}
複製代碼
整體而言,Flow 相比於 RxJava 更加簡潔一些。
該系列的相關文章:
Kotlin Coroutines Flow 系列(一) Flow 基本使用