Kotlin 應用 | 用協程控制多個並行異步結果的優先級

你會怎麼實現下面這個場景?應用首頁有三個優先級從高到低的彈窗,展現內容依賴網絡請求,若低優先級彈窗請求先返回則需等待,讓高優先級先展現。git

串行請求是最容易想到的解決方案,即先請求最高優先級的彈窗,當它返回展現後再請求第二優先級彈窗。但這樣會拉長全部彈窗的展現時間。github

性能更好的方案是同時並行發出三個請求,但網絡請求時長的不肯定性使得最高優先級的彈窗不必定優先返回,因此得設計一種優先級阻塞機制。本文使用 協程 + 鏈式隊列 實現這個機制。web

異步任務鏈

把單個異步任務進行抽象:網絡

// 單個異步任務
class Item {  companion object {  // 默認異步優先級  const val PRIORITY_DEFAULT = 0  }   // 異步操做:耗時的異步操做  var suspendAction: (suspend () -> Any?)? = null  set(value) {  field = value  value?.let {  // 啓動協程執行異步操做  GlobalScope.launch { deferred = async { it.invoke() } }  }  }  // 異步響應:異步操做結束後要作的事情  var resumeAction: ((Any?) -> Unit)? = null  // 異步結果:異步操做返回值  var deferred: Deferred<*>? = null  // 異步優先級  var priority: Int = PRIORITY_DEFAULT } 複製代碼

異步任務有三個主要的屬性,分別是異步操做suspendAction、異步響應resumeAction、異步結果deferred。每當異步操做被賦值時,就啓動協程執行它,並將其返回值保存在deferred中。app

爲了確保異步任務的優先級,把多個異步任務用鏈的方式串起來,組成異步任務鏈dom

class Item {
 companion object {  const val PRIORITY_DEFAULT = 0  }   var suspendAction: (suspend () -> Any?)? = null  set(value) {  field = value  value?.let {  GlobalScope.launch { deferred = async { it.invoke() } }  }  }   var resumeAction: ((Any?) -> Unit)? = null  var deferred: Deferred<*>? = null  var priority: Int = PRIORITY_DEFAULT   // 異步任務前結點(Item 包含 Item)  internal var next: Item? = null  // 異步任務後結點(Item 包含 Item)  internal var pre: Item? = null   // 在當前結點後插入新結點  internal fun addNext(item: Item) {  next?.let {  it.pre = item  item.next = it  item.pre = this  this.next = item  } ?: also { // 尾結點插入  this.next = item  item.pre = this  item.next = null  }  }   // 在當前結點前插入新結點  internal fun addPre(item: Item) {  pre?.let {  it.next = item  item.pre = it  item.next = this  this.pre = item  } ?: also { // 頭結點插入  item.next = this  item.pre = null  this.pre = item  }  } } 複製代碼

本身包含本身 的方式就能實現鏈式結構。Android 消息列表也用一樣的結構:異步

public final class Message implements Parcelable {
 Message next;  ... } 複製代碼

鏈必須有個頭結點,存放頭結點的類就是存放整個鏈的類,就好像消息列表MessageQueue同樣:async

public final class MessageQueue {
 Message mMessages; } 複製代碼

模仿消息列表,寫一個異步任務鏈編輯器

// 異步任務鏈
class SuspendList {  // 頭結點  private var head: Item = emptyItem()   // 向異步任務鏈插入結點  fun add(item: Item) {  // 從頭結點向後查找插入位置,找到再後插入  head.findItem(item.priority).addNext(item)  }   // 根據優先級向後查找插入位置(優先級升序)  private fun Item.findItem(priority: Int): Item {  // 當前結點  var p: Item? = this  // 當前結點的後續結點  var next: Item? = p?.next  // 從當前結點開始向後遍歷異步任務鏈  while (next != null) {  // 若優先級介於當前結點和其後續結點之間,則表示找到插入位置  if (priority in p!!.priority until next.priority) {  break  }  p = p.next  next = p?.next  }  return p!!  }   // 觀察異步任務鏈並按優先級阻塞  fun observe() = GlobalScope.launch(Dispatchers.Main) {  // 從頭結點向後遍歷異步任務鏈  var p: Item? = head.next  while (p != null) {  // 在每一個異步結果上阻塞,直到異步任務完成後執行異步響應  p.resumeAction?.invoke(p.deferred?.await())  p = p.next  }  }   // 異步任務(已講解再也不贅述)  class Item { ... } }  // 空結點(頭結點) fun emptyItem(): SuspendList.Item = SuspendList.Item().apply { priority = -1 } 複製代碼

SuspendList持有鏈的頭結點,爲了使「頭插入」和「中間插入」複用一套代碼,將頭結點設置爲「空結點」。函數

異步任務鏈上的任務按優先級升序排列(優先級數字越小優先級越高)。這保證了優先級最高的異步任務老是在鏈表頭。

優先級阻塞:當全部異步任務都被添加到鏈以後,調用observe()觀察整個異步任務鏈。該方法啓動了一個協程,在協程中從頭結點向後遍歷鏈,並在每一個異步任務的Deferred上阻塞。由於鏈表已按優先級排序,因此阻塞時也是按優先級從高到低進行的。

全局做用域

真實業務場景中,須要統一安排優先級的異步任務多是跨界面的。這就要求異步任務鏈能全局訪問,單例是一個最直接的選擇,但它限制了整個 App 中異步任務鏈的數量:

// 私有構造函數
class SuspendList private constructor() {  companion object {  // 靜態 map 存放全部異步任務鏈  var map = ArrayMap<String, SuspendList>()  // 根據 key 構建異步任務鏈  fun of(key: String): SuspendList = map[key] ?: let {  val p = SuspendList()  map[key] = p  p  }  }  ... } 複製代碼

而後就能夠像這樣使用異步任務鏈:

// 構建異步任務鏈
SuspendList.of("dialog").apply {  // 向鏈添加異步任務1  add(Item {  suspendAction = { fetchUser() }  resumeAction = { user: Any? -> onUserResume(user) }  priority = 3  })  // 向鏈添加異步任務2  add(Item {  suspendAction = { fetchActivity() }  resumeAction = { activity: Any? -> onActivityResume(activity) }  priority = 1  }) }.observe()  suspend fun fetchUser(): String {  delay(4000)  return "User Taylor" }  suspend fun fetchActivity(): String {  delay(5000)  return "Activity Bonus" }  private fun onActivityResume(activity: Any?) {  Log.v("test", "activity(${activity.toString()}) resume") }  private fun onUserResume(user: Any?) {  Log.v("test", "user(${user.toString()}) resume") } 複製代碼

上述代碼構建了一個名爲 dialog 的異步任務鏈,向其中添加了兩個異步任務,並按優先級觀察它們的結果。

其中Item()是一個頂層方法,用於構建單個異步任務:

fun Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply(init)
複製代碼

這是構建對象 DSL 的標準寫法,詳細講解能夠參見這裏

運用 DSL 的思路還能夠進一步將構建代碼簡化成這樣:

SuspendList.of("dialog") {
 Item {  suspendAction = { fetchUser() }  resumeAction = { user: Any? -> onUserResume(user) }  priority = 3  }  Item {  suspendAction = { fetchActivity() }  resumeAction = { activity: Any? -> onActivityResume(activity) }  priority = 1  } }.observe() 複製代碼

不過須要對原先的of()Item()作一些調整:

// 新增接收者爲SuspendList的 lambda 參數,爲構建異步任務提供外層環境
fun of(key: String, init: SuspendList.() -> Unit): SuspendList = (map[key] ?: let {  val p = SuspendList()  map[key] = p  p }).apply(init)  // 將構建異步任務聲明爲SuspendList的擴展方法 // 構建異步任務後將其插入到異步任務鏈中 fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =  SuspendList.Item().apply(init).also { add(it) } 複製代碼

異步超時

若某個高優先級的異步任務遲遲不能結束,其它任務只能都被阻塞?

得加個超時參數:

class Item {
 // 爲異步操做賦值時,再也不馬上構建協程  var suspendAction: (suspend () -> Any?)? = null  // 超時時長  var timeout: Long = -1  ... } 複製代碼

爲單個異步任務添加超時時長參數,還得重構一下異步任務的構建函數:

fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =
 SuspendList.Item().apply {  // 爲異步任務設置各類參數  init()  // 啓動協程  GlobalScope.launch {  // 將異步任務結果包裝成 Deferred  deferred = async {  // 若須要超時機制  if (timeout > 0) {  withTimeoutOrNull(timeout) { suspendAction?.invoke() }  }  // 不須要超時機制  else {  suspendAction?.invoke()  }  }  }  }.also { add(it) } 複製代碼

本來在suspendAction賦值時就立馬啓動協程,如今將其延後,等全部參數都設置完畢後才啓動。這樣能夠避免「先爲 suspendAction 賦值,再爲 timeout 賦值」case 下超時無效的 bug。

使用withTimeoutOrNull()實現超時機制,當超時發生時,業務會從resumeAction中得到null

異步任務生命週期管理

構建異步任務鏈時使用了GlobalScope.launch()啓動協程,其建立的協程不符合structured-concurrency。因此須要手動管理生命週期:

class SuspendList private constructor() {
 class Item {  // 爲異步任務新增 Job 屬性,指向其對應的協程  internal var job:Job? = null  ...  }   // observer()返回類型爲 Job,業務層能夠在須要的時候取消它  fun observe() = GlobalScope.launch(Dispatchers.Main) {  var p: Item? = head.next  while (p != null) {  p.resumeAction?.invoke(p.deferred?.await())  // 當異步任務響應被處理後,取消其協程以釋放資源  p.job?.cancel()  p = p.next  }  } }  fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =  SuspendList.Item().apply {  init()  // 將該異步任務的協程存儲在 job 中  job = GlobalScope.launch {  deferred = async {  if (timeout > 0) {  withTimeoutOrNull(timeout) { suspendAction?.invoke() }  } else {  suspendAction?.invoke()  }  }  }  }.also { add(it) } 複製代碼

Talk is cheap, show me the code

本篇的完整源碼能夠點擊這裏

推薦閱讀

這是該系列的第十二篇,系列文章目錄以下:

  1. Kotlin基礎 | 白話文轉文言文般的Kotlin常識

  2. Kotlin基礎 | 望文生義的Kotlin集合操做

  3. Kotlin實戰 | 用實戰代碼更深刻地理解預約義擴展函數

  4. Kotlin實戰 | 使用DSL構建結構化API去掉冗餘的接口方法

  5. Kotlin基礎 | 抽象屬性的應用場景

  6. Kotlin進階 | 動畫代碼太醜,用DSL動畫庫拯救,像說話同樣寫代碼喲!

  7. Kotlin基礎 | 用約定簡化相親

  8. Kotlin基礎 | 2 = 12 ?泛型、類委託、重載運算符綜合應用

  9. Kotlin實戰 | 語法糖,總有一顆甜到你(持續更新)

  10. Kotlin 實戰 | 幹掉 findViewById 和 Activity 中的業務邏輯

  11. Kotlin基礎 | 爲何要這樣用協程?

  12. Kotlin 應用 | 用協程控制多個並行異步結果的優先級

相關文章
相關標籤/搜索