若是您是庫做者,您也許但願用戶在使用 Kotlin 協程與 Flow 時能夠更加輕鬆地調用您基於 Java 或回調的 API。另外,若是您是 API 的使用者,則可能願意將第三方 API 界面適配協程,以使它們對 Kotlin 更友好。html
本文將會介紹如何使用協程和 Flow 簡化 API,以及如何使用 suspendCancellableCoroutine 和 callbackFlow API 建立您本身的適配器。針對那些富有好奇心的讀者,本文還會對這些 API 進行剖析,以幫您瞭解它們底層的工做原理。react
若是您更喜歡觀看視頻,能夠 點擊這裏。android
在您爲現有 API 編寫本身的封裝以前,請檢查是否已經存在針對您的用例的適配器或者 擴展方法。下面是一些包含常見類型協程適配器的庫。git
Future 類型github
對於 future 類型,Java 8 集成了 CompletableFuture,而 Guava 集成了 ListenableFuture。這裏提到的並非所有,您能夠在線搜索以肯定是否存在適用於您的 future 類型的適配器。編程
// 等待 CompletionStage 的執行完成而不阻塞線程
suspend fun <T> CompletionStage<T>.await(): T
// 等待 ListenableFuture 的執行完成而不阻塞線程
suspend fun <T> ListenableFuture<T>.await(): T
複製代碼
使用這些函數,您能夠擺脫回調並掛起協程直到 future 的結果被返回。api
Reactive Streammarkdown
對於響應式流的庫,有針對 RxJava、Java 9 API 與 響應式流庫 的集成:app
// 將給定的響應式 Publisher 轉換爲 Flow
fun <T : Any> Publisher<T>.asFlow(): Flow<T>
複製代碼
這些函數將響應式流轉換爲了 Flow。異步
Android 專用 API
對於 Jetpack 庫或 Android 平臺 API,您能夠參閱 Jetpack KTX 庫 列表。現有超過 20 個庫擁有 KTX 版本,構成了您所熟悉的 Java API。其中包括 SharedPreferences、ViewModels、SQLite 以及 Play Core。
回調
回調是實現異步通信時很是常見的作法。事實上,咱們在 後臺線程任務運行指南 中將回調做爲 Java 編程語言的默認解決方案。然而,回調也有許多缺點: 這一設計會致使使人費解的回調嵌套。同時,因爲沒有簡單的傳播方式,錯誤處理也更加複雜。在 Kotlin 中,您能夠簡單地使用協程調用回調,但前提是您必須建立您本身的適配器。
若是沒有找到適合您用例的適配器,更直接的作法是本身編寫適配器。對於一次性異步調用,可使用 suspendCancellableCoroutine API;而對於流數據,可使用 callbackFlow API。
做爲練習,下面的示例將會使用來自 Google Play Services 的 Fused Location Provider API 來獲取位置數據。此 API 界面十分簡單,可是它使用回調來執行異步操做。當邏輯變得複雜時,這些回調容易使代碼變得不可讀,而咱們可使用協程來擺脫它們。
若是您但願探索其它解決方案,能夠經過上面函數所連接的源代碼爲您帶來啓發。
一次性異步調用
Fused Location Provider API 提供了 getLastLocation 方法來得到 最後已知位置。對於協程來講,理想的 API 是一個直接返回確切結果的掛起函數。
咱們能夠經過爲 FusedLocationProviderClient
建立擴展函數來得到更好的 API:
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location
複製代碼
因爲這是一個一次性異步操做,咱們使用 suspendCancellableCoroutine
函數: 一個用於從協程庫建立掛起函數的底層構建塊。
suspendCancellableCoroutine
會執行做爲參數傳入的代碼塊,而後在等待繼續信號期間掛起協程的執行。當協程 Continuation 對象中的 resume
或 resumeWithException
方法被調用時,協程會被恢復執行。有關 Continuation 的更多信息,請參閱: Kotlin Vocabulary | 揭祕協程中的 suspend 修飾符。
咱們使用能夠添加到 getLastLocation 方法中的回調來在合適的時機恢復協程。參見下面的實現:
// FusedLocationProviderClient 的擴展函數,返回最後已知位置
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location =
// 建立新的可取消協程
suspendCancellableCoroutine<Location> { continuation ->
// 添加恢復協程執行的監聽器
lastLocation.addOnSuccessListener { location ->
// 恢復協程並返回位置
continuation.resume(location)
}.addOnFailureListener { e ->
// 經過拋出異常來恢復協程
continuation.resumeWithException(e)
}
// suspendCancellableCoroutine 塊的結尾。這裏會掛起協程
//直到某個回調調用了 continuation 參數
}
複製代碼
注意: 儘管協程庫中一樣包含了不可取消版本的協程構建器 (即 suspendCoroutine
),但最好始終選擇使用 suspendCancellableCoroutine
處理協程做用域的取消及從底層 API 傳播取消事件。
suspendCancellableCoroutine 原理
在內部,suspendCancellableCoroutine 使用 suspendCoroutineUninterceptedOrReturn 在掛起函數的協程中得到 Continuation。這一 Continuation
對象會被一個 CancellableContinuation 對象攔截,後者會今後時開始控制協程的生命週期 (其 實現 具備 Job 的功能,可是有一些限制)。
接下來,傳遞給 suspendCancellableCoroutine
的 lambda 表達式會被執行。若是該 lambda 返回告終果,則協程將當即恢復;不然協程將會在 CancellableContinuation 被 lambda 手動恢復前保持掛起狀態。
您能夠經過我在下面代碼片斷 (原版實現) 中的註釋來了解發生了什麼:
public suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T =
// 獲取運行此掛起函數的協程的 Continuation 對象
suspendCoroutineUninterceptedOrReturn { uCont ->
// 接管協程。Continuation 已經被攔截,
// 接下來將會遵循 CancellableContinuationImpl 的生命週期
val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...)
/* ... */
// 使用可取消 Continuation 調用代碼塊
block(cancellable)
// 掛起協程而且等待 Continuation 在 「block」 中被恢復,或者在 「block」 結束執行時返回結果
cancellable.getResult()
}
複製代碼
想了解更多有關掛起函數的工做原理,請參閱這篇: Kotlin Vocabulary | 揭祕協程中的 suspend 修飾符。
流數據
若是咱們轉而但願用戶的設備在真實的環境中移動時,週期性地接收位置更新 (使用 requestLocationUpdates 函數),咱們就須要使用 Flow 來建立數據流。理想的 API 看起來應該像下面這樣:
fun FusedLocationProviderClient.locationFlow(): Flow<Location>
複製代碼
爲了將基於回調的 API 轉換爲 Flow,可使用 callbackFlow 流構建器來建立新的 flow。callbackFlow
的 lambda 表達式的內部處於一個協程的上下文中,這意味着它能夠調用掛起函數。不一樣於 flow 流構建器,channelFlow 能夠在不一樣的 CoroutineContext 或協程以外使用 offer 方法發送數據。
一般狀況下,使用 callbackFlow 構建流適配器遵循如下三個步驟:
將上述步驟應用於當前用例,咱們獲得如下實現:
// 發送位置更新給消費者
fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
// 建立了新的 Flow。這段代碼會在協程中執行。
// 1. 建立回調並向 flow 中添加元素
val callback = object : LocationCallback() {
override fun onLocationResult(result: LocationResult?) {
result ?: return // 忽略爲空的結果
for (location in result.locations) {
try {
offer(location) // 將位置發送到 flow
} catch (t: Throwable) {
// 位置沒法發送到 flow
}
}
}
}
// 2. 註冊回調並經過調用 requestLocationUpdates 獲取位置更新。
requestLocationUpdates(
createLocationRequest(),
callback,
Looper.getMainLooper()
).addOnFailureListener { e ->
close(e) // 在出錯時關閉 flow
}
// 3. 等待消費者取消協程並註銷回調。這一過程會掛起協程,直到 Flow 被關閉。
awaitClose {
// 在這裏清理代碼
removeLocationUpdates(callback)
}
}
複製代碼
callbackFlow 內部原理
在內部,callbackFlow 使用了一個 channel。channel 在概念上很接近阻塞 隊列 —— 它在配置時須要指定容量 (capacity): 便可以緩衝的元素個數。在 callbackFlow 中建立的 channel 默認容量是 64 個元素。若是將新元素添加到已滿的 channel,因爲 offer 不會將元素添加到 channel 中,而且會當即返回 false,因此 send 會暫停生產者,直到頻道 channel 中有新元素的可用空間爲止。
awaitClose 內部原理
有趣的是,awaitClose
內部使用的是 suspendCancellableCoroutine
。您能夠經過我在如下代碼片斷中的註釋 (查看 原始實現) 一窺究竟:
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) {
...
try {
// 使用可取消 continuation 掛起協程
suspendCancellableCoroutine<Unit> { cont ->
// 僅在 Flow 或 Channel 關閉時成功恢復協程,不然保持掛起
invokeOnClose { cont.resume(Unit) }
}
} finally {
// 老是會執行調用者的清理代碼
block()
}
}
複製代碼
複用 Flow
除非額外使用中間操做符 (如: conflate
),不然 Flow 是冷且惰性的。這意味着每次調用 flow 的終端操做符時,都會執行構建塊。對於咱們的用例來講,因爲添加一個新的位置監聽器開銷很小,因此這一特性不會有什麼大問題。然而對於另外的一些實現可就不必定了。
您可使用 shareIn
中間操做符在多個收集器間複用同一個 flow,並使冷流成爲熱流。
val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> {
...
}.shareIn(
// 讓 flow 跟隨 applicationScope
applicationScope,
// 向新的收集器發送上一次發送的元素
replay = 1,
// 在有活躍的訂閱者時,保持生產者的活躍狀態
started = SharingStarted.WhileSubscribed()
)
複製代碼
您能夠經過文章《協程中的取消和異常 | 駐留任務詳解》來了解更多有關在應用中使用 applicationScope
的最佳實踐。
您應當考慮經過建立協程適配器使您的 API 或現存 API 簡潔、易讀且符合 Kotlin 的使用習慣。首先檢查是否已經存在可用的適配器,若是沒有,您可使用 suspendCancellableCoroutine
針對一次性調用;或使用 callbackFlow
針對流數據,來建立您本身的適配器。
您能夠經過 codelab: 建立 Kotlin 擴展庫,來上手本文所介紹的話題。