Kotlin 異步 | Flow 限流的應用場景及原理

「本文已參與好文召集令活動,點擊查看:後端、大前端雙賽道投稿,2萬元獎池等你挑戰!前端

異步數據流中的生產者可能會生產過多的數據,而消費者並不須要那麼多,因此限流就有用武之地了。App 開發中有一些常見的限流場景,好比搜索框防抖、點擊事件防抖、防過分刷新。這一篇就以這三個場景爲線索探究一下如何實現及背後的原理android

閱讀本篇須要瞭解 Flow 的基礎知識。關於這些知識的詳細介紹能夠點擊Kotlin 異步 | Flow 應用場景及原理,現援引以下:後端

  1. 異步數據流能夠理解爲一條時間軸上按序產生的數據,它可用於表達多個連續的異步過程
  2. 異步數據流也能夠用「生產者/消費者」模型來理解,生產者和消費者之間就好像有一條管道,生產者從管道的一頭插入數據,消費者從另外一頭取數據。由於管道的存在,數據是有序的,遵循先進先出的原則。
  3. Kotlin 中的suspend方法用於表達一個異步過程,而Flow用於表達多連續個異步過程。Flow是冷流,冷流不會發射數據,直到它被收集的那一刻,因此冷流是「聲明式的」。
  4. Flow被收集的瞬間,數據開始生產並被髮射出去,經過流收集器FlowCollector將其傳遞給消費者。流和流收集器是成對出現的概念。流是一組按序產生的數據,數據的產生表現爲經過流收集器發射數據,在這裏流收集器像是流數據容器(雖然它不持有任何一條數據),它定義瞭如何將數據傳遞給消費者。
  5. 異步數據流中,生產者和消費者之間能夠插入中間消費者。中間消費者創建了流上的攔截並轉發機制:新建下游流,它生產數據的方式是經過收集上游數據,並轉發到一個帶有發射數據能力的 lambda 中。擁有多箇中間消費者的流就像「套娃」同樣,下游流套在上游流外面。中間消費者經過這種方式攔截了原始數據,就能夠對其作任意變換再轉發給下游消費者。
  6. 全部能觸發收集數據動做的消費者稱爲終端消費者,它就像點燃鞭炮的星火,使得被若干個中間消費者套娃的流從外向內(從下游到上游)一個個的被收集,最終傳導到原始流,觸發數據的發射。
  7. 默認狀況下,流中生產和消費數據是在同一個線程中進行的。但能夠經過flowOn()改變上游流執行的線程,這並不影響下游流所執行的線程。
  8. Flow中生產和消費數據的操做都被包裝在用 suspend 修飾的 lambda 中,用協程就能夠輕鬆的實現異步生產,異步消費。

搜索框防抖

「在搜索框中輸入內容,而後點擊搜索按鈕,通過一段等待,搜索結果以列表形式展示」。好久之前的 app 是這樣進行搜索的。markdown

如今搜索體驗就要好不少了,不須要手動點擊搜索按鈕,輸入內容後,搜索是自動觸發。網絡

爲了實現這效果就得監聽輸入框內容的變化:app

// 構建監聽器
val textWatcher = object : android.text.TextWatcher {
    override fun afterTextChanged(s: Editable?) {}
    override fun beforeTextChanged(text: CharSequence?,start: Int,count: Int,after: Int) {}
    override fun onTextChanged(text: CharSequence?, start: Int, before: Int, count: Int) {
        search(text.toString())
    }
}
// 設置輸入框內容監聽器
editText.addTextChangedListener(textWatcher)
// 訪問網絡進行搜索
fun search(key: String) {}
複製代碼

這樣實現有一個缺點,會進行屢次無效的網絡訪問。好比搜索「kotlin flow」時,onTextChanged()會被回調 10 次,就觸發了 10 次網絡請求,而只有最後一次纔是有效的。異步

優化方案也很容易想到,只有在用戶中止輸入時才進行請求。但並無這樣的回調通知業務層用戶已經中止輸入。。。ide

那就只能設置一個超時,即用戶多久未輸入內容後就斷定已中止輸入。oop

但實現起來還挺複雜的:得在每次輸入框內容變化後啓動超時倒計時,若倒計時歸零時輸入框內容沒有發生新變化,則用輸入框當前內容發起請求,不然將倒計時重置,從新開始倒計時。post

在需求迭代中,會有時間去實現這麼一個複雜的小功能?

還好 Kotlin 的 Flow 替咱們封裝了這個功能。

用流的思想從新理解上面的場景:輸入框是流數據的生產者,其內容每變化一次,就是在流上生產了一個新數據。但並非每個數據都須要被消費,因此得作「限流」,即丟棄一切發射間隔太短的數據,直到生產出某個數據以後一段時間內再也不有新數據。

Kotlin 預約義了一些限流方法,debounce()就很是契合當前場景。爲了使用debounce(),得先把回調轉換成流:

// 構建輸入框文字變化流
fun EditText.textChangeFlow(): Flow<Editable> = callbackFlow {
    // 構建輸入框監聽器
    val watcher = object : TextWatcher {
        override fun afterTextChanged(s: Editable?) {} 
        override fun beforeTextChanged( s: CharSequence?, start: Int, count: Int, after: Int ) { }
        // 在文本變化後向流發射數據
        override fun onTextChanged( s: CharSequence?, start: Int, before: Int, count: Int ) { 
            s?.let { offer(it) }
        }
    }
    addTextChangedListener(watcher) // 設置輸入框監聽器
    awaitClose { removeTextChangedListener(watcher) } // 阻塞以保證流一直運行
}
複製代碼

爲 EditText 擴展了一個方法,用於構建一個輸入框文字變化流。

其中callbackFlow {}是系統預約義的頂層方法,它用於將回調組織成流。只須要在其內部構建回調實例並註冊之,而後在生產數據的回調方法中調用offer()發射數據便可。當前場景中,將輸入框每次文字變化做爲流數據發射出去。

callbackFlow { lambda }中最後一句awaitClose {}是必不可少的,它阻塞了當前協程,保證流不會結束,即讓流一直存活處於等待數據狀態,不然 lambda 一執行完畢,流就會關閉。

而後就能夠像這樣使用:

editText.textChangeFlow() // 構建輸入框文字變化流
    .filter { it.isNotEmpty() } // 過濾空內容,避免無效網絡請求
    .debounce(300) // 300ms防抖
    .flatMapLatest { searchFlow(it.toString()) } // 新搜索覆蓋舊搜索
    .flowOn(Dispatchers.IO) // 讓搜索在異步線程中執行
    .onEach { updateUi(it) } // 獲取搜索結果並更新界面
    .launchIn(mainScope) // 在主線程收集搜索結果

// 更新界面
fun updateUi(it: List<String>) {}
// 訪問網絡進行搜索
suspend fun search(key: String): List<String> {}
// 將搜索關鍵詞轉換成搜索結果流
fun searchFlow(key: String) = flow { emit(search(key)) }
複製代碼

其中filter()是流的中間消費者:

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}
複製代碼

filter() 利用transform()構建了一個下游流,它會收集上游數據,而且經過predicate過濾之,只有知足條件的數據纔會被髮射。關於transform()的詳細解釋能夠點擊Kotlin 進階 | 異步數據流 Flow 的使用場景

其中的flatMapLatest()也是中間消費者,flatMap 的意思是將上游流中的一個數據轉換成一個新的流,當前場景下便是將 key 經過網絡請求轉換成搜索結果Flow<List<String>>。lateest 的意思是若是一個新的搜索請求到來時,上一個請求還未返回,則取消之,即老是展現最新輸入內容的搜索結果。

flatMapLatest()源碼以下:

public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
    transformLatest { emitAll(transform(it)) }
    
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
    ChannelFlowTransformLatest(transform, this)

internal class ChannelFlowTransformLatest<T, R>(
    private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
    flow: Flow<T>,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {
    override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
        ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)

    override suspend fun flowCollect(collector: FlowCollector<R>) {
        assert { collector is SendingCollector }
        flowScope {
            var previousFlow: Job? = null
            // 收集上游數據
            flow.collect { value ->
                // 1. 若新數據到來,則取消上一次
                previousFlow?.apply {
                    cancel(ChildCancelledException())
                    join()
                }
                // 2. 啓動協程處理當前數據
                previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
                    collector.transform(value)
                }
            }
        }
    }
}
複製代碼

在收集數據時,每次都會啓動新協程執行數據變換操做,並記錄協程的 Job,待下一個數據到來時,取消上一次的 Job。

demo 場景中的launchIn()是一個終端消費者:

// 啓動協程並在其中收集數據
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect() 
}
// 用空收集器收集數據
public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)

// 空收集器是一個不會再向下游發射數據的 FlowCollector
internal object NopCollector : FlowCollector<Any?> {
    override suspend fun emit(value: Any?) {
        // does nothing
    }
}
複製代碼

使用 launchIn() 將啓動協程收集數據這一細節隱藏在了內部,因此就可使外部代碼保持簡潔的鏈式調用。下面這兩段代碼是等價的:

mainScope.launch {
    editText.textChangeFlow() 
        .filter { it.isNotEmpty() } 
        .debounce(300) 
        .flatMapLatest { searchFlow(it.toString()) } 
        .flowOn(Dispatchers.IO) 
        .collect { updateUi(it) }
}

editText.textChangeFlow() 
    .filter { it.isNotEmpty() } 
    .debounce(300) 
    .flatMapLatest { searchFlow(it.toString()) }
    .flowOn(Dispatchers.IO) 
    .onEach { updateUi(it) } 
    .launchIn(mainScope) 
複製代碼

但因爲 launchIn() 不會再向下游發射數據,因此它通常配合onEach {}一塊兒使用來完成消費數據。

點擊事件防抖

app 中點擊事件響應邏輯通常是彈出界面或是網絡請求。

若是用飛快的速度連續點擊兩次,就會彈出兩個界面或是請求了兩次網絡。

爲了不這種狀況的方法,須要作點擊事件防抖,即在必定時間間隔內只響應第一次點擊事件。能夠這樣實現:

