你會怎麼實現下面這個場景?應用首頁有三個優先級從高到低的彈窗,展現內容依賴網絡請求,若低優先級彈窗請求先返回則需等待,讓高優先級先展現。git
串行請求是最容易想到的解決方案,即先請求最高優先級的彈窗,當它返回展現後再請求第二優先級彈窗。但這樣會拉長全部彈窗的展現時間。github
性能更好的方案是同時並行發出三個請求,但網絡請求時長的不肯定性使得最高優先級的彈窗不必定優先返回,因此得設計一種優先級阻塞機制。本文使用 協程 + 鏈式隊列 實現這個機制。web
把單個異步任務進行抽象:網絡
// 單個異步任務
class Item { companion object { // 默認異步優先級 const val PRIORITY_DEFAULT = 0 } // 異步操做:耗時的異步操做 var suspendAction: (suspend () -> Any?)? = null set(value) { field = value value?.let { // 啓動協程執行異步操做 GlobalScope.launch { deferred = async { it.invoke() } } } } // 異步響應:異步操做結束後要作的事情 var resumeAction: ((Any?) -> Unit)? = null // 異步結果:異步操做返回值 var deferred: Deferred<*>? = null // 異步優先級 var priority: Int = PRIORITY_DEFAULT } 複製代碼
異步任務有三個主要的屬性,分別是異步操做suspendAction
、異步響應resumeAction
、異步結果deferred
。每當異步操做被賦值時,就啓動協程執行它,並將其返回值保存在deferred
中。app
爲了確保異步任務的優先級,把多個異步任務用鏈的方式串起來,組成異步任務鏈:dom
class Item {
companion object { const val PRIORITY_DEFAULT = 0 } var suspendAction: (suspend () -> Any?)? = null set(value) { field = value value?.let { GlobalScope.launch { deferred = async { it.invoke() } } } } var resumeAction: ((Any?) -> Unit)? = null var deferred: Deferred<*>? = null var priority: Int = PRIORITY_DEFAULT // 異步任務前結點(Item 包含 Item) internal var next: Item? = null // 異步任務後結點(Item 包含 Item) internal var pre: Item? = null // 在當前結點後插入新結點 internal fun addNext(item: Item) { next?.let { it.pre = item item.next = it item.pre = this this.next = item } ?: also { // 尾結點插入 this.next = item item.pre = this item.next = null } } // 在當前結點前插入新結點 internal fun addPre(item: Item) { pre?.let { it.next = item item.pre = it item.next = this this.pre = item } ?: also { // 頭結點插入 item.next = this item.pre = null this.pre = item } } } 複製代碼
用 本身包含本身 的方式就能實現鏈式結構。Android 消息列表也用一樣的結構:異步
public final class Message implements Parcelable {
Message next; ... } 複製代碼
鏈必須有個頭結點,存放頭結點的類就是存放整個鏈的類,就好像消息列表MessageQueue
同樣:async
public final class MessageQueue {
Message mMessages; } 複製代碼
模仿消息列表,寫一個異步任務鏈:編輯器
// 異步任務鏈
class SuspendList { // 頭結點 private var head: Item = emptyItem() // 向異步任務鏈插入結點 fun add(item: Item) { // 從頭結點向後查找插入位置,找到再後插入 head.findItem(item.priority).addNext(item) } // 根據優先級向後查找插入位置(優先級升序) private fun Item.findItem(priority: Int): Item { // 當前結點 var p: Item? = this // 當前結點的後續結點 var next: Item? = p?.next // 從當前結點開始向後遍歷異步任務鏈 while (next != null) { // 若優先級介於當前結點和其後續結點之間,則表示找到插入位置 if (priority in p!!.priority until next.priority) { break } p = p.next next = p?.next } return p!! } // 觀察異步任務鏈並按優先級阻塞 fun observe() = GlobalScope.launch(Dispatchers.Main) { // 從頭結點向後遍歷異步任務鏈 var p: Item? = head.next while (p != null) { // 在每一個異步結果上阻塞,直到異步任務完成後執行異步響應 p.resumeAction?.invoke(p.deferred?.await()) p = p.next } } // 異步任務(已講解再也不贅述) class Item { ... } } // 空結點(頭結點) fun emptyItem(): SuspendList.Item = SuspendList.Item().apply { priority = -1 } 複製代碼
SuspendList
持有鏈的頭結點,爲了使「頭插入」和「中間插入」複用一套代碼,將頭結點設置爲「空結點」。函數
異步任務鏈上的任務按優先級升序排列(優先級數字越小優先級越高)。這保證了優先級最高的異步任務老是在鏈表頭。
優先級阻塞:當全部異步任務都被添加到鏈以後,調用observe()
觀察整個異步任務鏈。該方法啓動了一個協程,在協程中從頭結點向後遍歷鏈,並在每一個異步任務的Deferred
上阻塞。由於鏈表已按優先級排序,因此阻塞時也是按優先級從高到低進行的。
真實業務場景中,須要統一安排優先級的異步任務多是跨界面的。這就要求異步任務鏈能全局訪問,單例是一個最直接的選擇,但它限制了整個 App 中異步任務鏈的數量:
// 私有構造函數
class SuspendList private constructor() { companion object { // 靜態 map 存放全部異步任務鏈 var map = ArrayMap<String, SuspendList>() // 根據 key 構建異步任務鏈 fun of(key: String): SuspendList = map[key] ?: let { val p = SuspendList() map[key] = p p } } ... } 複製代碼
而後就能夠像這樣使用異步任務鏈:
// 構建異步任務鏈
SuspendList.of("dialog").apply { // 向鏈添加異步任務1 add(Item { suspendAction = { fetchUser() } resumeAction = { user: Any? -> onUserResume(user) } priority = 3 }) // 向鏈添加異步任務2 add(Item { suspendAction = { fetchActivity() } resumeAction = { activity: Any? -> onActivityResume(activity) } priority = 1 }) }.observe() suspend fun fetchUser(): String { delay(4000) return "User Taylor" } suspend fun fetchActivity(): String { delay(5000) return "Activity Bonus" } private fun onActivityResume(activity: Any?) { Log.v("test", "activity(${activity.toString()}) resume") } private fun onUserResume(user: Any?) { Log.v("test", "user(${user.toString()}) resume") } 複製代碼
上述代碼構建了一個名爲 dialog 的異步任務鏈,向其中添加了兩個異步任務,並按優先級觀察它們的結果。
其中Item()
是一個頂層方法,用於構建單個異步任務:
fun Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply(init)
複製代碼
這是構建對象 DSL 的標準寫法,詳細講解能夠參見這裏。
運用 DSL 的思路還能夠進一步將構建代碼簡化成這樣:
SuspendList.of("dialog") {
Item { suspendAction = { fetchUser() } resumeAction = { user: Any? -> onUserResume(user) } priority = 3 } Item { suspendAction = { fetchActivity() } resumeAction = { activity: Any? -> onActivityResume(activity) } priority = 1 } }.observe() 複製代碼
不過須要對原先的of()
和Item()
作一些調整:
// 新增接收者爲SuspendList的 lambda 參數,爲構建異步任務提供外層環境
fun of(key: String, init: SuspendList.() -> Unit): SuspendList = (map[key] ?: let { val p = SuspendList() map[key] = p p }).apply(init) // 將構建異步任務聲明爲SuspendList的擴展方法 // 構建異步任務後將其插入到異步任務鏈中 fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply(init).also { add(it) } 複製代碼
若某個高優先級的異步任務遲遲不能結束,其它任務只能都被阻塞?
得加個超時參數:
class Item {
// 爲異步操做賦值時,再也不馬上構建協程 var suspendAction: (suspend () -> Any?)? = null // 超時時長 var timeout: Long = -1 ... } 複製代碼
爲單個異步任務添加超時時長參數,還得重構一下異步任務的構建函數:
fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =
SuspendList.Item().apply { // 爲異步任務設置各類參數 init() // 啓動協程 GlobalScope.launch { // 將異步任務結果包裝成 Deferred deferred = async { // 若須要超時機制 if (timeout > 0) { withTimeoutOrNull(timeout) { suspendAction?.invoke() } } // 不須要超時機制 else { suspendAction?.invoke() } } } }.also { add(it) } 複製代碼
本來在suspendAction
賦值時就立馬啓動協程,如今將其延後,等全部參數都設置完畢後才啓動。這樣能夠避免「先爲 suspendAction 賦值,再爲 timeout 賦值」case 下超時無效的 bug。
使用withTimeoutOrNull()
實現超時機制,當超時發生時,業務會從resumeAction
中得到null
。
構建異步任務鏈時使用了GlobalScope.launch()
啓動協程,其建立的協程不符合structured-concurrency
。因此須要手動管理生命週期:
class SuspendList private constructor() {
class Item { // 爲異步任務新增 Job 屬性,指向其對應的協程 internal var job:Job? = null ... } // observer()返回類型爲 Job,業務層能夠在須要的時候取消它 fun observe() = GlobalScope.launch(Dispatchers.Main) { var p: Item? = head.next while (p != null) { p.resumeAction?.invoke(p.deferred?.await()) // 當異步任務響應被處理後,取消其協程以釋放資源 p.job?.cancel() p = p.next } } } fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply { init() // 將該異步任務的協程存儲在 job 中 job = GlobalScope.launch { deferred = async { if (timeout > 0) { withTimeoutOrNull(timeout) { suspendAction?.invoke() } } else { suspendAction?.invoke() } } } }.also { add(it) } 複製代碼
本篇的完整源碼能夠點擊這裏
這是該系列的第十二篇,系列文章目錄以下: