關於kotlin中的Collections、Sequence、Channel和Flow (二)

關於Collections和Sequence請看關於kotlin中的Collections、Sequence、Channel和Flow (一) html

Channel

image.png

簡介

Channel 是一個和 BlockingQueue 很是類似的概念。其中一個不一樣是它代替了阻塞的 put 操做並提供了掛起的 send,還替代了阻塞的 take 操做並提供了掛起的 receiveChannel 是併發安全的,它能夠用來鏈接協程,實現不一樣協程的通訊。android

簡單使用

val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}

//consumer
launch {
    while (true) {
        println(channel.receive())
    }
}
複製代碼

既然咱們說 Channel 實際上就是一個隊列,隊列不該該有緩衝區嗎,那麼這個緩衝區一旦滿了,而且也一直沒有人調用 receive 取走元素的話,send 就掛起了。那麼接下來咱們看下 Channel 的緩衝區的定義:git

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        else -> ArrayChannel(capacity)
    }
複製代碼
  • RENDEZVOUS 就是 0,這個詞本意就是描述「不見不散」的場景,因此你不來 receive,我這 send 就一直擱這兒掛起等着。換句話說,咱們開頭的例子裏面,若是 consumer 不 receive,producer 裏面的第一個 send 就給掛起了。github

  • UNLIMITED 比較好理解,無限制,從它給出的實現 LinkedListChannel 來看,這一點也與咱們的 LinkedBlockingQueue 相似。編程

  • CONFLATED,這個詞是合併的意思,這個類型的 Channel 有一個元素大小的緩衝區,但每次有新元素過來,都會用新的替換舊的。設計模式

  • BUFFERED 了,它接收一個值做爲緩衝區容量的大小,默認64。緩存

Channel是熱流,即便沒有消費者,它的生產操做也會執行。若是你不接收,那麼你可能再也接收不到。 由於剛纔說了channel 相似於BlockingQueue , 它的send()receive() 其實也是入隊出隊的操做,假定有多個消費者那它們就會競爭:安全

val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}

//consumer 1
launch {
    while (true) {
        println("~~~"+channel.receive())
    }

}
//consumer 2
launch {
    while (true) {
        println("!!!"+channel.receive())
    }
}

部分輸出:
~~~0
~~~1
!!!2
~~~3
!!!4
~~~5
!!!6
~~~7
!!!8
~~~9
複製代碼

發現基本是交替獲取到值。那若是想全都接收怎麼辦呢: 使用BroadcastChannel :markdown

val channel = BroadcastChannel<Int>(Channel.BUFFERED)
//producer
launch(Dispatchers.IO) {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}
//consumer 1
launch {
    while (true) {
        println("~~~"+channel.openSubscription().receive())
    }

}
//consumer 2
launch {
    while (true) {
        println("!!!"+channel.openSubscription().receive())
    }
}

部分輸出:
~~~1
!!!1
~~~2
!!!2
~~~3
!!!3
~~~4
!!!4
複製代碼

還有一點要注意的是,channel須要手動關閉。網絡

Channel 版本的序列生成器

上面說到 sequence 沒法享受更上層的協程框架概念下的各類能力,還有一點 sequence 顯然不是線程安全的,而 Channel 能夠在併發場景下使用。

launch {
    val channel = produce(Dispatchers.Unconfined) {
        send(1)
        send(2)
    }

    for (item in channel) {
        println("got : $item")
    }
}
複製代碼

Channel 即便沒有人「消費」,值依舊會生產,這會形成必定的浪費。
那麼能不能Sequence + Channel 搞一下?

image.png

Flow

image.png

簡介

Flow 是在 Kotlin Coroutines 1.2.0 alpha 以後新增的一套API,也叫作異步流,是 Kotlin 協程與響應式編程模型結合的產物。

什麼是響應式編程

響應式編程基於觀察者模式,是一種面向數據流和變化傳播的聲明式編程方式。換個說法就是:響應式編程是使用異步數據流進行編程。【響應式編程】

Flow 解決了什麼

異步掛起函數可以返回單一值,那麼咱們如何返回多個異步計算的值呢?而這個就是Kotlin Flow解決的問題。

和Channel對比

  • Flow是「冷」🥶的 ,和Sequence同樣,只有遇到末端操做纔會執行,但又不同↓
  • Flow是響應式的,由生產者回調給消費者 (sequence是消費端通知生產端)
  • 它基於協程構建,所以提供告終構化併發和取消的全部好處。
  • 豐富的操做符

channel的【操做符】在kotlin 1.4標記爲棄用,將來是要移除掉的

如何使用

Flow有多種構建方式,如下是最簡單的方式:

viewModelScope.launch{
    //構建 flow
    val testFlow= flow {
         emit(1)
    }

    //消費Flow
    testFlow.collect { 
       println(it)
    }
    
}
複製代碼

怎麼就是冷流了

一個 Flow 建立出來以後,不消費則不生產,屢次消費則屢次生產,生產和消費老是相對應的。 所謂冷數據流,就是隻有消費時纔會生產的數據流,這一點與Channel 相反:Channel 的發送端並不依賴於接收端。

image.png 收集器是具備單一掛起功能的流接口收集,它是終端操做符:

public interface Flow<out T> {    
    public suspend fun collect(collector: FlowCollector<T>)
}
複製代碼

發射器是FlowCollector,具備一個稱爲emit的單個掛起函數

public interface FlowCollector<in T> {  
    public suspend fun emit(value: T)
}
複製代碼

在其內部,收集器和發射器的整個機制只是調用兩邊的函數

而suspend關鍵字則爲其增長魔力。

線程切換

  • 使用flowOn()來切換流的執行線程,flowOn 指定的調度器影響前面的邏輯。
fun main() = runBlocking {
   flow {
        emit("Context")
        println(" emit on ${Thread.currentThread().name}")
    }
    .flowOn(Dispatchers.IO)
    .map {
        println(" map on ${Thread.currentThread().name}")
        it + " Preservation"
     }
     .flowOn(Dispatchers.Default)
     .collect { value ->
        println(" collect on ${Thread.currentThread().name}")
        println(value)
      }
}
複製代碼

輸出:

emit on DefaultDispatcher-worker-2
 map on DefaultDispatcher-worker-1
 collect on main
 Context Preservation
複製代碼

異常處理

Flow從不捕獲或處理下游⬇️流中發生的異常,它們僅使用catch運算符捕獲上游⬆️發生的異常。

flow {
  emit(1)
  throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
  println("caught error: $t")
}
複製代碼

開始和結束

若是你想在流的開始和結尾處進行一些操做。 onCompletion 用起來比較相似於 try ... catch ... finally 中的 finally,不管前面是否存在異常,它都會被調用,參數 t 則是前面未捕獲的異常。

flow {
    emit(1)
}.onStart {
    println("smart")
}.onCompletion {  t: Throwable ->
  println("caught error: $t")
}.collect {
    println(it)
}
複製代碼

輸出:

smart
1
end
複製代碼

Flow 的設計初衷是但願確保流操做中異常透明。所以禁止🚫在flow的構建中 try catch:

Wrong

flow { 
  try {
    emit(1)
    throw ArithmeticException("Div 0")
  } catch (t: Throwable){
    println("caught error: $t")
  } finally {
    println("finally.")
  }
}
複製代碼

末端操做符

前面的例子當中,咱們用 collect 消費 Flow 的數據。collect 是最基本的末端操做符,除了 collect 以外,還有其餘常見的末端操做符,大致分爲三種類:

  1. 集合類型轉換操做,包括 toListtoSet 等。
  2. 聚合操做,包括將 Flow 規約到單值的 reducefold 等操做,以及得到單個元素的操做包括 singlesingleOrNullfirst 等。
  3. 無操做 collect()launchIn()等。

實際上,識別是否爲末端操做符,還有一個簡單方法,因爲 Flow 的消費端必定須要運行在協程當中,所以末端操做符都是掛起函數。

Flow 的取消

Flow 沒有提供取消操做,Flow 的消費依賴於collect 這樣的末端操做符,而它們又必須在協程當中調用,所以 Flow 的取消主要依賴於末端操做符所在的協程的狀態

val job = launch {
    val intFlow = flow {
        (1..3).forEach {
            delay(1000)
            emit(it)
        }
    }

    intFlow.collect { println(it) }
}

