關於Collections和Sequence請看:關於kotlin中的Collections、Sequence、Channel和Flow (一) html
Channel
是一個和 BlockingQueue
很是類似的概念。其中一個不一樣是它代替了阻塞的 put
操做並提供了掛起的 send
,還替代了阻塞的 take
操做並提供了掛起的 receive
。 Channel
是併發安全的,它能夠用來鏈接協程,實現不一樣協程的通訊。android
val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}
//consumer
launch {
while (true) {
println(channel.receive())
}
}
複製代碼
既然咱們說 Channel
實際上就是一個隊列,隊列不該該有緩衝區嗎,那麼這個緩衝區一旦滿了,而且也一直沒有人調用 receive
取走元素的話,send
就掛起了。那麼接下來咱們看下 Channel
的緩衝區的定義:git
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
else -> ArrayChannel(capacity)
}
複製代碼
RENDEZVOUS
就是 0,這個詞本意就是描述「不見不散」的場景,因此你不來 receive,我這 send 就一直擱這兒掛起等着。換句話說,咱們開頭的例子裏面,若是 consumer 不 receive,producer 裏面的第一個 send 就給掛起了。github
UNLIMITED
比較好理解,無限制,從它給出的實現 LinkedListChannel
來看,這一點也與咱們的 LinkedBlockingQueue
相似。編程
CONFLATED
,這個詞是合併的意思,這個類型的 Channel
有一個元素大小的緩衝區,但每次有新元素過來,都會用新的替換舊的。設計模式
BUFFERED
了,它接收一個值做爲緩衝區容量的大小,默認64。緩存
但Channel
是熱流,即便沒有消費者,它的生產操做也會執行。若是你不接收,那麼你可能再也接收不到。 由於剛纔說了channel
相似於BlockingQueue
, 它的send()
和receive()
其實也是入隊出隊的操做,假定有多個消費者那它們就會競爭:安全
val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}
//consumer 1
launch {
while (true) {
println("~~~"+channel.receive())
}
}
//consumer 2
launch {
while (true) {
println("!!!"+channel.receive())
}
}
部分輸出:
~~~0
~~~1
!!!2
~~~3
!!!4
~~~5
!!!6
~~~7
!!!8
~~~9
複製代碼
發現基本是交替獲取到值。那若是想全都接收怎麼辦呢: 使用BroadcastChannel
:markdown
val channel = BroadcastChannel<Int>(Channel.BUFFERED)
//producer
launch(Dispatchers.IO) {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}
//consumer 1
launch {
while (true) {
println("~~~"+channel.openSubscription().receive())
}
}
//consumer 2
launch {
while (true) {
println("!!!"+channel.openSubscription().receive())
}
}
部分輸出:
~~~1
!!!1
~~~2
!!!2
~~~3
!!!3
~~~4
!!!4
複製代碼
還有一點要注意的是,channel
須要手動關閉。網絡
上面說到 sequence
沒法享受更上層的協程框架概念下的各類能力,還有一點 sequence
顯然不是線程安全的,而 Channel
能夠在併發場景下使用。
launch {
val channel = produce(Dispatchers.Unconfined) {
send(1)
send(2)
}
for (item in channel) {
println("got : $item")
}
}
複製代碼
但Channel
即便沒有人「消費」,值依舊會生產,這會形成必定的浪費。
那麼能不能Sequence + Channel
搞一下?
Flow
是在 Kotlin Coroutines 1.2.0 alpha
以後新增的一套API,也叫作異步流,是 Kotlin
協程與響應式編程模型結合的產物。
響應式編程基於觀察者模式,是一種面向數據流和變化傳播的聲明式編程方式。換個說法就是:響應式編程是使用異步數據流進行編程。【響應式編程】
異步掛起函數可以返回單一值,那麼咱們如何返回多個異步計算的值呢?而這個就是
Kotlin Flow
解決的問題。
channel的【操做符】在kotlin 1.4標記爲棄用,將來是要移除掉的
Flow
有多種構建方式,如下是最簡單的方式:
viewModelScope.launch{
//構建 flow
val testFlow= flow {
emit(1)
}
//消費Flow
testFlow.collect {
println(it)
}
}
複製代碼
一個 Flow
建立出來以後,不消費則不生產,屢次消費則屢次生產,生產和消費老是相對應的。 所謂冷數據流,就是隻有消費時纔會生產的數據流,這一點與Channel
相反:Channel
的發送端並不依賴於接收端。
收集器是具備單一掛起功能的流接口收集,它是終端操做符:
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
複製代碼
發射器是FlowCollector,具備一個稱爲emit的單個掛起函數
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
複製代碼
在其內部,收集器和發射器的整個機制只是調用兩邊的函數,
而suspend關鍵字則爲其增長魔力。
fun main() = runBlocking {
flow {
emit("Context")
println(" emit on ${Thread.currentThread().name}")
}
.flowOn(Dispatchers.IO)
.map {
println(" map on ${Thread.currentThread().name}")
it + " Preservation"
}
.flowOn(Dispatchers.Default)
.collect { value ->
println(" collect on ${Thread.currentThread().name}")
println(value)
}
}
複製代碼
輸出:
emit on DefaultDispatcher-worker-2
map on DefaultDispatcher-worker-1
collect on main
Context Preservation
複製代碼
Flow
從不捕獲或處理下游⬇️流中發生的異常,它們僅使用catch
運算符捕獲上游⬆️發生的異常。
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}
複製代碼
若是你想在流的開始和結尾處進行一些操做。 onCompletion 用起來比較相似於 try ... catch ... finally
中的 finally
,不管前面是否存在異常,它都會被調用,參數 t
則是前面未捕獲的異常。
flow {
emit(1)
}.onStart {
println("smart")
}.onCompletion { t: Throwable ->
println("caught error: $t")
}.collect {
println(it)
}
複製代碼
輸出:
smart
1
end
複製代碼
Flow
的設計初衷是但願確保流操做中異常透明。所以禁止🚫在flow
的構建中 try catch
:
Wrong:
flow {
try {
emit(1)
throw ArithmeticException("Div 0")
} catch (t: Throwable){
println("caught error: $t")
} finally {
println("finally.")
}
}
複製代碼
前面的例子當中,咱們用 collect 消費 Flow 的數據。collect
是最基本的末端操做符,除了 collect
以外,還有其餘常見的末端操做符,大致分爲三種類:
toList
、toSet
等。reduce
、fold
等操做,以及得到單個元素的操做包括 single
、singleOrNull
、first
等。collect()
和 launchIn()
等。實際上,識別是否爲末端操做符,還有一個簡單方法,因爲 Flow
的消費端必定須要運行在協程當中,所以末端操做符都是掛起函數。
Flow 沒有提供取消操做,Flow 的消費依賴於collect
這樣的末端操做符,而它們又必須在協程當中調用,所以 Flow 的取消主要依賴於末端操做符所在的協程的狀態。
val job = launch {
val intFlow = flow {
(1..3).forEach {
delay(1000)
emit(it)
}
}
intFlow.collect { println(it) }
}
delay(2500)
job.cancel()
複製代碼
flow { ... }
是基礎的建立方式,還有其餘構建器使流的聲明更簡單:
flowOf
構建器定義了一個發射固定,集的流。.asFlow()
擴展函數,能夠將各類集合與序列轉換爲流。在flow { ... }
中沒法隨意切換調度器,這是由於 emit
函數不是線程安全的:
flow {
withContext(Dispatchers.IO){ //error
emit(2)
}
emit(1)
}.collect {
println(it)
}
複製代碼
想要在生成元素時切換調度器,就須使用
channelFlow
函數來建立 Flow
:
channelFlow {
send(1)
withContext(Dispatchers.IO) {
send(2)
}
}
複製代碼
上面咱們說flow
是冷流,只有collect 以後才觸發"生產",那我就想要一個"熱"流咋整呢? **SharedFlow**
就是解決這個問題。在SharedFlow
以前一般是使用BroadcastChannel
而後asFlow
去實現,但這種實現方式不夠優雅,和Channel過於耦合。所以在Coroutine
1.4時推出了SharedFlow. 它是一個**「熱」流**,且能夠有多個訂閱者。
簡單使用:
val broadcasts=MutableSharedFlow<String>()
viewModelScope.launch{
broadcasts.emit("Hello")
broadcasts.emit("SharedFlow")
}
lifecycleScope.launch{
broadcasts.collect {
print(it)
}
}
複製代碼
StateFlow
是SharedFlow
的一個比較特殊的變種,而SharedFlow
又是Kotlin
數據流當中比較特殊的一種類型。StateFlow 與 LiveData 是最接近的,由於:
- 它始終是有值的。
- 它的值是惟一的。
- 它容許被多個觀察者共用 (所以是共享的數據流)。
- 它永遠只會把最新的值重現給訂閱者,這與活躍觀察者的數量是無關的。
當暴露 UI 的狀態給視圖時,應該使用
StateFlow
。這是一種安全和高效的觀察者,專門用於容納 UI 狀態。
簡單來講就是相似LiveData
,可是更好用!
StateFlow
僅在值已更新且不相同值時返回。簡單來講,假定兩個值x和y,其中x是最初發出的值,y是要發出的值,若是(x == y)
不執行任何操做,(x !=y)
則僅在此狀況下才發出新值。 簡單使用:
val stateFlow = MutableStateFlow(UIState.Loading)//初始狀態
stateFlow.value = UIState.Error
launch {
stateFlow.collect {
...
}
}
複製代碼
更多信息參閱:StateFlow和SharedFlow
只要是響應式編程,就必定會有背壓問題,咱們先來看看背壓到底是什麼: 生產者生產數據的速度超過了消費者消費的速度致使的問題。 但得益於suspend功能,能夠在Kotlin流程中實現透明的背壓管理。 當流的收集器不堪重負時,它能夠簡單地掛起發射器,並在準備好接受更多元素時將其resume。 但爲了保證數據不丟失,咱們也會考慮添加緩存來緩解問題:
flow {
List(100) {
emit(it)
}
}.buffer()
複製代碼
咱們也能夠爲 buffer 指定一個容量。不過,若是咱們只是單純地添加緩存,而不是從根本上解決問題就始終會形成數據積壓。 (就像咱們板球的聊天室消息緩存池)。
問題產生的根本緣由是生產和消費速率的不匹配,除直接優化消費者的性能之外,咱們也能夠採起一些取捨的手段。
第一種是 conflate
新數據會覆蓋老數據,例如:
flow {
List(100) {
emit(it)
}
}.conflate()
.collect { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
複製代碼
咱們快速地發送了 100 個元素,最後接收到的只有兩個,固然這個結果每次都不必定同樣:
Collecting 1
1 collected
Collecting 99
99 collected
複製代碼
第二種是 collectLatest
。顧名思義,只處理最新的數據,這看上去彷佛與 conflate
沒有區別,其實區別大了:它並不會直接用新數據覆蓋老數據,而是每個都會被處理,只不過若是前一個還沒被處理完後一個就來了的話,處理前一個數據的邏輯就會被取消。 仍是前面的例子,咱們稍做修改:
flow {
List(100) {
emit(it)
}
}.collectLatest { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
複製代碼
結果:
Collecting 0
Collecting 1
...
Collecting 97
Collecting 98
Collecting 99
▶ 100ms later
99 collected
複製代碼
除 collectLatest
以外還有 mapLatest
、flatMapLatest
等等,都是這個做用。
近年來flow
是谷歌大力支持技術,像Room
,DataStore
, Paging3
等都支持了Flow
,那你還等什麼呢,學起來,用起來。
能夠發送多個值,UI 狀態徹底由數據驅動,好比Follow
按鈕就能夠改造一下,先Loading
後展現結果:
@WorkerThread
fun getObservableUserEvents(userId: String?):Flow<Result<ObservableUserEvents>{
return flow {
emit(Result.Loading)
if (userId == null) {
emit(sessionRepository.getSessions())
}
}
}
複製代碼
我要給某一網絡請求增長重試機制:
override suspend fun getTrendsList() = flow<Result<xxx>> {
...
emit(Result.Success(result))
}.retry(2).catch { e ->
emit(Result.Error(e))
}
複製代碼
搜索有多個tab,都要監聽搜索的觸發,可是一次預期是觸發一個tab的搜索。在ViewPager裏,旁邊的Fragment是onPause,此時依舊能夠收到livedata
回調,可是使用lifecycleScope
和flow
便可解決這個問題,由於launchWhenResumed
不在Resume
時會掛起:
lifecycleScope.launchWhenResumed{
searchRequestFlow.collect{request->
doSearch(request)
}
}
複製代碼
而在Lifecycle 2.4.0
以後提供了一個新的API repeatOnLifecycle
,能夠指定生命週期狀態,而且在離開狀態時不是簡單的掛起,而是取消協程,當生命週期恢復時:
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.navigationActions.collect {
...
}
}
}
複製代碼
官方傾向於使用 repeatOnLifecycle API
收集數據流,而不是在 launchWhenX API
內部進行收集。因爲後面的 API
會掛起協程,而不是在 Lifecycle
處於 STOPPED
狀態時取消。上游數據流會在後臺保持活躍狀態,並可能會發出新的項並耗用資源。
在以前,咱們會使用LiveData
發送Event
去和UI
交互,或者執行某段邏輯,可是有些時候頁面重建致使LiveData從新綁定,此時會當即收到回調致使觸發邏輯。爲了解決這個問題,在Flow
和Channel
都還沒穩定的時候,谷歌的示例使用封裝的Event
來判斷事件是否處理過:
open class Event<out T>(private val content: T) {
var hasBeenHandled = false
private set // Allow external read but not write
/** * Returns the content and prevents its use again. */
fun getContentIfNotHandled(): T? {
return if (hasBeenHandled) {
null
} else {
hasBeenHandled = true
content
}
}
/** * Returns the content, even if it's already been handled. */
fun peekContent(): T = content
}
複製代碼
但這只是個簡單的封裝,只能有一個觀察者,想要應用在複雜場景還得設計個Manager
來管理多個觀察者。可是使用SharedFlow
則不會有這個問題,畢竟LiveData
原本就不也是用來幹這活的,人家設計來是和UI
來綁定的。
由於SharedFlow
是熱流,事件被廣播給未知數量的訂閱者。在沒有訂閱者的狀況下,任何發佈的事件都會當即刪除。它是一種用於必須當即處理或根本不處理的事件的設計模式。 使用示例:
val scrollToEvent: SharedFlow<ScheduleScrollEvent> =
loadSessionsResult.combineTransform(currentEventIndex) { result, currentEventIndex ->
emit(ScheduleScrollEvent(currentEventIndex))
}.shareIn(viewModelScope, WhileViewSubscribed, replay = 0)
複製代碼
針對事件發送有些時候也可使用Channel
,這個看業務場景:Channel
會每一個事件都傳遞給單個訂閱者。一旦Channel
緩衝區滿了,會嘗試在沒有訂閱者的狀況下暫停事件發佈,等待訂閱者出現。默認狀況下永遠不會刪除已發佈的事件。(不過經過設置也能夠無緩存或者僅緩存一個)
使用示例:
// SIDE EFFECTS: Navigation actions
private val _navigationActions = Channel<NavigationAction>(capacity = Channel.CONFLATED)
val navigationActions = _navigationActions.receiveAsFlow()
複製代碼
搜索監聽輸入框,輸入時執行搜索,這裏要進行debounce,避免發出過多的sug請求:
val query=MutableStateFlow<String?>(null)
fun onTextChanged(text:String){
query.value=text
}
launch{
query.debounce(100).collect{text->
text?.let{
doSearch(text)
}
}
}
複製代碼
同時請求緩存和網絡,網絡先到則更新緩存,並取消協程,緩存先到則數據發送到UI後繼續執行,直到網絡數據返回。
listOf(
async { dataSource.getCacheData() },
async { dataSource.getRemoteData() })
.map { deferred ->
flow { emit(deferred.await()) }
}.merge().onEach { result ->
//網絡數據
if (result.requestType == RequestType.NETWORK) {
if (isActive) {
_source.postValue(result)
}
if (result is Result.Success) {
result.data?.let { newData ->
//更新緩存
dataSource.flushCache(newData)
}
cancel()
}
} else {
//緩存數據
if (result is Result.Success) {
if (isActive) {
_source.postValue(result)
}
}
}
}.onCompletion {
isPreLoading.set(false)
}.launchIn(this)
複製代碼
每次各取一個,一旦其中一個流完成,結果流就完成,並在剩餘流上調用cancel。
val flow = flowOf("4K顯示器", "2K顯示器", "1080P顯示器")
val flow2 = flowOf("小明", "小陳", "小紅", "小十一郎")
flow.zip(flow2) { i, s -> i + " 發給了 "+s }.collect {
println(it)
}
4K顯示器 發給了 小明
2K顯示器 發給了 小陳
1080P顯示器 發給了 小紅
複製代碼
經過組合每一個流的最近發射的值,使用轉換函數生成其值:
val flow = flowOf("Tom", "Jack", "Lucifer")
val flow2 = flowOf("小明", "小陳", "小紅", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
println(it)
}
Tom 和 小明握了手
Jack 和 小明握了手
Jack 和 小陳握了手
Lucifer 和 小陳握了手
Lucifer 和 小紅握了手
Lucifer 和 小十一郎握了手
複製代碼
若是咱們對 第一個flow的發射加一個延遲:
val flow = flowOf("Tom", "Jack", "Lucifer").onEach { delay(10) }
val flow2 = flowOf("小明", "小陳", "小紅", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
println(it)
}
Tom 和 小十一郎握了手
Jack 和 小十一郎握了手
Lucifer 和 小十一郎握了手
複製代碼
因爲第一個流加了延遲,當數據發射時,第二個流已經發送完畢了,那麼對於第二個流來講,最新值就是「小十一郎」。因此結果就成了上面那樣。
Flow
操做符雖然比RxJava
少些,但知足大部分場景,其餘操做符剩餘的你們自行研究吧~ 更多操做符請查閱: Kotlin Flow
SharedFlow
支持replay
,支持多觀察者,再加上協程特性,結合Lifecycle
,一個FlowEventBus
不就出來了?!
【Kotlin】就幾行代碼?! 用SharedFlow寫個FlowEventBus
協程 Flow 最佳實踐 | 基於 Android 開發者峯會應用
哇 你都看到這了,點個贊再走唄。~