每個 Flow 其內部是按照順序執行的,這一點跟 Sequences 很相似。java
Flow 跟 Sequences 之間的區別是 Flow 不會阻塞主線程的運行,而 Sequences 會阻塞主線程的運行。git
使用 flow:github
fun main() = runBlocking {
launch {
for (j in 1..5) {
delay(100)
println("I'm not blocked $j")
}
}
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect { println(it) }
println("Done")
}
複製代碼
執行結果:編程
1
I'm not blocked 1 2 I'm not blocked 2
3
I'm not blocked 3 4 I'm not blocked 4
5
Done
I'm not blocked 5 複製代碼
使用 sequence:緩存
fun main() = runBlocking {
launch {
for (k in 1..5) {
delay(100)
println("I'm blocked $k")
}
}
sequence {
for (i in 1..5) {
Thread.sleep(100)
yield(i)
}
}.forEach { println(it) }
println("Done")
}
複製代碼
執行結果:bash
1
2
3
4
5
Done
I'm blocked 1 I'm blocked 2
I'm blocked 3 I'm blocked 4
I'm blocked 5 複製代碼
由此,能夠得出 Flow 在使用各個 suspend 函數時(本例子中使用了collect、emit函數)不會阻塞主線程的運行。異步
Kotlin 協程庫的設計自己也參考了 RxJava ,下圖展現瞭如何從 RxJava 遷移到 Kotlin 協程。(火和冰形象地表示了 Hot、Cold Stream)函數
flow 的代碼塊只有調用 collected() 纔開始運行,正如 RxJava 建立的 Observables 只有調用 subscribe() 纔開始運行同樣。post
如圖上所示,能夠藉助 Kotlin Channel 來實現 Hot Stream。ui
Flow 完成時(正常或出現異常時),若是須要執行一個操做,它能夠經過兩種方式完成:imperative、declarative。
經過使用 try ... finally 實現
fun main() = runBlocking {
try {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect { println(it) }
} finally {
println("Done")
}
}
複製代碼
經過 onCompletion() 函數實現
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onCompletion { println("Done") }
.collect { println(it) }
}
複製代碼
藉助擴展函數能夠實現相似 RxJava 的 onCompleted() 功能,只有在正常結束時纔會被調用:
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
collect { value -> emit(value) }
action()
}
複製代碼
它的使用相似於 onCompletion()
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
collect { value -> emit(value) }
action()
}
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.onCompleted { println("Completed...") }
.collect{println(it)}
}
複製代碼
可是假如 Flow 異常結束時,是不會執行 onCompleted() 函數的。
Backpressure 是響應式編程的功能之一。
RxJava2 Flowable 支持的 Backpressure 策略,包括:
而 Flow 的 Backpressure 是經過 suspend 函數實現。
fun currTime() = System.currentTimeMillis()
var start: Long = 0
fun main() = runBlocking {
val time = measureTimeMillis {
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("Emit $it (${currTime() - start}ms) ")
}
.buffer()
.collect {
println("Collect $it starts (${currTime() - start}ms) ")
delay(500)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
println("Cost $time ms")
}
複製代碼
執行結果:
Emit 1 (104ms)
Collect 1 starts (108ms)
Emit 2 (207ms)
Emit 3 (309ms)
Emit 4 (411ms)
Emit 5 (513ms)
Collect 1 ends (613ms)
Collect 2 starts (613ms)
Collect 2 ends (1114ms)
Collect 3 starts (1114ms)
Collect 3 ends (1615ms)
Collect 4 starts (1615ms)
Collect 4 ends (2118ms)
Collect 5 starts (2118ms)
Collect 5 ends (2622ms)
Collected in 2689 ms
複製代碼
fun main() = runBlocking {
val time = measureTimeMillis {
(1..5)
.asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("Emit $it (${currTime() - start}ms) ")
}
.conflate()
.collect {
println("Collect $it starts (${currTime() - start}ms) ")
delay(500)
println("Collect $it ends (${currTime() - start}ms) ")
}
}
println("Cost $time ms")
}
複製代碼
執行結果:
Emit 1 (106ms)
Collect 1 starts (110ms)
Emit 2 (213ms)
Emit 3 (314ms)
Emit 4 (419ms)
Emit 5 (520ms)
Collect 1 ends (613ms)
Collect 5 starts (613ms)
Collect 5 ends (1113ms)
Cost 1162 ms
複製代碼
RxJava 的 contributor:David Karnok, 他寫了一個kotlin-flow-extensions庫,其中包括:FlowOnBackpressureDrop.kt,這個類支持 DROP 策略。
/** * Drops items from the upstream when the downstream is not ready to receive them. */
@FlowPreview
fun <T> Flow<T>.onBackpressurureDrop() : Flow<T> = FlowOnBackpressureDrop(this)
複製代碼
使用這個庫的話,能夠經過使用 Flow 的擴展函數 onBackpressurureDrop() 來支持 DROP 策略。
該系列的相關文章: