使用協程和 Flow 簡化 API 設計

若是您是庫做者,您也許但願用戶在使用 Kotlin 協程與 Flow 時能夠更加輕鬆地調用您基於 Java 或回調的 API。另外,若是您是 API 的使用者,則可能願意將第三方 API 界面適配協程,以使它們對 Kotlin 更友好。html

本文將會介紹如何使用協程和 Flow 簡化 API,以及如何使用 suspendCancellableCoroutine 和 callbackFlow API 建立您本身的適配器。針對那些富有好奇心的讀者,本文還會對這些 API 進行剖析,以幫您瞭解它們底層的工做原理。react

若是您更喜歡觀看視頻,能夠 點擊這裏android

檢查現有協程適配器

在您爲現有 API 編寫本身的封裝以前,請檢查是否已經存在針對您的用例的適配器或者 擴展方法。下面是一些包含常見類型協程適配器的庫。git

Future 類型github

對於 future 類型,Java 8 集成了 CompletableFuture,而 Guava 集成了 ListenableFuture。這裏提到的並非所有,您能夠在線搜索以肯定是否存在適用於您的 future 類型的適配器。編程

// 等待 CompletionStage 的執行完成而不阻塞線程
suspend fun <T> CompletionStage<T>.await(): T 
 
// 等待 ListenableFuture 的執行完成而不阻塞線程
suspend fun <T> ListenableFuture<T>.await(): T
複製代碼

使用這些函數,您能夠擺脫回調並掛起協程直到 future 的結果被返回。api

Reactive Streammarkdown

對於響應式流的庫,有針對 RxJavaJava 9 API響應式流庫 的集成:app

// 將給定的響應式 Publisher 轉換爲 Flow
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
複製代碼

這些函數將響應式流轉換爲了 Flow。異步

Android 專用 API

對於 Jetpack 庫或 Android 平臺 API,您能夠參閱 Jetpack KTX 庫 列表。現有超過 20 個庫擁有 KTX 版本,構成了您所熟悉的 Java API。其中包括 SharedPreferences、ViewModels、SQLite 以及 Play Core。

回調

回調是實現異步通信時很是常見的作法。事實上,咱們在 後臺線程任務運行指南 中將回調做爲 Java 編程語言的默認解決方案。然而,回調也有許多缺點: 這一設計會致使使人費解的回調嵌套。同時,因爲沒有簡單的傳播方式,錯誤處理也更加複雜。在 Kotlin 中,您能夠簡單地使用協程調用回調,但前提是您必須建立您本身的適配器。

建立您本身的適配器

若是沒有找到適合您用例的適配器,更直接的作法是本身編寫適配器。對於一次性異步調用,可使用 suspendCancellableCoroutine API;而對於流數據,可使用 callbackFlow API。

做爲練習,下面的示例將會使用來自 Google Play Services 的 Fused Location Provider API 來獲取位置數據。此 API 界面十分簡單,可是它使用回調來執行異步操做。當邏輯變得複雜時,這些回調容易使代碼變得不可讀,而咱們可使用協程來擺脫它們。

若是您但願探索其它解決方案,能夠經過上面函數所連接的源代碼爲您帶來啓發。

一次性異步調用

Fused Location Provider API 提供了 getLastLocation 方法來得到 最後已知位置。對於協程來講,理想的 API 是一個直接返回確切結果的掛起函數。

注意: 這一 API 返回值爲 Task,而且已經有了對應的 適配器。出於學習的目的,咱們用它做爲範例。

咱們能夠經過爲 FusedLocationProviderClient 建立擴展函數來得到更好的 API:

suspend fun FusedLocationProviderClient.awaitLastLocation(): Location
複製代碼

因爲這是一個一次性異步操做,咱們使用 suspendCancellableCoroutine 函數: 一個用於從協程庫建立掛起函數的底層構建塊。

suspendCancellableCoroutine 會執行做爲參數傳入的代碼塊,而後在等待繼續信號期間掛起協程的執行。當協程 Continuation 對象中的 resumeresumeWithException 方法被調用時,協程會被恢復執行。有關 Continuation 的更多信息,請參閱: Kotlin Vocabulary | 揭祕協程中的 suspend 修飾符

咱們使用能夠添加到 getLastLocation 方法中的回調來在合適的時機恢復協程。參見下面的實現:

// FusedLocationProviderClient 的擴展函數,返回最後已知位置
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location =

  // 建立新的可取消協程
  suspendCancellableCoroutine<Location> { continuation ->

    // 添加恢復協程執行的監聽器
    lastLocation.addOnSuccessListener { location ->
      // 恢復協程並返回位置
      continuation.resume(location)
    }.addOnFailureListener { e ->
      // 經過拋出異常來恢復協程
      continuation.resumeWithException(e)
    }

    // suspendCancellableCoroutine 塊的結尾。這裏會掛起協程
    //直到某個回調調用了 continuation 參數
  }
複製代碼

注意: 儘管協程庫中一樣包含了不可取消版本的協程構建器 (即 suspendCoroutine),但最好始終選擇使用 suspendCancellableCoroutine 處理協程做用域的取消及從底層 API 傳播取消事件。

suspendCancellableCoroutine 原理

在內部,suspendCancellableCoroutine 使用 suspendCoroutineUninterceptedOrReturn 在掛起函數的協程中得到 Continuation。這一 Continuation 對象會被一個 CancellableContinuation 對象攔截,後者會今後時開始控制協程的生命週期 (其 實現 具備 Job 的功能,可是有一些限制)。

接下來,傳遞給 suspendCancellableCoroutine 的 lambda 表達式會被執行。若是該 lambda 返回告終果,則協程將當即恢復;不然協程將會在 CancellableContinuation 被 lambda 手動恢復前保持掛起狀態。

您能夠經過我在下面代碼片斷 (原版實現) 中的註釋來了解發生了什麼:

public suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T =
  // 獲取運行此掛起函數的協程的 Continuation 對象 
  suspendCoroutineUninterceptedOrReturn { uCont ->

    // 接管協程。Continuation 已經被攔截,
    // 接下來將會遵循 CancellableContinuationImpl 的生命週期
    val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...)
    /* ... */
 
    // 使用可取消 Continuation 調用代碼塊
    block(cancellable)
        
    // 掛起協程而且等待 Continuation 在 「block」 中被恢復,或者在 「block」 結束執行時返回結果 
    cancellable.getResult()
  }
複製代碼

想了解更多有關掛起函數的工做原理,請參閱這篇: Kotlin Vocabulary | 揭祕協程中的 suspend 修飾符

流數據

若是咱們轉而但願用戶的設備在真實的環境中移動時,週期性地接收位置更新 (使用 requestLocationUpdates 函數),咱們就須要使用 Flow 來建立數據流。理想的 API 看起來應該像下面這樣:

fun FusedLocationProviderClient.locationFlow(): Flow<Location>
複製代碼

爲了將基於回調的 API 轉換爲 Flow,可使用 callbackFlow 流構建器來建立新的 flow。callbackFlow 的 lambda 表達式的內部處於一個協程的上下文中,這意味着它能夠調用掛起函數。不一樣於 flow 流構建器,channelFlow 能夠在不一樣的 CoroutineContext 或協程以外使用 offer 方法發送數據。

一般狀況下,使用 callbackFlow 構建流適配器遵循如下三個步驟:

  1. 建立使用 offer 向 flow 添加元素的回調;
  2. 註冊回調;
  3. 等待消費者取消協程,並註銷回調。

將上述步驟應用於當前用例,咱們獲得如下實現:

// 發送位置更新給消費者
fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
  // 建立了新的 Flow。這段代碼會在協程中執行。
  // 1. 建立回調並向 flow 中添加元素
  val callback = object : LocationCallback() {
    override fun onLocationResult(result: LocationResult?) {
      result ?: return  // 忽略爲空的結果
      for (location in result.locations) {
        try {
          offer(location)  // 將位置發送到 flow
        } catch (t: Throwable) {
          // 位置沒法發送到 flow
        }
      }
    }
  }
  
  // 2. 註冊回調並經過調用 requestLocationUpdates 獲取位置更新。
  requestLocationUpdates(
    createLocationRequest(),
    callback,
    Looper.getMainLooper()
  ).addOnFailureListener { e ->
    close(e)  // 在出錯時關閉 flow
  }
  
  // 3. 等待消費者取消協程並註銷回調。這一過程會掛起協程,直到 Flow 被關閉。
  awaitClose {
    // 在這裏清理代碼
    removeLocationUpdates(callback)
  }
}
複製代碼

callbackFlow 內部原理

在內部,callbackFlow 使用了一個 channel。channel 在概念上很接近阻塞 隊列 —— 它在配置時須要指定容量 (capacity): 便可以緩衝的元素個數。在 callbackFlow 中建立的 channel 默認容量是 64 個元素。若是將新元素添加到已滿的 channel,因爲 offer 不會將元素添加到 channel 中,而且會當即返回 false,因此 send 會暫停生產者,直到頻道 channel 中有新元素的可用空間爲止。

awaitClose 內部原理

有趣的是,awaitClose 內部使用的是 suspendCancellableCoroutine。您能夠經過我在如下代碼片斷中的註釋 (查看 原始實現) 一窺究竟:

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
  ...
  try {
    // 使用可取消 continuation 掛起協程
    suspendCancellableCoroutine<Unit> { cont ->
      // 僅在 Flow 或 Channel 關閉時成功恢復協程,不然保持掛起
      invokeOnClose { cont.resume(Unit) }
    }
  } finally {
    // 老是會執行調用者的清理代碼
    block()
  }
}
複製代碼

複用 Flow

除非額外使用中間操做符 (如: conflate),不然 Flow 是冷且惰性的。這意味着每次調用 flow 的終端操做符時,都會執行構建塊。對於咱們的用例來講,因爲添加一個新的位置監聽器開銷很小,因此這一特性不會有什麼大問題。然而對於另外的一些實現可就不必定了。

您可使用 shareIn 中間操做符在多個收集器間複用同一個 flow,並使冷流成爲熱流。

val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
  ...
}.shareIn(
  // 讓 flow 跟隨 applicationScope
  applicationScope,
  // 向新的收集器發送上一次發送的元素
  replay = 1,
  // 在有活躍的訂閱者時,保持生產者的活躍狀態
  started = SharingStarted.WhileSubscribed()
)
複製代碼

您能夠經過文章《協程中的取消和異常 | 駐留任務詳解》來了解更多有關在應用中使用 applicationScope 的最佳實踐。

您應當考慮經過建立協程適配器使您的 API 或現存 API 簡潔、易讀且符合 Kotlin 的使用習慣。首先檢查是否已經存在可用的適配器,若是沒有,您可使用 suspendCancellableCoroutine 針對一次性調用;或使用 callbackFlow 針對流數據,來建立您本身的適配器。

您能夠經過 codelab: 建立 Kotlin 擴展庫,來上手本文所介紹的話題。

相關文章
相關標籤/搜索