delay(2500)
job.cancel()
複製代碼

其餘 Flow 的建立方式

flow { ... } 是基礎的建立方式,還有其餘構建器使流的聲明更簡單:

  • flowOf 構建器定義了一個發射固定,集的流。
  • 使用 .asFlow() 擴展函數,能夠將各類集合與序列轉換爲流。

flow { ... }中沒法隨意切換調度器,這是由於 emit 函數不是線程安全的:

flow {
    withContext(Dispatchers.IO){  //error
        emit(2)
    }
    emit(1)
}.collect {
    println(it)
}
複製代碼

image.png 想要在生成元素時切換調度器,就須使用channelFlow 函數來建立 Flow

channelFlow {
  send(1)
  withContext(Dispatchers.IO) {
    send(2)
  }
}
複製代碼

SharedFlow

上面咱們說flow是冷流,只有collect 以後才觸發"生產",那我就想要一個"熱"流咋整呢? **SharedFlow**就是解決這個問題。在SharedFlow以前一般是使用BroadcastChannel而後asFlow 去實現,但這種實現方式不夠優雅,和Channel過於耦合。所以在Coroutine 1.4時推出了SharedFlow. 它是一個**「熱」流**,且能夠有多個訂閱者

簡單使用:

val broadcasts=MutableSharedFlow<String>()

viewModelScope.launch{
      broadcasts.emit("Hello")
      broadcasts.emit("SharedFlow")
}


lifecycleScope.launch{
    broadcasts.collect { 
       print(it)
    }
}
複製代碼

StateFlow

StateFlowSharedFlow 的一個比較特殊的變種,而 SharedFlow又是 Kotlin 數據流當中比較特殊的一種類型。StateFlow 與 LiveData 是最接近的,由於:

  • 它始終是有值的。
  • 它的值是惟一的。
  • 它容許被多個觀察者共用 (所以是共享的數據流)。
  • 它永遠只會把最新的值重現給訂閱者,這與活躍觀察者的數量是無關的。

當暴露 UI 的狀態給視圖時,應該使用 StateFlow。這是一種安全和高效的觀察者,專門用於容納 UI 狀態。

簡單來講就是相似LiveData,可是更好用!

StateFlow僅在值已更新且不相同值時返回。簡單來講,假定兩個值x和y,其中x是最初發出的值,y是要發出的值,若是(x == y)不執行任何操做,(x !=y)則僅在此狀況下才發出新值。 簡單使用:

val stateFlow = MutableStateFlow(UIState.Loading)//初始狀態

stateFlow.value = UIState.Error

launch {
    stateFlow.collect {
       ...
    }
}
複製代碼

更多信息參閱:StateFlow和SharedFlow

背壓

只要是響應式編程,就必定會有背壓問題,咱們先來看看背壓到底是什麼: 生產者生產數據的速度超過了消費者消費的速度致使的問題。 但得益於suspend功能,能夠在Kotlin流程中實現透明的背壓管理。 當流的收集器不堪重負時,它能夠簡單地掛起發射器,並在準備好接受更多元素時將其resume。 但爲了保證數據不丟失,咱們也會考慮添加緩存來緩解問題:

flow {
  List(100) {
    emit(it)
  }
}.buffer()
複製代碼

咱們也能夠爲 buffer 指定一個容量。不過,若是咱們只是單純地添加緩存,而不是從根本上解決問題就始終會形成數據積壓。 (就像咱們板球的聊天室消息緩存池)。

問題產生的根本緣由是生產和消費速率的不匹配,除直接優化消費者的性能之外,咱們也能夠採起一些取捨的手段。

第一種是 conflate 新數據會覆蓋老數據,例如:

flow {
  List(100) {
    emit(it)
  }
}.conflate()
.collect { value ->
  println("Collecting $value")
  delay(100) 
  println("$value collected")
}
複製代碼

咱們快速地發送了 100 個元素,最後接收到的只有兩個,固然這個結果每次都不必定同樣:

Collecting 1
1 collected
Collecting 99
99 collected
複製代碼

