「本文已參與好文召集令活動,點擊查看:後端、大前端雙賽道投稿,2萬元獎池等你挑戰!」前端
異步數據流中的生產者可能會生產過多的數據,而消費者並不須要那麼多,因此限流就有用武之地了。App 開發中有一些常見的限流場景,好比搜索框防抖、點擊事件防抖、防過分刷新。這一篇就以這三個場景爲線索探究一下如何實現及背後的原理android
閱讀本篇須要瞭解 Flow 的基礎知識。關於這些知識的詳細介紹能夠點擊Kotlin 異步 | Flow 應用場景及原理,現援引以下:後端
suspend
方法用於表達一個異步過程,而Flow
用於表達多連續個異步過程。Flow
是冷流,冷流不會發射數據,直到它被收集的那一刻,因此冷流是「聲明式的」。Flow
被收集的瞬間,數據開始生產並被髮射出去,經過流收集器FlowCollector
將其傳遞給消費者。流和流收集器是成對出現的概念。流是一組按序產生的數據,數據的產生表現爲經過流收集器發射數據,在這裏流收集器像是流數據容器(雖然它不持有任何一條數據),它定義瞭如何將數據傳遞給消費者。flowOn()
改變上游流執行的線程,這並不影響下游流所執行的線程。Flow
中生產和消費數據的操做都被包裝在用 suspend 修飾的 lambda 中,用協程就能夠輕鬆的實現異步生產,異步消費。「在搜索框中輸入內容,而後點擊搜索按鈕,通過一段等待,搜索結果以列表形式展示」。好久之前的 app 是這樣進行搜索的。markdown
如今搜索體驗就要好不少了,不須要手動點擊搜索按鈕,輸入內容後,搜索是自動觸發。網絡
爲了實現這效果就得監聽輸入框內容的變化:app
// 構建監聽器
val textWatcher = object : android.text.TextWatcher {
override fun afterTextChanged(s: Editable?) {}
override fun beforeTextChanged(text: CharSequence?,start: Int,count: Int,after: Int) {}
override fun onTextChanged(text: CharSequence?, start: Int, before: Int, count: Int) {
search(text.toString())
}
}
// 設置輸入框內容監聽器
editText.addTextChangedListener(textWatcher)
// 訪問網絡進行搜索
fun search(key: String) {}
複製代碼
這樣實現有一個缺點,會進行屢次無效的網絡訪問。好比搜索「kotlin flow」時,onTextChanged()
會被回調 10 次,就觸發了 10 次網絡請求,而只有最後一次纔是有效的。異步
優化方案也很容易想到,只有在用戶中止輸入時才進行請求。但並無這樣的回調通知業務層用戶已經中止輸入。。。ide
那就只能設置一個超時,即用戶多久未輸入內容後就斷定已中止輸入。oop
但實現起來還挺複雜的:得在每次輸入框內容變化後啓動超時倒計時,若倒計時歸零時輸入框內容沒有發生新變化,則用輸入框當前內容發起請求,不然將倒計時重置,從新開始倒計時。post
在需求迭代中,會有時間去實現這麼一個複雜的小功能?
還好 Kotlin 的 Flow 替咱們封裝了這個功能。
用流的思想從新理解上面的場景:輸入框是流數據的生產者,其內容每變化一次,就是在流上生產了一個新數據。但並非每個數據都須要被消費,因此得作「限流」,即丟棄一切發射間隔太短的數據,直到生產出某個數據以後一段時間內再也不有新數據。
Kotlin 預約義了一些限流方法,debounce()
就很是契合當前場景。爲了使用debounce()
,得先把回調轉換成流:
// 構建輸入框文字變化流
fun EditText.textChangeFlow(): Flow<Editable> = callbackFlow {
// 構建輸入框監聽器
val watcher = object : TextWatcher {
override fun afterTextChanged(s: Editable?) {}
override fun beforeTextChanged( s: CharSequence?, start: Int, count: Int, after: Int ) { }
// 在文本變化後向流發射數據
override fun onTextChanged( s: CharSequence?, start: Int, before: Int, count: Int ) {
s?.let { offer(it) }
}
}
addTextChangedListener(watcher) // 設置輸入框監聽器
awaitClose { removeTextChangedListener(watcher) } // 阻塞以保證流一直運行
}
複製代碼
爲 EditText 擴展了一個方法,用於構建一個輸入框文字變化流。
其中callbackFlow {}
是系統預約義的頂層方法,它用於將回調組織成流。只須要在其內部構建回調實例並註冊之,而後在生產數據的回調方法中調用offer()
發射數據便可。當前場景中,將輸入框每次文字變化做爲流數據發射出去。
callbackFlow { lambda }
中最後一句awaitClose {}
是必不可少的,它阻塞了當前協程,保證流不會結束,即讓流一直存活處於等待數據狀態,不然 lambda 一執行完畢,流就會關閉。
而後就能夠像這樣使用:
editText.textChangeFlow() // 構建輸入框文字變化流
.filter { it.isNotEmpty() } // 過濾空內容,避免無效網絡請求
.debounce(300) // 300ms防抖
.flatMapLatest { searchFlow(it.toString()) } // 新搜索覆蓋舊搜索
.flowOn(Dispatchers.IO) // 讓搜索在異步線程中執行
.onEach { updateUi(it) } // 獲取搜索結果並更新界面
.launchIn(mainScope) // 在主線程收集搜索結果
// 更新界面
fun updateUi(it: List<String>) {}
// 訪問網絡進行搜索
suspend fun search(key: String): List<String> {}
// 將搜索關鍵詞轉換成搜索結果流
fun searchFlow(key: String) = flow { emit(search(key)) }
複製代碼
其中filter()
是流的中間消費者:
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform { value ->
if (predicate(value)) return@transform emit(value)
}
複製代碼
filter() 利用transform()
構建了一個下游流,它會收集上游數據,而且經過predicate
過濾之,只有知足條件的數據纔會被髮射。關於transform()
的詳細解釋能夠點擊Kotlin 進階 | 異步數據流 Flow 的使用場景。
其中的flatMapLatest()
也是中間消費者,flatMap 的意思是將上游流中的一個數據轉換成一個新的流,當前場景下便是將 key 經過網絡請求轉換成搜索結果Flow<List<String>>
。lateest 的意思是若是一個新的搜索請求到來時,上一個請求還未返回,則取消之,即老是展現最新輸入內容的搜索結果。
flatMapLatest()
源碼以下:
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
transformLatest { emitAll(transform(it)) }
public fun <T, R> Flow<T>.transformLatest(@BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R> =
ChannelFlowTransformLatest(transform, this)
internal class ChannelFlowTransformLatest<T, R>(
private val transform: suspend FlowCollector<R>.(value: T) -> Unit,
flow: Flow<T>,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
) : ChannelFlowOperator<T, R>(flow, context, capacity, onBufferOverflow) {
override fun create(context: CoroutineContext, capacity: Int, onBufferOverflow: BufferOverflow): ChannelFlow<R> =
ChannelFlowTransformLatest(transform, flow, context, capacity, onBufferOverflow)
override suspend fun flowCollect(collector: FlowCollector<R>) {
assert { collector is SendingCollector }
flowScope {
var previousFlow: Job? = null
// 收集上游數據
flow.collect { value ->
// 1. 若新數據到來,則取消上一次
previousFlow?.apply {
cancel(ChildCancelledException())
join()
}
// 2. 啓動協程處理當前數據
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
collector.transform(value)
}
}
}
}
}
複製代碼
在收集數據時,每次都會啓動新協程執行數據變換操做,並記錄協程的 Job,待下一個數據到來時,取消上一次的 Job。
demo 場景中的launchIn()
是一個終端消費者:
// 啓動協程並在其中收集數據
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect()
}
// 用空收集器收集數據
public suspend fun Flow<*>.collect(): Unit = collect(NopCollector)
// 空收集器是一個不會再向下游發射數據的 FlowCollector
internal object NopCollector : FlowCollector<Any?> {
override suspend fun emit(value: Any?) {
// does nothing
}
}
複製代碼
使用 launchIn() 將啓動協程收集數據這一細節隱藏在了內部,因此就可使外部代碼保持簡潔的鏈式調用。下面這兩段代碼是等價的:
mainScope.launch {
editText.textChangeFlow()
.filter { it.isNotEmpty() }
.debounce(300)
.flatMapLatest { searchFlow(it.toString()) }
.flowOn(Dispatchers.IO)
.collect { updateUi(it) }
}
editText.textChangeFlow()
.filter { it.isNotEmpty() }
.debounce(300)
.flatMapLatest { searchFlow(it.toString()) }
.flowOn(Dispatchers.IO)
.onEach { updateUi(it) }
.launchIn(mainScope)
複製代碼
但因爲 launchIn() 不會再向下游發射數據,因此它通常配合onEach {}
一塊兒使用來完成消費數據。
app 中點擊事件響應邏輯通常是彈出界面或是網絡請求。
若是用飛快的速度連續點擊兩次,就會彈出兩個界面或是請求了兩次網絡。
爲了不這種狀況的方法,須要作點擊事件防抖,即在必定時間間隔內只響應第一次點擊事件。能夠這樣實現:
val FAST_CLICK_THRSHOLD = 300
fun View.onDebounceClickListener( block: (T) -> Unit ) {
// 若是不是快速點擊,則響應點擊邏輯
setOnClickListener { if (!it.isFastClick) block() }
}
// 判斷是否快速點擊
fun View.isFastClick(): Boolean {
val currentTime = SystemClock.elapsedRealtime()
if (currentTime >= FAST_CLICK_THRSHOLD) {
this.triggerTime = currentTime
return false
} else {
return true
}
}
// 記錄上次點擊時間
private var View.triggerTime: Long
get() = getTag(R.id.click_trigger) as? Long ?: 0L
set(value) = setTag(R.id.click_trigger, value)
複製代碼
作了 3 個擴展,完成了點擊事件防抖。將每次有效點擊的時間保存在 View 的 tag 中,每次點擊時都判斷當前時間和上次時間差,若是超過閾值則容許點擊。
用流的思想從新樣理解這個場景:每一個點擊事件都是流上的新數據。要對流作限流,即發射第一個數據,而後拋棄時間窗口中緊跟其後的全部數據,直到新的時間窗口到來。
很遺憾,Kotlin 未提供系統級實現,但自定義一個也很簡單:
fun <T> Flow<T>.throttleFirst(thresholdMillis: Long): Flow<T> = flow {
var lastTime = 0L // 上次發射數據的時間
// 收集數據
collect { upstream ->
// 當前時間
val currentTime = System.currentTimeMillis()
// 時間差超過閾值則發送數據並記錄時間
if (currentTime - lastTime > thresholdMillis) {
lastTime = currentTime
emit(upstream)
}
}
複製代碼
throttleFirst() 使用flow {}
構建了一個下游流而且收集了上游數據,只有當兩次數據時間差超過閾值時,才發射數據。
而後將點擊事件組織成流:
fun View.clickFlow() = callbackFlow {
setOnClickListener { offer(Unit) }
awaitClose { setOnClickListener(null) }
}
複製代碼
就能夠像這樣使用:
view.clickFlow()
.throttleFirst(300)
.onEach { // 點擊事件響應 }
.launchIn(mainScope)
複製代碼
想象這樣一個場景:百萬級別的直播間,有一個展現最近加入觀衆的列表。每一個新觀衆加入,都經過回調 onUserIn(uid: String) 通知,需經過 uid 請求網絡拉取用戶信息並更新在觀衆列表中。
對於百萬級別的直播間,每一秒可能有成百上千的觀衆加入,若不作限制,每秒幾百上千次的網絡訪問就很離譜。
產品端給出的限流方案:每一秒鐘刷新一次列表,且只展現這一秒內最後加入直播間的那我的。
用流從新理解這個場景:onUserIn() 回調是流數據的生產者。要作限流,即在每一個固定時間間隔內,只發射最後的 1 個數據,並丟棄其他的數據。
kotlin 提供了系統級別的實現sample()
:
// 將回調轉換成流
fun userInFlow() = callbackFlow {
val callback = object : UserCallback() {
override fun onUserIn(uid: String) { offer(uid) }
}
setCallback(callback)
awaitClose { setCallback(null) }
}
// 觀衆列表限流
userInFlow()
.sample(1000)
.onEach { fetchUser(it) }
.flowOn(Dispatchers.IO)
.onEach { updateAudienceList() }
.launchIn(mainScope)
複製代碼
(這個實現犯了一個和上篇倒計時 Flow 一樣的錯誤,看出來了嗎?後續篇章會詳細分析)
用異步數據流的思想理解下面這些場景,使得問題求解變得簡單:
能夠從兩個維度區別上述限流方案:
限流方案 | 固定間隔 | 重啓倒計時 |
---|---|---|
搜索框防抖 | false | true |
點擊事件防抖 | false | false |
防過分刷新 | true | false |
只要輸入連續不中止,則永遠也不會發送數據。因此輸入框防抖發射數據是沒有固定時間間隔的。搜索框防抖會重啓倒計時,並且是每個新數據的到來都會觸發從新倒計時。
只要不發生點擊事件,數據就不會發射。因此點擊事件防抖發射數據是沒有固定時間間隔的。點擊事件防抖中,第一個數據產生時,倒計時開始,它並不會由於後續事件的到來而從新倒計時,在倒計時內除第一個數據外的其餘數據都被拋棄。
無論有沒有新數據,每一個固定的時間間隔內都會發射一個新數據,防過分刷新時有固定時間間隔的。