val FAST_CLICK_THRSHOLD = 300

fun View.onDebounceClickListener( block: (T) -> Unit ) {
    // 若是不是快速點擊,則響應點擊邏輯
    setOnClickListener { if (!it.isFastClick) block() }
}

// 判斷是否快速點擊
fun View.isFastClick(): Boolean {
    val currentTime = SystemClock.elapsedRealtime()
    if (currentTime >= FAST_CLICK_THRSHOLD) {
        this.triggerTime = currentTime
        return false
    } else {
        return true
    }
}

// 記錄上次點擊時間
private var View.triggerTime: Long
    get() = getTag(R.id.click_trigger) as? Long ?: 0L
    set(value) = setTag(R.id.click_trigger, value)
複製代碼

作了 3 個擴展,完成了點擊事件防抖。將每次有效點擊的時間保存在 View 的 tag 中,每次點擊時都判斷當前時間和上次時間差,若是超過閾值則容許點擊。

用流的思想從新樣理解這個場景:每一個點擊事件都是流上的新數據。要對流作限流,即發射第一個數據,而後拋棄時間窗口中緊跟其後的全部數據,直到新的時間窗口到來。

很遺憾,Kotlin 未提供系統級實現,但自定義一個也很簡單:

fun <T> Flow<T>.throttleFirst(thresholdMillis: Long): Flow<T> = flow {
    var lastTime = 0L // 上次發射數據的時間
    // 收集數據
    collect { upstream ->
        // 當前時間
        val currentTime = System.currentTimeMillis()
        // 時間差超過閾值則發送數據並記錄時間
        if (currentTime - lastTime > thresholdMillis) {
            lastTime = currentTime
            emit(upstream)
        }
    }
複製代碼

throttleFirst() 使用flow {}構建了一個下游流而且收集了上游數據,只有當兩次數據時間差超過閾值時,才發射數據。

而後將點擊事件組織成流:

fun View.clickFlow() = callbackFlow {
    setOnClickListener { offer(Unit) }
    awaitClose { setOnClickListener(null) }
}
複製代碼

就能夠像這樣使用:

view.clickFlow()
    .throttleFirst(300)
    .onEach { // 點擊事件響應 }
    .launchIn(mainScope)
複製代碼

防過分刷新

想象這樣一個場景:百萬級別的直播間,有一個展現最近加入觀衆的列表。每一個新觀衆加入,都經過回調 onUserIn(uid: String) 通知,需經過 uid 請求網絡拉取用戶信息並更新在觀衆列表中。

對於百萬級別的直播間,每一秒可能有成百上千的觀衆加入,若不作限制,每秒幾百上千次的網絡訪問就很離譜。

產品端給出的限流方案:每一秒鐘刷新一次列表,且只展現這一秒內最後加入直播間的那我的。

用流從新理解這個場景:onUserIn() 回調是流數據的生產者。要作限流,即在每一個固定時間間隔內,只發射最後的 1 個數據,並丟棄其他的數據。

kotlin 提供了系統級別的實現sample()

// 將回調轉換成流
fun userInFlow() = callbackFlow {
    val callback = object : UserCallback() {
        override fun onUserIn(uid: String) { offer(uid) }
    }
    setCallback(callback)
    awaitClose { setCallback(null) }
}

// 觀衆列表限流
userInFlow()
    .sample(1000)
    .onEach { fetchUser(it) }
    .flowOn(Dispatchers.IO)
    .onEach { updateAudienceList() }
    .launchIn(mainScope)
複製代碼

(這個實現犯了一個和上篇倒計時 Flow 一樣的錯誤,看出來了嗎?後續篇章會詳細分析)

總結

用異步數據流的思想理解下面這些場景,使得問題求解變得簡單:

  1. 搜索框防抖:丟棄一切發射間隔太短的數據,直到生產出某個數據以後一段時間內再也不有新數據。
  2. 點擊事件防抖:發射第一個數據,而後拋棄時間窗口中緊跟其後的全部數據,直到新的時間窗口到來。
  3. 在每一個固定時間間隔內,只發射最後的 n 個數據,並丟棄其他的數據。

能夠從兩個維度區別上述限流方案:

  1. 發射數據是否有固定時間間隔。
  2. 新的數據是否會致使重啓倒計時。
限流方案 固定間隔 重啓倒計時
搜索框防抖 false true
點擊事件防抖 false false
防過分刷新 true false
  • 只要輸入連續不中止,則永遠也不會發送數據。因此輸入框防抖發射數據是沒有固定時間間隔的。搜索框防抖會重啓倒計時,並且是每個新數據的到來都會觸發從新倒計時。

  • 只要不發生點擊事件,數據就不會發射。因此點擊事件防抖發射數據是沒有固定時間間隔的。點擊事件防抖中,第一個數據產生時,倒計時開始,它並不會由於後續事件的到來而從新倒計時,在倒計時內除第一個數據外的其餘數據都被拋棄。

  • 無論有沒有新數據,每一個固定的時間間隔內都會發射一個新數據,防過分刷新時有固定時間間隔的。

推薦閱讀

相關文章
相關標籤/搜索