[譯] 管中窺豹:RxJava 與 Kotlin 協程的對比

引言

Kotlin 的協程是否讓 RxJava 和 響應式編程光輝再也不 了呢?答案取決於你詢問的對象。狂信徒和營銷者們會堅決果斷地是是是。若是真是這樣的話,開發者們早晚會將 Rx 代碼用協程重寫一遍,抑或從一開始就用協程來寫。 由於 協程 目前仍是實驗性的,因此目前的諸如性能瓶頸之類的不足,都將逐漸解決。所以,相對於原生性能,本文的重點更在於易用性方面。html

方案設計

假設有兩個函數,f1f2,用來模仿不可信的服務,兩者都會在一段延遲以後返回一個數。調用這兩個函數,將其返回值求和並呈現給用戶。然而若是 500ms 以內沒有返回的話,就再也不期望它會返回值了,所以咱們會在有限次數內取消並重試,直到超過次數最終放棄請求。前端

協程的方式

協程用起來就像是傳統的 基於 ExecutorServiceFuture 的工具套裝, 不一樣點在於協程的底層是用的掛起、狀態機和任務調度來代替線程阻塞的。java

首先,寫兩個函數來實現延遲操做:react

suspend fun f1(i: Int) {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 1;
}

suspend fun f2(i: Int) {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 2;
}
複製代碼

與協程調度有關的函數須要加上 suspend 關鍵字並經過協程上下文來調用。爲了演示上面的目的,若是傳入參數不是 2 的時候,函數會延遲 2s。這樣就會讓超時檢測將其結束掉,並在第三次嘗試時在規定時間內成功。android

由於異步總會在結束時離開主線程,咱們須要一個方法來在業務邏輯完成前阻塞它,以防止直接退出 JVM。爲了達到目的,可使用 runBlocking 在主線程中調用函數。ios

fun main(arg: Array<string>) = runBlocking <unit>{

     coroutineWay()

     reactiveWay()
}

suspend func coroutineWay() {
    // TODO implement
}

func reactiveWay() {
    // TODO implement
}</unit> </string>
複製代碼

相比 RxJava 的函數式,用協程寫出來的代碼邏輯更簡潔,並且代碼看起來就像是線性和同步的同樣。git

suspend fun coroutineWay() {
    val t0 = System.currentTimeMillis()

    var i = 0;
    while (true) {                                       // (1)
        println("Attempt " + (i + 1) + " at T=" +
            (System.currentTimeMillis() - t0))

        var v1 = async(CommonPool) { f1(i) }             // (2)
        var v2 = async(CommonPool) { f2(i) }

        var v3 = launch(CommonPool) {                    // (3)
            Thread.sleep(500)
            println(" Cancelling at T=" +
                (System.currentTimeMillis() - t0))
            val te = TimeoutException();
            v1.cancel(te);                               // (4)
            v2.cancel(te);
        }

        try {
            val r1 = v1.await();                         // (5)
            val r2 = v2.await();
            v3.cancel();                                 // (6)
            println(r1 + r2)
            break;                                       
        } catch (ex: TimeoutException) {                 // (7)
            println(" Crash at T=" +
                (System.currentTimeMillis() - t0))
            if (++i > 2) {                               // (8)
                throw ex;
            }
        }
    }
    println("End at T=" 
        + (System.currentTimeMillis() - t0))             // (9)

}
複製代碼

添加的一些輸出是用來觀察這段代碼如何運行的。程序員

  1. 一般線性編程的狀況下,是沒有直接重試某個操做的快捷方法的,所以,咱們須要創建一個循環以及重試計數器 i
  2. 經過 async(CommonPool) 來執行異步操做,該函數能夠在一些後臺線程當即啓動並執行函數。該函數會返回一個 Deferred,稍後會用到這個值。 若是用 await() 來獲得 v1 做爲最終值的話,當前線程將會掛起,另外,對 v2 的計算也不會開始,除非前一個恢復執行。除此之外,咱們還須要在超時的狀況下取消當前操做的方法。參考步驟 3 和 5。
  3. 若是想讓兩個操做都超時的話,看起來咱們只能在另外一個異步線程中執行等待操做。launch(CommonPool) 方法會返回一個能夠用在這種狀況下的 Job 對象。 與 async 的區別是,這樣執行沒法返回值。之因此保存返回的 Job 是由於先前的異步操做可能及時返回,就再也不須要取消操做了。
  4. 在超時的任務中,咱們用 TimeoutException 來取消 v1v2 ,這將恢復任何已經掛起來等待兩者返回的操做。
  5. 等待兩個函數運行結果。若是超時,await 將從新扔出在第四步中使用的異常。
  6. 若是沒有異常,則取消再也不須要執行的超時任務,並跳出循環。
  7. 若是有超時,則走老一套捕獲異常並執行狀態檢查來肯定下一步操做。注意任何其餘異常都會直接被拋出並退出循環。
  8. 萬一是第三次或更屢次的嘗試,直接扔出異常,什麼都不作。
  9. 若是一切按劇本走,打印運行的總時間,而後退出當前函數。

看起來挺簡單的,儘管取消機制可能搞個大新聞:若是 v2 由於其餘異常(好比網絡緣由致使的 IOException)崩潰了呢?固然咱們得處理這些狀況來確保任務能夠在各類狀況下被取消(舉個栗子,試試 Kotlin 中的資源?)。然而,這種狀況發生的背景是 v1 會及時返回,直到嘗試 await 以前都沒法取消 v1 或檢測 v2 的崩潰。github

不要在乎那些細節,反正程序跑起來了,運行結果以下:編程

Attempt 1 at T=0
    Cancelling at T=531
         Crash at T=2017
Attempt 2 at T=2017
    Cancelling at T=2517
         Crash at T=4026
Attempt 3 at T=4026
3
End a
複製代碼

一共進行了 3 次嘗試,最後一次成功了,值是 3。是否是和劇本如出一轍的?一點都不快(此處有雙關(譯者並無看出來哪裏有雙關))! 咱們能夠看到取消事件發生的大概時間,兩次不成功的請求以後大約 500 ms ,然而異常捕獲發生在大約 2000 ms 以後!咱們知道 cancel() 被成功調用是由於咱們捕獲了異常。然而,看起來函數中的 Thread.sleep() 並無被打斷,或者用協程的說法,沒有在打斷異常時恢復。這多是 CommonPool 的一部分,對 Future.cancel(false) 的調用處於基礎結構中,抑或只是簡單的程序限制。

響應式

接下來咱們看看 RxJava 2 是如何實現相同操做的。讓人失望的是,若是函數前加了 suspended,就沒法經過普通方式調用了,因此咱們還得用普通方法重寫一下兩個函數:

fun f3(i: Int) : Int {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 1
}

fun f4(i: Int) : Int {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 2
}
複製代碼

爲了匹配阻塞外部環境的功能,咱們採用  RxJava 2 Extensions 中的 BlockingScheduler 來提供返回到主線程的功能。顧名思義,它阻塞了一開始的調用者/主線程,直到有任務經過調度器來提交併運行。

fun reactiveWay() {
    RxJavaPlugins.setErrorHandler({ })                         // (1)

    val sched = BlockingScheduler()                            // (2)
    sched.execute {
        val t0 = System.currentTimeMillis()
        val count = Array<Int>(1, { 0 })                       // (3)

        Single.defer({                                         // (4)
            val c = count[0]++;
            println("Attempt " + (c + 1) +
                " at T=" + (System.currentTimeMillis() - t0))

            Single.zip(                                        // (5)
                    Single.fromCallable({ f3(c) })
                        .subscribeOn(Schedulers.io()),
                    Single.fromCallable({ f4(c) })
                        .subscribeOn(Schedulers.io()),
                    BiFunction<Int, Int> { a, b -> a + b }               // (6)
            )
        })
        .doOnDispose({                                         // (7)
            println(" Cancelling at T=" + 
                (System.currentTimeMillis() - t0))
        })
        .timeout(500, TimeUnit.MILLISECONDS)                   // (8)
        .retry({ x, e ->
            println(" Crash at " + 
                (System.currentTimeMillis() - t0))
            x < 3 && e is TimeoutException                     // (9)
        })
        .doAfterTerminate { sched.shutdown() }                 // (10)
        .subscribe({
            println(it)
            println("End at T=" + 
                (System.currentTimeMillis() - t0))             // (11)
        },
        { it.printStackTrace() })
    }
}
複製代碼

實現起來有點長,對那些不熟悉 lambda 的人來講看起來可能有點可怕。

  1. 衆所周知 RxJava 2 不管如何都會傳遞異常。在 Android 上,沒法傳遞的異常會使應用崩潰,除非使用 RxJavaPlugins.setErrorHandler 來捕獲。在此,由於咱們知道取消事件會打斷 Thread.sleep() ,調用棧打出來的結果只會是一團亂麻,咱們也不會去注意這麼多的異常。
  2. 設置 BlockingScheduler 並分發第一個執行的任務,以及剩下的主線程執行邏輯。 這是因爲一旦鎖住, start() 將會給主線程增長一個活鎖狀態,直到有任何隨後事件打破鎖定,主線程纔會繼續執行。
  3. 設置一個堆變量來記錄重試次數。
  4. 一旦有經過 Single.defer 的訂閱,計數器加一併打印 「Attempt」 字符串。該操做符容許保留每一個訂閱的狀態,這正是咱們在下游執行的 retry() 操做符所指望的。
  5. 使用 zip 操做符來異步執行兩個元素的計算,兩者都在後臺線程執行本身的函數。
  6. 當兩者都完成時,將結果相加。
  7. 爲了讓超時取消,使用 doOnDispose 操做符來打印當前狀態和時間。
  8. 使用 timeout 操做符定義求和的超時。若是超時則會發送 TimeoutException(例如該場景下沒有反饋時)。
  9. retry 操做符的重載提供了重試時間以及當前錯誤。打印錯誤後,應該返回 true ——也就是說必須執行重試——若是重試次數小於三而且當前錯誤是 TimeoutException 的話。任何其餘錯誤只會終止而不是觸發重試。
  10. 一旦完成,咱們須要關閉調度器,來讓釋放主線程並退出JVM。
  11. 固然,在完成前咱們須要打印求和結果以及整個操做的耗時。

可能有人說,這比協程的實現複雜多了。不過……至少跑起來了:

Cancelling at T=4527

Attempt 1 at T=72
    Cancelling at T=587
         Crash at 587
Attempt 2 at T=587
    Cancelling at T=1089
         Crash at 1090
Attempt 3 at T=1090
    Cancelling at T=1291
3
End at T=1292
複製代碼

有趣的是,若是在 main 函數中同時調用兩個函數的話,Cancelling at T=4527 是在調用 coroutineWay() 方法時打印出來的:儘管最後根本沒有時間消耗,取消事件自身就浪費在沒法中止的計算問題上,也所以在取消已經完成的任務上增長了額外消耗。

另外一方面,RxJava 至少及時地取消和重試了函數。然而,實際上也有幾乎不必的 Cancelling at T=1291 被打印出來了。吶,沒辦法,寫出來就這樣了,或者說我懶吧,在 Single.timeout 中是這樣實現的:若是沒有延時就完成了的話,不管操做符真實狀況如何,內部的 CompositeDisposable 代理了上游的 Disposable 並將其和操做符一塊兒取消了。

結論

最後呢,咱們經過一個小小的改進來看一下響應式設計的強大之處:若是隻須要重試沒有響應的函數的話,爲何咱們要重試整個過程呢?改進方法也能夠很容易地在 RxJava 中找到:將 doOnDispose().timeout().retry() 放到每個函數調用鏈中(也許用 transfomer 能夠避免代碼的重複):

val timeoutRetry = SingleTransformer<Int, Int> { 
    it.doOnDispose({
        println(" Cancelling at T=" + 
            (System.currentTimeMillis() - t0))
    })
    .timeout(500, TimeUnit.MILLISECONDS)
    .retry({ x, e ->
        println(" Crash at " + 
            (System.currentTimeMillis() - t0))
        x < 3 && e is TimeoutException
    })
}

// ...

Single.zip(
    Single.fromCallable({ f3(c) })
        .subscribeOn(Schedulers.io())
        .compose(timeoutRetry)
    ,
    Single.fromCallable({ f4(c) })
        .subscribeOn(Schedulers.io())
        .compose(timeoutRetry)
    ,
    BiFunction<Int, Int> { a, b -> a + b }
)
// ...
複製代碼

歡迎讀者親自動手實踐並更新協程的實現來實現相同行爲(順即可以試試各類其餘形式的取消機制)。 響應式編程的好處之一是大多數狀況下都沒必要去理會諸如線程、取消信息的傳遞和操做符的結構等惱人的東西。RxJava 之類的庫已經設計好了 API 並將這些底層的大麻煩封裝起來了,一般狀況下,程序員只須要使用便可。

那麼,協程到底有沒有用呢?固然有用啦,但總的來講,我仍是以爲性能對其是極大的限制,同時,我也想知道協程能夠怎麼作才能總體取代響應式編程。


掘金翻譯計劃 是一個翻譯優質互聯網技術文章的社區,文章來源爲 掘金 上的英文分享文章。內容覆蓋 AndroidiOSReact前端後端產品設計 等領域,想要查看更多優質譯文請持續關注 掘金翻譯計劃官方微博知乎專欄

相關文章
相關標籤/搜索