第二種是 collectLatest。顧名思義,只處理最新的數據,這看上去彷佛與 conflate 沒有區別,其實區別大了:它並不會直接用新數據覆蓋老數據,而是每個都會被處理,只不過若是前一個還沒被處理完後一個就來了的話,處理前一個數據的邏輯就會被取消。 仍是前面的例子,咱們稍做修改:

flow {
  List(100) {
    emit(it)
  }
}.collectLatest { value ->
  println("Collecting $value")
  delay(100)
  println("$value collected")
}
複製代碼

結果:

Collecting 0
Collecting 1
...
Collecting 97
Collecting 98
Collecting 99
▶ 100ms later
99 collected
複製代碼

collectLatest 以外還有 mapLatestflatMapLatest 等等,都是這個做用。

在項目中的實戰

近年來flow是谷歌大力支持技術,像RoomDataStore, Paging3 等都支持了Flow ,那你還等什麼呢,學起來,用起來。

image.png

普通suspend請求改造

能夠發送多個值,UI 狀態徹底由數據驅動,好比Follow按鈕就能夠改造一下,先Loading後展現結果:

@WorkerThread
 fun getObservableUserEvents(userId: String?):Flow<Result<ObservableUserEvents>{
    return flow {
        emit(Result.Loading)
        if (userId == null) {
            emit(sessionRepository.getSessions())
        }
    }
}
複製代碼

重試機制

我要給某一網絡請求增長重試機制:

override suspend fun getTrendsList() = flow<Result<xxx>> {
   ...
   emit(Result.Success(result))
}.retry(2).catch { e ->
   emit(Result.Error(e))
}
複製代碼

當lifecycleScope趕上flow

搜索有多個tab,都要監聽搜索的觸發,可是一次預期是觸發一個tab的搜索。在ViewPager裏,旁邊的Fragment是onPause,此時依舊能夠收到livedata回調,可是使用lifecycleScopeflow 便可解決這個問題,由於launchWhenResumed不在Resume時會掛起:

lifecycleScope.launchWhenResumed{
    searchRequestFlow.collect{request->
        doSearch(request)
    }
}
複製代碼

而在Lifecycle 2.4.0以後提供了一個新的API repeatOnLifecycle,能夠指定生命週期狀態,而且在離開狀態時不是簡單的掛起,而是取消協程,當生命週期恢復時:

lifecycleScope.launch {
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.navigationActions.collect {
            ...
        }
    }
}
複製代碼

官方傾向於使用 repeatOnLifecycle API 收集數據流,而不是在 launchWhenX API 內部進行收集。因爲後面的 API 會掛起協程,而不是在 Lifecycle 處於 STOPPED 狀態時取消。上游數據流會在後臺保持活躍狀態,並可能會發出新的項並耗用資源。

"數據倒灌"?不存在的

在以前,咱們會使用LiveData發送Event去和UI交互,或者執行某段邏輯,可是有些時候頁面重建致使LiveData從新綁定,此時會當即收到回調致使觸發邏輯。爲了解決這個問題,在FlowChannel都還沒穩定的時候,谷歌的示例使用封裝的Event來判斷事件是否處理過:

open class Event<out T>(private val content: T) {

    var hasBeenHandled = false
        private set // Allow external read but not write

    /** * Returns the content and prevents its use again. */
    fun getContentIfNotHandled(): T? {
        return if (hasBeenHandled) {
            null
        } else {
            hasBeenHandled = true
            content
        }
    }

    /** * Returns the content, even if it's already been handled. */
    fun peekContent(): T = content
}
複製代碼

但這只是個簡單的封裝,只能有一個觀察者,想要應用在複雜場景還得設計個Manager來管理多個觀察者。可是使用SharedFlow 則不會有這個問題,畢竟LiveData原本就不也是用來幹這活的,人家設計來是和UI來綁定的。

由於SharedFlow是熱流,事件被廣播給未知數量的訂閱者。在沒有訂閱者的狀況下,任何發佈的事件都會當即刪除。它是一種用於必須當即處理或根本不處理的事件的設計模式。 使用示例:

val scrollToEvent: SharedFlow<ScheduleScrollEvent> =
    loadSessionsResult.combineTransform(currentEventIndex) { result, currentEventIndex ->
       emit(ScheduleScrollEvent(currentEventIndex))
    }.shareIn(viewModelScope, WhileViewSubscribed, replay = 0) 
