若是您是庫做者,您也許但願用戶在使用 Kotlin 協程與 Flow 時能夠更加輕鬆地調用您基於 Java 或回調的 API。另外,若是您是 API 的使用者,則可能願意將第三方 API 界面適配協程,以使它們對 Kotlin 更友好。html
本文將會介紹如何使用協程和 Flow 簡化 API,以及如何使用 suspendCancellableCoroutine 和 callbackFlow API 建立您本身的適配器。針對那些富有好奇心的讀者,本文還會對這些 API 進行剖析,以幫您瞭解它們底層的工做原理。react
若是您更喜歡觀看視頻,能夠 點擊這裏。android
在您爲現有 API 編寫本身的封裝以前,請檢查是否已經存在針對您的用例的適配器或者 擴展方法。下面是一些包含常見類型協程適配器的庫。git
Future 類型github
對於 future 類型,Java 8 集成了 CompletableFuture,而 Guava 集成了 ListenableFuture。這裏提到的並非所有,您能夠在線搜索以肯定是否存在適用於您的 future 類型的適配器。編程
// 等待 CompletionStage 的執行完成而不阻塞線程 suspend fun <T> CompletionStage<T>.await(): T // 等待 ListenableFuture 的執行完成而不阻塞線程 suspend fun <T> ListenableFuture<T>.await(): T
使用這些函數,您能夠擺脫回調並掛起協程直到 future 的結果被返回。api
Reactive Streamapp
對於響應式流的庫,有針對 RxJava、Java 9 API 與 響應式流庫 的集成:異步
// 將給定的響應式 Publisher 轉換爲 Flow fun <T : Any> Publisher<T>.asFlow(): Flow<T>
這些函數將響應式流轉換爲了 Flow。jvm
Android 專用 API
對於 Jetpack 庫或 Android 平臺 API,您能夠參閱 Jetpack KTX 庫 列表。現有超過 20 個庫擁有 KTX 版本,構成了您所熟悉的 Java API。其中包括 SharedPreferences、ViewModels、SQLite 以及 Play Core。
回調
回調是實現異步通信時很是常見的作法。事實上,咱們在 後臺線程任務運行指南 中將回調做爲 Java 編程語言的默認解決方案。然而,回調也有許多缺點: 這一設計會致使使人費解的回調嵌套。同時,因爲沒有簡單的傳播方式,錯誤處理也更加複雜。在 Kotlin 中,您能夠簡單地使用協程調用回調,但前提是您必須建立您本身的適配器。
若是沒有找到適合您用例的適配器,更直接的作法是本身編寫適配器。對於一次性異步調用,可使用 suspendCancellableCoroutine API;而對於流數據,可使用 callbackFlow API。
做爲練習,下面的示例將會使用來自 Google Play Services 的 Fused Location Provider API 來獲取位置數據。此 API 界面十分簡單,可是它使用回調來執行異步操做。當邏輯變得複雜時,這些回調容易使代碼變得不可讀,而咱們可使用協程來擺脫它們。
若是您但願探索其它解決方案,能夠經過上面函數所連接的源代碼爲您帶來啓發。
一次性異步調用
Fused Location Provider API 提供了 getLastLocation) 方法來得到 最後已知位置。對於協程來講,理想的 API 是一個直接返回確切結果的掛起函數。
注意: 這一 API 返回值爲 Task,而且已經有了對應的 適配器。出於學習的目的,咱們用它做爲範例。
咱們能夠經過爲 FusedLocationProviderClient
建立擴展函數來得到更好的 API:
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location
因爲這是一個一次性異步操做,咱們使用 suspendCancellableCoroutine
函數: 一個用於從協程庫建立掛起函數的底層構建塊。
suspendCancellableCoroutine
會執行做爲參數傳入的代碼塊,而後在等待繼續信號期間掛起協程的執行。當協程 Continuation 對象中的 resume
或 resumeWithException
方法被調用時,協程會被恢復執行。有關 Continuation 的更多信息,請參閱: Kotlin Vocabulary | 揭祕協程中的 suspend 修飾符。
咱們使用能夠添加到 getLastLocation 方法中的回調來在合適的時機恢復協程。參見下面的實現:
// FusedLocationProviderClient 的擴展函數,返回最後已知位置 suspend fun FusedLocationProviderClient.awaitLastLocation(): Location = // 建立新的可取消協程 suspendCancellableCoroutine<Location> { continuation -> // 添加恢復協程執行的監聽器 lastLocation.addOnSuccessListener { location -> // 恢復協程並返回位置 continuation.resume(location) }.addOnFailureListener { e -> // 經過拋出異常來恢復協程 continuation.resumeWithException(e) } // suspendCancellableCoroutine 塊的結尾。這裏會掛起協程 //直到某個回調調用了 continuation 參數 }
注意: 儘管協程庫中一樣包含了不可取消版本的協程構建器 (即 suspendCoroutine
),但最好始終選擇使用 suspendCancellableCoroutine
處理協程做用域的取消及從底層 API 傳播取消事件。
suspendCancellableCoroutine 原理
在內部,suspendCancellableCoroutine 使用 suspendCoroutineUninterceptedOrReturn 在掛起函數的協程中得到 Continuation。這一 Continuation
對象會被一個 CancellableContinuation 對象攔截,後者會今後時開始控制協程的生命週期 (其 實現 具備 Job 的功能,可是有一些限制)。
接下來,傳遞給 suspendCancellableCoroutine
的 lambda 表達式會被執行。若是該 lambda 返回告終果,則協程將當即恢復;不然協程將會在 CancellableContinuation 被 lambda 手動恢復前保持掛起狀態。
您能夠經過我在下面代碼片斷 (原版實現) 中的註釋來了解發生了什麼:
public suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T = // 獲取運行此掛起函數的協程的 Continuation 對象 suspendCoroutineUninterceptedOrReturn { uCont -> // 接管協程。Continuation 已經被攔截, // 接下來將會遵循 CancellableContinuationImpl 的生命週期 val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...) /* ... */ // 使用可取消 Continuation 調用代碼塊 block(cancellable) // 掛起協程而且等待 Continuation 在 「block」 中被恢復,或者在 「block」 結束執行時返回結果 cancellable.getResult() }
想了解更多有關掛起函數的工做原理,請參閱這篇: Kotlin Vocabulary | 揭祕協程中的 suspend 修飾符。
流數據
若是咱們轉而但願用戶的設備在真實的環境中移動時,週期性地接收位置更新 (使用 requestLocationUpdates) 函數),咱們就須要使用 Flow 來建立數據流。理想的 API 看起來應該像下面這樣:
fun FusedLocationProviderClient.locationFlow(): Flow<Location>
爲了將基於回調的 API 轉換爲 Flow,可使用 callbackFlow 流構建器來建立新的 flow。callbackFlow
的 lambda 表達式的內部處於一個協程的上下文中,這意味着它能夠調用掛起函數。不一樣於 flow 流構建器,channelFlow 能夠在不一樣的 CoroutineContext 或協程以外使用 offer 方法發送數據。
一般狀況下,使用 callbackFlow 構建流適配器遵循如下三個步驟:
將上述步驟應用於當前用例,咱們獲得如下實現:
// 發送位置更新給消費者 fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> { // 建立了新的 Flow。這段代碼會在協程中執行。 // 1. 建立回調並向 flow 中添加元素 val callback = object : LocationCallback() { override fun onLocationResult(result: LocationResult?) { result ?: return // 忽略爲空的結果 for (location in result.locations) { try { offer(location) // 將位置發送到 flow } catch (t: Throwable) { // 位置沒法發送到 flow } } } } // 2. 註冊回調並經過調用 requestLocationUpdates 獲取位置更新。 requestLocationUpdates( createLocationRequest(), callback, Looper.getMainLooper() ).addOnFailureListener { e -> close(e) // 在出錯時關閉 flow } // 3. 等待消費者取消協程並註銷回調。這一過程會掛起協程,直到 Flow 被關閉。 awaitClose { // 在這裏清理代碼 removeLocationUpdates(callback) } }
callbackFlow 內部原理
在內部,callbackFlow 使用了一個 channel。channel 在概念上很接近阻塞 隊列) —— 它在配置時須要指定容量 (capacity): 便可以緩衝的元素個數。在 callbackFlow 中建立的 channel 默認容量是 64 個元素。若是將新元素添加到已滿的 channel,因爲 offer 不會將元素添加到 channel 中,而且會當即返回 false,因此 send 會暫停生產者,直到頻道 channel 中有新元素的可用空間爲止。
awaitClose 內部原理
有趣的是,awaitClose
內部使用的是 suspendCancellableCoroutine
。您能夠經過我在如下代碼片斷中的註釋 (查看 原始實現) 一窺究竟:
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) { ... try { // 使用可取消 continuation 掛起協程 suspendCancellableCoroutine<Unit> { cont -> // 僅在 Flow 或 Channel 關閉時成功恢復協程,不然保持掛起 invokeOnClose { cont.resume(Unit) } } } finally { // 老是會執行調用者的清理代碼 block() } }
複用 Flow
除非額外使用中間操做符 (如: conflate
),不然 Flow 是冷且惰性的。這意味着每次調用 flow 的終端操做符時,都會執行構建塊。對於咱們的用例來講,因爲添加一個新的位置監聽器開銷很小,因此這一特性不會有什麼大問題。然而對於另外的一些實現可就不必定了。
您可使用 shareIn
中間操做符在多個收集器間複用同一個 flow,並使冷流成爲熱流。
val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> { ... }.shareIn( // 讓 flow 跟隨 applicationScope applicationScope, // 向新的收集器發送上一次發送的元素 replay = 1, // 在有活躍的訂閱者時,保持生產者的活躍狀態 started = SharingStarted.WhileSubscribed() )
您能夠經過文章《協程中的取消和異常 | 駐留任務詳解》來了解更多有關在應用中使用 applicationScope
的最佳實踐。
您應當考慮經過建立協程適配器使您的 API 或現存 API 簡潔、易讀且符合 Kotlin 的使用習慣。首先檢查是否已經存在可用的適配器,若是沒有,您可使用 suspendCancellableCoroutine
針對一次性調用;或使用 callbackFlow
針對流數據,來建立您本身的適配器。
您能夠經過 codelab: 建立 Kotlin 擴展庫,來上手本文所介紹的話題。