關鍵詞:Kotlin 協程 協程取消 任務中止java
協程的任務的取消須要靠協程內部調用的協做支持,這就相似於咱們線程中斷以及對中斷狀態的響應同樣。git
咱們先從你們熟悉的話題講起。線程有一個被廢棄的 stop
方法,這個方法會讓線程當即死掉,而且釋放它持有的鎖,這樣會讓它正在讀寫的存儲處於一個不安全的狀態,所以 stop
被廢棄了。若是咱們啓動了一個線程並讓它執行一些任務,但很快咱們就後悔了,stop
還不讓用,那該怎麼辦?程序員
val thread = thread {
...
}
thread.stop() // !!! Deprecated!!!
複製代碼
咱們應該想辦法讓線程內部正在運行的任務跟咱們合做把任務停掉,這樣線程內部的任務中止以前還有機會清理一些資源,好比關閉流等等。github
val thread = thread {
try {
Thread.sleep(10000)
} catch (e: InterruptedException) {
log("Interrupted, do cleaning stuff.")
}
}
thread.interrupt()
複製代碼
像 sleep
這樣的方法調用,文檔明確指出它支持 InterruptedException
,所以當線程被標記爲中斷狀態時,它就會拋出 InterruptedException
,那麼咱們天然就能夠捕獲異常並作資源清理了。api
因此請注意所謂的協做式的任務終止,協程的取消也就是 cancel
機制的思路也是如此。安全
咱們來看一個協程取消的例子:bash
fun main() = runBlocking {
val job1 = launch { // ①
log(1)
delay(1000) // ②
log(2)
}
delay(100)
log(3)
job1.cancel() // ③
log(4)
}
複製代碼
此次咱們用了一個不同的寫法,咱們沒有用 suspend main,而是直接用 runBlocking
啓動協程,這個方法在 Native 上也存在,都是基於當前線程啓動一個相似於 Android 的 Looper 的死循環,或者叫消息隊列,能夠不斷的發送消息給它進行處理。runBlocking
會啓動一個 Job
,所以這裏也存在默認的做用域,不過這對於咱們今天的討論暫時沒有太大影響。網絡
這段代碼 ① 處啓動了一個子協程,它內部先輸出 1,接着開始 delay
, delay
與線程的 sleep
不一樣,它不會阻塞線程,你能夠認爲它實際上就是觸發了一個延時任務,告訴協程調度系統 1000ms 以後再來執行後面的這段代碼(也就是 log(2));而在這期間,咱們在 ③ 處對剛纔啓動的協程觸發了取消,所以在 ② 處的 delay
尚未回調的時候協程就被取消了,由於 delay
能夠響應取消,所以 delay
後面的代碼就不會再次調度了,不調度的緣由也很簡單,② 處的 delay
會拋一個 CancellationException
:框架
...
log(1)
try {
delay(1000)
} catch (e: Exception) {
log("cancelled. $e")
}
log(2)
...
複製代碼
那麼輸出的結果就不同了:異步
06:54:56:361 [main] 1
06:54:56:408 [main] 3
06:54:56:411 [main] 4
06:54:56:413 [main] cancelled. kotlinx.coroutines.JobCancellationException: Job was cancelled; job=StandaloneCoroutine{Cancelling}@e73f9ac
06:54:56:413 [main] 2
複製代碼
你們看,這與線程的中斷邏輯是否是很是的相似呢?
以前咱們有個例子,上一篇文章已經加入了異常處理邏輯,那麼此次咱們給它加上取消邏輯。以前是這樣:
suspend fun getUserCoroutine() = suspendCoroutine<User> { continuation ->
getUser(object : Callback<User> {
override fun onSuccess(value: User) {
continuation.resume(value)
}
override fun onError(t: Throwable) {
continuation.resumeWithException(t)
}
})
}
複製代碼
加取消邏輯,那須要咱們的 getUser
回調版本支持取消,咱們看下咱們的 getUser
是怎麼實現的:
fun getUser(callback: Callback<User>) {
val call = OkHttpClient().newCall(
Request.Builder()
.get().url("https://api.github.com/users/bennyhuo")
.build())
call.enqueue(object : okhttp3.Callback {
override fun onFailure(call: Call, e: IOException) {
callback.onError(e)
}
override fun onResponse(call: Call, response: Response) {
response.body()?.let {
try {
callback.onSuccess(User.from(it.string()))
} catch (e: Exception) {
callback.onError(e) // 這裏多是解析異常
}
}?: callback.onError(NullPointerException("ResponseBody is null."))
}
})
}
複製代碼
咱們發了個網絡請求給 Github,讓它把一個叫 bennyhuo
的用戶信息返回來,咱們知道 OkHttp 的這個 Call
是支持 cancel
的, 取消後,網絡請求過程當中若是讀取到這個取消的狀態,就會把請求給中止掉。既然這樣,咱們乾脆直接改造 getUser
好了,這樣還能省掉咱們本身的 Callback
回調過程:
suspend fun getUserCoroutine() = suspendCancellableCoroutine<User> { continuation ->
val call = OkHttpClient().newCall(...)
continuation.invokeOnCancellation { // ①
log("invokeOnCancellation: cancel the request.")
call.cancel()
}
call.enqueue(object : okhttp3.Callback {
override fun onFailure(call: Call, e: IOException) {
log("onFailure: $e")
continuation.resumeWithException(e)
}
override fun onResponse(call: Call, response: Response) {
log("onResponse: ${response.code()}")
response.body()?.let {
try {
continuation.resume(User.from(it.string()))
} catch (e: Exception) {
continuation.resumeWithException(e)
}
} ?: continuation.resumeWithException(NullPointerException("ResponseBody is null."))
}
})
}
複製代碼
咱們這裏用到了 suspendCancellableCoroutine
,而不是以前的 suspendCoroutine
,這就是爲了讓咱們的掛起函數支持協程的取消。該方法將獲取到的 Continuation
包裝成了一個 CancellableContinuation
,經過調用它的 invokeOnCancellation
方法能夠設置一個取消事件的回調,一旦這個回調被調用,那麼意味着 getUserCoroutine
調用所在的協程被取消了,這時候咱們也要相應的作出取消的響應,也就是把 OkHttp 發出去的請求給取消掉。
那麼咱們在調用它的時候,若是遇到了取消,會怎麼樣呢?
val job1 = launch { //①
log(1)
val user = getUserCoroutine()
log(user)
log(2)
}
delay(10)
log(3)
job1.cancel()
log(4)
複製代碼
注意咱們啓動 ① 以後僅僅延遲了 10ms 就取消了它,網絡請求的速度通常來說還不會這麼快,所以取消的時候大機率 getUserCoroutine
被掛起了,所以結果大機率是:
07:31:30:751 [main] 1
07:31:31:120 [main] 3
07:31:31:124 [main] invokeOnCancellation: cancel the request.
07:31:31:129 [main] 4
07:31:31:131 [OkHttp https://api.github.com/...] onFailure: java.io.IOException: Canceled
複製代碼
咱們發現,取消的回調被調用了,OkHttp 在收到咱們的取消指令以後,也確實中止了網絡請求,而且回調給咱們一個 IO 異常,這時候咱們的協程已經被取消,在處於取消狀態的協程上調用 Continuation.resume
、 Continuation.resumeWithException
或者 Continuation.resumeWith
都會被忽略,所以 OkHttp 回調中咱們收到 IO 異常後調用的 continuation.resumeWithException(e)
不會有任何反作用。
我在破解 Kotlin 協程 - 入門篇 提到了 Jake Wharton 大神爲 Retrofit 寫的 協程 Adapter,
implementation 'com.jakewharton.retrofit:retrofit2-kotlin-coroutines-adapter:0.9.2'
複製代碼
它確實能夠完成網絡請求,不過有細心的小夥伴發現了它的問題:它怎麼取消呢?咱們把使用它的代碼貼出來:
interface GitHubServiceApi {
@GET("users/{login}")
fun getUserCoroutine(@Path("login") login: String): Deferred<User>
}
複製代碼
定義好接口,建立 Retrofit 實例的時候傳入對應的 Adapter:
val gitHubServiceApi by lazy {
val retrofit = retrofit2.Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(CoroutineCallAdapterFactory()) // 這裏添加 Adapter
.build()
retrofit.create(GitHubServiceApi::class.java)
}
複製代碼
用的時候就這樣:
val deferred = gitHubServiceApi.getUserCoroutine("bennyhuo")
try {
showUser(deferred.await())
} catch (e: Exception) {
showError(e)
}
複製代碼
若是要取消,咱們能夠直接調用 deferred.cancel()
,例如:
log("1")
val deferred = gitHubServiceApi.getUserCoroutine("bennyhuo")
log("2")
withContext(Dispatchers.IO){
deferred.cancel()
}
try {
showUser(deferred.await())
} catch (e: Exception) {
showError(e)
}
複製代碼
運行結果以下:
12:59:54:185 [DefaultDispatcher-worker-1] 1
12:59:54:587 [DefaultDispatcher-worker-1] 2
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=CompletableDeferredImpl{Cancelled}@36699211
複製代碼
這種狀況下,其實網絡請求確實是被取消的,這一點咱們能夠看下源碼的處理:
...
override fun adapt(call: Call<T>): Deferred<T> {
val deferred = CompletableDeferred<T>()
deferred.invokeOnCompletion { // ①
if (deferred.isCancelled) {
call.cancel()
}
}
call.enqueue(object : Callback<T> {
...
}
}
...
複製代碼
注意 ① 處,invokeOnCompletion
在協程進入完成狀態時觸發,包括異常和正常完成,那麼在這時候若是發現它的狀態是已經取消的,那麼結果就直接調用 Call
的取消便可。
這看上去確實很正常啊~ 不過 @阿永 在公衆號的評論裏面提到了一個 Case,仔細一看還真是有問題。咱們給出示例來複現這個 Case:
val job = GlobalScope.launch {
log("1")
val deferred = gitHubServiceApi.getUserCoroutine("bennyhuo")
log("2")
deferred.invokeOnCompletion {
log("invokeOnCompletion, $it, ${deferred.isCancelled}")
}
try {
showUser(deferred.await())
} catch (e: Exception) {
showError(e)
}
log(3)
}
delay(10)
job.cancelAndJoin()
複製代碼
咱們啓動一個協程,在其中執行網絡請求,那麼正常來講,這時候 getUserCoroutine
返回的 Deferred
能夠當作一個子協程,它應當遵循默認的做用域規則,在父做用域取消時被取消掉,但現實卻並非這樣:
13:06:54:332 [DefaultDispatcher-worker-1] 1
13:06:54:829 [DefaultDispatcher-worker-1] 2
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=StandaloneCoroutine{Cancelling}@19aea38c
13:06:54:846 [DefaultDispatcher-worker-1] 3
13:06:56:937 [OkHttp https://api.github.com/...] invokeOnCompletion, null, false
複製代碼
咱們看到在調用 deferred.await()
的時候拋了個取消異常,這主要是由於 await()
所在的協程已經被咱們用 cancelAndJoin()
取消,但從隨後 invokeOnCompletion
的回調結果來看, getUserCoroutine
返回的 Deferred
並無被取消,再仔細一看,時間上這個回調比前面的操做晚了 2s,那必然是網絡請求返回以後纔回調的。
因此問題究竟在哪裏?在 CoroutineCallAdapterFactory
的實現中,爲了實現異步轉換,手動建立了一個 CompletableDeferred
:
override fun adapt(call: Call<T>): Deferred<T> {
val deferred = CompletableDeferred<T>() // ①
...
}
複製代碼
這個 CompletableDeferred
自己就是一個 Job
的實現,它的構造可接受一個 Job
實例做爲它的父協程,那麼問題來了,這裏並無告訴它父協程到底是誰,所以也就談不上做用域的事兒了,這好像咱們用 GlobalScope.launch
啓動了一個協程同樣。若是你們在 Android 當中使用 MainScope
,那麼一樣由於前面說到的這個緣由,致使 CompletableDeferred
沒有辦法被取消。
@阿永 在公衆號評論中提到這個問題,並提到了一個比較好的解決方案,下面咱們爲你們詳細介紹。感謝 @阿永。
說到這裏咱們再簡單回顧下,做用域主要有 GlobalScope
、coroutineScope
、supervisorScope
,對於取消,除了 supervisorScope
比較特別是單向取消,即父協程取消後子協程都取消,Android 中 MainScope
就是一個調度到 UI 線程的 supervisorScope
;coroutineScope
的邏輯則是父子相互取消的邏輯;而 GlobalScope
會啓動一個全新的做用域,與它外部隔離,內部遵循默認的協程做用域規則。
那麼有沒有辦法解決這個問題呢?
直接解決仍是比較困難的,由於 CompletableDeferred
構造所處的調用環境不是 suspend 函數,於是也沒有辦法拿到(極可能根本就沒有!)父協程。
前面咱們提到既然 adapt
方法不是 suspend 方法,那麼咱們是否是應該在其餘位置建立協程呢?
其實咱們前面在講 getUserCoroutine
的時候就不斷爲你們展現瞭如何將一個回調轉換爲協程調用的方法:
suspend fun getUserCoroutine() = suspendCancellableCoroutine<User> { continuation ->
...
}
複製代碼
suspendCancellableCoroutine
跟最初咱們提到的 suspendCoroutine
同樣,都是要獲取當前協程的 Continuation
實例,這實際上就至關於要繼承當前協程的上下文,所以咱們只須要在真正須要切換協程的時候再去作這個轉換便可:
public suspend fun <T : Any> Call<T>.await(): T {
return suspendCancellableCoroutine { continuation ->
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>?, response: Response<T?>) {
continuation.resumeWith(runCatching { // ①
if (response.isSuccessful) {
response.body()
?: throw NullPointerException("Response body is null: $response")
} else {
throw HttpException(response)
}
})
}
override fun onFailure(call: Call<T>, t: Throwable) {
if (continuation.isCancelled) return // ②
continuation.resumeWithException(t)
}
})
continuation.invokeOnCancellation {
try {
cancel()
} catch (ex: Throwable) { // ③
//Ignore cancel exception
}
}
}
}
複製代碼
你們看着這段代碼會不會很眼熟?這與咱們 getUserCoroutine
的寫法幾乎一模一樣,不過有幾處細節值得關注,我用數字標註了他們的位置:
runCatching
能夠將一段代碼的運行結果或者拋出的異常封裝到一個 Result
類型當中,Kotlin 1.3 開始新增了 Continuation.resumeWith(Result)
這個方法, 這個點比起咱們前面的寫法更具 Kotlin 風格。Continuation
的取消發起的,所以這時候不必再調用 continuation.resumeWithException(t)
來將異常再拋回來了。儘管咱們前面其實也提到過,這時候繼續調用 continuation.resumeWithException(t)
也沒有任何邏輯上的反作用,但性能上多少仍是會有一些開銷。Call.cancel
的調用比較安全,但網絡環境和狀態不免狀況複雜,所以對異常進行捕獲會讓這段代碼更加健壯。若是 cancel
拋異常而沒有捕獲的話,那麼等同於協程體內部拋出異常,具體如何傳播看所在做用域的相關定義了。須要指出的是,這段代碼片斷源自 gildor/kotlin-coroutines-retrofit ,你們也能夠直接添加依賴進行使用:
compile 'ru.gildor.coroutines:kotlin-coroutines-retrofit:1.1.0'
複製代碼
這個框架代碼量不多,但通過各路 Kotlin 協程專家的錘鍊,邏輯手法很細膩,值得你們學習。
這篇文章咱們從線程中斷的概念切入,類比學習協程的取消,實際上你們就會發現這兩者從邏輯上和場景上有多麼的類似。接着咱們將以前咱們一直提到的回調轉協程的例子進一步升級,支持取消,這樣你們就能夠輕易的將回調轉變爲協程的掛起調用了。最後咱們還分析了一下 Retrofit 的協程擴展的一些問題和解決方法,這個例子也進一步能夠引起咱們對協程做用域以及如何將現有程序協程化的思考。
再稍微提一句,協程不是一個簡單的東西,畢竟它的原理涉及到對操做系統調度、程序運行機制這樣程序界畢竟原始的話題,但你說若是我對前面提到的這些都不是很熟悉或者根本沒有接觸過,是否是就要跟協程拜拜了呢,其實也不是,只不過若是你對這些都不熟悉,那麼可能須要多加練習培養出感受,而沒必要一開始就關注原理和細節,依樣畫葫蘆同樣能夠用的很好,就像你們不知道 RxJava 原理同樣能夠用的很好同樣,協程也能夠作到這一點的。
固然,做爲一個有追求的程序員,咱們不止要會用,還要用得好,不管如何咱們都須要知道前因後果,這其中涉及到的基礎知識的欠缺也是須要儘快補充的,不能偷懶哈 :)
歡迎關注 Kotlin 中文社區!
中文官網:www.kotlincn.net/
中文官方博客:www.kotliner.cn/
公衆號:Kotlin
知乎專欄:Kotlin
CSDN:Kotlin中文社區
掘金:Kotlin中文社區
簡書:Kotlin中文社區