- 原文地址:RxJava vs. Kotlin Coroutines, a quick look
- 原文做者:Dávid Karnok
- 譯文出自:掘金翻譯計劃
- 本文永久連接:github.com/xitu/gold-m…
- 譯者:PhxNirvana
- 校對者:jamweak、jerry-shao
Kotlin 的協程是否讓 RxJava 和 響應式編程光輝再也不 了呢?答案取決於你詢問的對象。狂信徒和營銷者們會堅決果斷地是是是。若是真是這樣的話,開發者們早晚會將 Rx 代碼用協程重寫一遍,抑或從一開始就用協程來寫。 由於 協程 目前仍是實驗性的,因此目前的諸如性能瓶頸之類的不足,都將逐漸解決。所以,相對於原生性能,本文的重點更在於易用性方面。html
假設有兩個函數,f1 和 f2,用來模仿不可信的服務,兩者都會在一段延遲以後返回一個數。調用這兩個函數,將其返回值求和並呈現給用戶。然而若是 500ms 以內沒有返回的話,就再也不期望它會返回值了,所以咱們會在有限次數內取消並重試,直到超過次數最終放棄請求。前端
協程用起來就像是傳統的 基於 ExecutorService 和 Future 的工具套裝, 不一樣點在於協程的底層是用的掛起、狀態機和任務調度來代替線程阻塞的。java
首先,寫兩個函數來實現延遲操做:react
suspend fun f1(i: Int) {
Thread.sleep(if (i != 2) 2000L else 200L)
return 1;
}
suspend fun f2(i: Int) {
Thread.sleep(if (i != 2) 2000L else 200L)
return 2;
}
複製代碼
與協程調度有關的函數須要加上 suspend 關鍵字並經過協程上下文來調用。爲了演示上面的目的,若是傳入參數不是 2 的時候,函數會延遲 2s。這樣就會讓超時檢測將其結束掉,並在第三次嘗試時在規定時間內成功。android
由於異步總會在結束時離開主線程,咱們須要一個方法來在業務邏輯完成前阻塞它,以防止直接退出 JVM。爲了達到目的,可使用 runBlocking 在主線程中調用函數。ios
fun main(arg: Array<string>) = runBlocking <unit>{
coroutineWay()
reactiveWay()
}
suspend func coroutineWay() {
// TODO implement
}
func reactiveWay() {
// TODO implement
}</unit> </string>
複製代碼
相比 RxJava 的函數式,用協程寫出來的代碼邏輯更簡潔,並且代碼看起來就像是線性和同步的同樣。git
suspend fun coroutineWay() {
val t0 = System.currentTimeMillis()
var i = 0;
while (true) { // (1)
println("Attempt " + (i + 1) + " at T=" +
(System.currentTimeMillis() - t0))
var v1 = async(CommonPool) { f1(i) } // (2)
var v2 = async(CommonPool) { f2(i) }
var v3 = launch(CommonPool) { // (3)
Thread.sleep(500)
println(" Cancelling at T=" +
(System.currentTimeMillis() - t0))
val te = TimeoutException();
v1.cancel(te); // (4)
v2.cancel(te);
}
try {
val r1 = v1.await(); // (5)
val r2 = v2.await();
v3.cancel(); // (6)
println(r1 + r2)
break;
} catch (ex: TimeoutException) { // (7)
println(" Crash at T=" +
(System.currentTimeMillis() - t0))
if (++i > 2) { // (8)
throw ex;
}
}
}
println("End at T="
+ (System.currentTimeMillis() - t0)) // (9)
}
複製代碼
添加的一些輸出是用來觀察這段代碼如何運行的。程序員
看起來挺簡單的,儘管取消機制可能搞個大新聞:若是 v2 由於其餘異常(好比網絡緣由致使的 IOException)崩潰了呢?固然咱們得處理這些狀況來確保任務能夠在各類狀況下被取消(舉個栗子,試試 Kotlin 中的資源?)。然而,這種狀況發生的背景是 v1 會及時返回,直到嘗試 await 以前都沒法取消 v1 或檢測 v2 的崩潰。github
不要在乎那些細節,反正程序跑起來了,運行結果以下:編程
Attempt 1 at T=0
Cancelling at T=531
Crash at T=2017
Attempt 2 at T=2017
Cancelling at T=2517
Crash at T=4026
Attempt 3 at T=4026
3
End a
複製代碼
一共進行了 3 次嘗試,最後一次成功了,值是 3。是否是和劇本如出一轍的?一點都不快(此處有雙關(譯者並無看出來哪裏有雙關))! 咱們能夠看到取消事件發生的大概時間,兩次不成功的請求以後大約 500 ms ,然而異常捕獲發生在大約 2000 ms 以後!咱們知道 cancel() 被成功調用是由於咱們捕獲了異常。然而,看起來函數中的 Thread.sleep() 並無被打斷,或者用協程的說法,沒有在打斷異常時恢復。這多是 CommonPool 的一部分,對 Future.cancel(false) 的調用處於基礎結構中,抑或只是簡單的程序限制。
接下來咱們看看 RxJava 2 是如何實現相同操做的。讓人失望的是,若是函數前加了 suspended,就沒法經過普通方式調用了,因此咱們還得用普通方法重寫一下兩個函數:
fun f3(i: Int) : Int {
Thread.sleep(if (i != 2) 2000L else 200L)
return 1
}
fun f4(i: Int) : Int {
Thread.sleep(if (i != 2) 2000L else 200L)
return 2
}
複製代碼
爲了匹配阻塞外部環境的功能,咱們採用 RxJava 2 Extensions 中的 BlockingScheduler 來提供返回到主線程的功能。顧名思義,它阻塞了一開始的調用者/主線程,直到有任務經過調度器來提交併運行。
fun reactiveWay() {
RxJavaPlugins.setErrorHandler({ }) // (1)
val sched = BlockingScheduler() // (2)
sched.execute {
val t0 = System.currentTimeMillis()
val count = Array<Int>(1, { 0 }) // (3)
Single.defer({ // (4)
val c = count[0]++;
println("Attempt " + (c + 1) +
" at T=" + (System.currentTimeMillis() - t0))
Single.zip( // (5)
Single.fromCallable({ f3(c) })
.subscribeOn(Schedulers.io()),
Single.fromCallable({ f4(c) })
.subscribeOn(Schedulers.io()),
BiFunction<Int, Int> { a, b -> a + b } // (6)
)
})
.doOnDispose({ // (7)
println(" Cancelling at T=" +
(System.currentTimeMillis() - t0))
})
.timeout(500, TimeUnit.MILLISECONDS) // (8)
.retry({ x, e ->
println(" Crash at " +
(System.currentTimeMillis() - t0))
x < 3 && e is TimeoutException // (9)
})
.doAfterTerminate { sched.shutdown() } // (10)
.subscribe({
println(it)
println("End at T=" +
(System.currentTimeMillis() - t0)) // (11)
},
{ it.printStackTrace() })
}
}
複製代碼
實現起來有點長,對那些不熟悉 lambda 的人來講看起來可能有點可怕。
可能有人說,這比協程的實現複雜多了。不過……至少跑起來了:
Cancelling at T=4527
Attempt 1 at T=72
Cancelling at T=587
Crash at 587
Attempt 2 at T=587
Cancelling at T=1089
Crash at 1090
Attempt 3 at T=1090
Cancelling at T=1291
3
End at T=1292
複製代碼
有趣的是,若是在 main 函數中同時調用兩個函數的話,Cancelling at T=4527 是在調用 coroutineWay() 方法時打印出來的:儘管最後根本沒有時間消耗,取消事件自身就浪費在沒法中止的計算問題上,也所以在取消已經完成的任務上增長了額外消耗。
另外一方面,RxJava 至少及時地取消和重試了函數。然而,實際上也有幾乎不必的 Cancelling at T=1291 被打印出來了。吶,沒辦法,寫出來就這樣了,或者說我懶吧,在 Single.timeout 中是這樣實現的:若是沒有延時就完成了的話,不管操做符真實狀況如何,內部的 CompositeDisposable 代理了上游的 Disposable 並將其和操做符一塊兒取消了。
最後呢,咱們經過一個小小的改進來看一下響應式設計的強大之處:若是隻須要重試沒有響應的函數的話,爲何咱們要重試整個過程呢?改進方法也能夠很容易地在 RxJava 中找到:將 doOnDispose().timeout().retry() 放到每個函數調用鏈中(也許用 transfomer 能夠避免代碼的重複):
val timeoutRetry = SingleTransformer<Int, Int> {
it.doOnDispose({
println(" Cancelling at T=" +
(System.currentTimeMillis() - t0))
})
.timeout(500, TimeUnit.MILLISECONDS)
.retry({ x, e ->
println(" Crash at " +
(System.currentTimeMillis() - t0))
x < 3 && e is TimeoutException
})
}
// ...
Single.zip(
Single.fromCallable({ f3(c) })
.subscribeOn(Schedulers.io())
.compose(timeoutRetry)
,
Single.fromCallable({ f4(c) })
.subscribeOn(Schedulers.io())
.compose(timeoutRetry)
,
BiFunction<Int, Int> { a, b -> a + b }
)
// ...
複製代碼
歡迎讀者親自動手實踐並更新協程的實現來實現相同行爲(順即可以試試各類其餘形式的取消機制)。 響應式編程的好處之一是大多數狀況下都沒必要去理會諸如線程、取消信息的傳遞和操做符的結構等惱人的東西。RxJava 之類的庫已經設計好了 API 並將這些底層的大麻煩封裝起來了,一般狀況下,程序員只須要使用便可。
那麼,協程到底有沒有用呢?固然有用啦,但總的來講,我仍是以爲性能對其是極大的限制,同時,我也想知道協程能夠怎麼作才能總體取代響應式編程。
掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 Android、iOS、React、前端、後端、產品、設計 等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃、官方微博、知乎專欄。