複製代碼

針對事件發送有些時候也可使用Channel,這個看業務場景:Channel會每一個事件都傳遞給單個訂閱者。一旦Channel緩衝區滿了,會嘗試在沒有訂閱者的狀況下暫停事件發佈,等待訂閱者出現。默認狀況下永遠不會刪除已發佈的事件。(不過經過設置也能夠無緩存或者僅緩存一個)

使用示例:

// SIDE EFFECTS: Navigation actions
private val _navigationActions = Channel<NavigationAction>(capacity = Channel.CONFLATED)
val navigationActions = _navigationActions.receiveAsFlow()
複製代碼

debounce

搜索監聽輸入框,輸入時執行搜索,這裏要進行debounce,避免發出過多的sug請求:

val query=MutableStateFlow<String?>(null)

fun onTextChanged(text:String){
    query.value=text
}

launch{
    query.debounce(100).collect{text->
        text?.let{
             doSearch(text)
        }
    }
}
複製代碼

多路複用

同時請求緩存和網絡,網絡先到則更新緩存,並取消協程,緩存先到則數據發送到UI後繼續執行,直到網絡數據返回。

listOf(
        async { dataSource.getCacheData() },
        async { dataSource.getRemoteData() })
.map { deferred ->
    flow { emit(deferred.await()) }
}.merge().onEach { result ->
     //網絡數據
    if (result.requestType == RequestType.NETWORK) {
        if (isActive) {
            _source.postValue(result)
        }
        if (result is Result.Success) {
            result.data?.let { newData ->
                 //更新緩存
                dataSource.flushCache(newData)
            }           
           cancel()
        }
    } else {
    //緩存數據
        if (result is Result.Success) {
            if (isActive) {
                _source.postValue(result)
            }
        }
    }
}.onCompletion {
    isPreLoading.set(false)
}.launchIn(this)
複製代碼

組合多個流

  • Zip

每次各取一個,一旦其中一個流完成,結果流就完成,並在剩餘流上調用cancel。

val flow = flowOf("4K顯示器", "2K顯示器", "1080P顯示器")
val flow2 = flowOf("小明", "小陳", "小紅", "小十一郎")
flow.zip(flow2) { i, s -> i + " 發給了 "+s }.collect {
    println(it)
}

4K顯示器 發給了 小明
2K顯示器 發給了 小陳
1080P顯示器 發給了 小紅
複製代碼
  • Combine

經過組合每一個流的最近發射的值,使用轉換函數生成其值:

val flow = flowOf("Tom", "Jack", "Lucifer")
val flow2 = flowOf("小明", "小陳", "小紅", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
    println(it)
}

Tom 和 小明握了手
Jack 和 小明握了手
Jack 和 小陳握了手
Lucifer 和 小陳握了手
Lucifer 和 小紅握了手
Lucifer 和 小十一郎握了手
複製代碼

若是咱們對 第一個flow的發射加一個延遲:

val flow = flowOf("Tom", "Jack", "Lucifer").onEach { delay(10) }
val flow2 = flowOf("小明", "小陳", "小紅", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
    println(it)
}

Tom 和 小十一郎握了手
Jack 和 小十一郎握了手
Lucifer 和 小十一郎握了手
複製代碼

因爲第一個流加了延遲,當數據發射時,第二個流已經發送完畢了,那麼對於第二個流來講,最新值就是「小十一郎」。因此結果就成了上面那樣。

Flow操做符雖然比RxJava少些,但知足大部分場景,其餘操做符剩餘的你們自行研究吧~ 更多操做符請查閱: Kotlin Flow

我用SharedFlow寫了個Eventbus

SharedFlow支持replay,支持多觀察者,再加上協程特性,結合Lifecycle,一個FlowEventBus不就出來了?!

【Kotlin】就幾行代碼?! 用SharedFlow寫個FlowEventBus

參考

Bennyhuo 的博客

協程 Flow 最佳實踐 | 基於 Android 開發者峯會應用

kotlin中文語言站 -Flow

StateFlow和SharedFlow

哇 你都看到這了,點個贊再走唄。~

相關文章
相關標籤/搜索