Kotlin Coroutines Flow 系列(二) Flow VS RxJava2

三. Flow VS Sequences

每個 Flow 其內部是按照順序執行的,這一點跟 Sequences 很相似。java

Flow 跟 Sequences 之間的區別是 Flow 不會阻塞主線程的運行,而 Sequences 會阻塞主線程的運行。git

使用 flow:github

fun main() = runBlocking {

    launch {
        for (j in 1..5) {
            delay(100)
            println("I'm not blocked $j")
        }
    }

    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.collect { println(it) }

    println("Done")
}
複製代碼

執行結果:編程

1
I'm not blocked 1 2 I'm not blocked 2
3
I'm not blocked 3 4 I'm not blocked 4
5
Done
I'm not blocked 5 複製代碼

使用 sequence:緩存

fun main() = runBlocking {

    launch {
        for (k in 1..5) {
            delay(100)
            println("I'm blocked $k")
        }
    }

    sequence {
        for (i in 1..5) {
            Thread.sleep(100)
            yield(i)
        }
    }.forEach { println(it) }

    println("Done")
}
複製代碼

執行結果:bash

1
2
3
4
5
Done
I'm blocked 1 I'm blocked 2
I'm blocked 3 I'm blocked 4
I'm blocked 5 複製代碼

由此,能夠得出 Flow 在使用各個 suspend 函數時(本例子中使用了collect、emit函數)不會阻塞主線程的運行。異步

四. Flow VS RxJava

Kotlin 協程庫的設計自己也參考了 RxJava ,下圖展現瞭如何從 RxJava 遷移到 Kotlin 協程。(火和冰形象地表示了 Hot、Cold Stream)函數

migration from rxjava.jpeg

4.1 Cold Stream

flow 的代碼塊只有調用 collected() 纔開始運行,正如 RxJava 建立的 Observables 只有調用 subscribe() 纔開始運行同樣。post

4.2 Hot Stream

如圖上所示,能夠藉助 Kotlin Channel 來實現 Hot Stream。ui

4.3. Completion

Flow 完成時(正常或出現異常時),若是須要執行一個操做,它能夠經過兩種方式完成:imperative、declarative。

4.3.1 imperative

經過使用 try ... finally 實現

fun main() = runBlocking {
    try {
        flow {
            for (i in 1..5) {
                delay(100)
                emit(i)
            }
        }.collect { println(it) }
    } finally {
        println("Done")
    }
}
複製代碼

4.3.2 declarative

經過 onCompletion() 函數實現

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompletion { println("Done") }
        .collect { println(it) }
}
複製代碼

4.3.3 onCompleted (藉助擴展函數實現)

藉助擴展函數能夠實現相似 RxJava 的 onCompleted() 功能,只有在正常結束時纔會被調用:

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {

    collect { value -> emit(value) }

    action()
}
複製代碼

它的使用相似於 onCompletion()

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {

    collect { value -> emit(value) }

    action()
}

fun main() = runBlocking {
    flow {
        for (i in 1..5) {
            delay(100)
            emit(i)
        }
    }.onCompleted { println("Completed...") }
        .collect{println(it)}
}
複製代碼

可是假如 Flow 異常結束時,是不會執行 onCompleted() 函數的。

4.4 Backpressure

Backpressure 是響應式編程的功能之一。

RxJava2 Flowable 支持的 Backpressure 策略,包括:

  • MISSING:建立的 Flowable 沒有指定背壓策略,不會對經過 OnNext 發射的數據作緩存或丟棄處理。
  • ERROR:若是放入 Flowable 的異步緩存池中的數據超限了,則會拋出 MissingBackpressureException 異常。
  • BUFFER:Flowable 的異步緩存池同 Observable 的同樣,沒有固定大小,能夠無限制添加數據,不會拋出 MissingBackpressureException 異常,但會致使 OOM。
  • DROP:若是 Flowable 的異步緩存池滿了,會丟掉將要放入緩存池中的數據。
  • LATEST:若是緩存池滿了,會丟掉將要放入緩存池中的數據。這一點跟 DROP 策略同樣,不一樣的是,無論緩存池的狀態如何,LATEST 策略會將最後一條數據強行放入緩存池中。

而 Flow 的 Backpressure 是經過 suspend 函數實現。

4.4.1 buffer() 對應 BUFFER 策略

fun currTime() = System.currentTimeMillis()

var start: Long = 0

fun main() = runBlocking {

    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .buffer()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }

    println("Cost $time ms")
}
複製代碼

執行結果:

Emit 1 (104ms) 
Collect 1 starts (108ms) 
Emit 2 (207ms) 
Emit 3 (309ms) 
Emit 4 (411ms) 
Emit 5 (513ms) 
Collect 1 ends (613ms) 
Collect 2 starts (613ms) 
Collect 2 ends (1114ms) 
Collect 3 starts (1114ms) 
Collect 3 ends (1615ms) 
Collect 4 starts (1615ms) 
Collect 4 ends (2118ms) 
Collect 5 starts (2118ms) 
Collect 5 ends (2622ms) 
Collected in 2689 ms
複製代碼

4.4.2 conflate() 對應 LATEST 策略

fun main() = runBlocking {

    val time = measureTimeMillis {
        (1..5)
            .asFlow()
            .onStart { start = currTime() }
            .onEach {
                delay(100)
                println("Emit $it (${currTime() - start}ms) ")
            }
            .conflate()
            .collect {
                println("Collect $it starts (${currTime() - start}ms) ")
                delay(500)
                println("Collect $it ends (${currTime() - start}ms) ")
            }
    }

    println("Cost $time ms")
}
複製代碼

執行結果:

Emit 1 (106ms) 
Collect 1 starts (110ms) 
Emit 2 (213ms) 
Emit 3 (314ms) 
Emit 4 (419ms) 
Emit 5 (520ms) 
Collect 1 ends (613ms) 
Collect 5 starts (613ms) 
Collect 5 ends (1113ms) 
Cost 1162 ms
複製代碼

4.4.3 DROP 策略

RxJava 的 contributor:David Karnok, 他寫了一個kotlin-flow-extensions庫,其中包括:FlowOnBackpressureDrop.kt,這個類支持 DROP 策略。

/** * Drops items from the upstream when the downstream is not ready to receive them. */
@FlowPreview
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)
複製代碼

使用這個庫的話,能夠經過使用 Flow 的擴展函數 onBackpressurureDrop() 來支持 DROP 策略。

該系列的相關文章:

Kotlin Coroutines Flow 系列(一) Flow 基本使用

相關文章
相關標籤/搜索