[譯]RxJava 的全面介紹:Observable 類型、背壓、錯誤處理

原文:www.ericdecanini.com/2019/01/07/… 做者:www.ericdecanini.com/about/java

前言

RxJava 是一個不斷更新的工具庫,適用於除 Android 之外的許多平臺的開發人員(如:RxSwift)。RxJava 最大的優點是 以不使用回調的方式處理異步操做android

相反,ObservablesObservers 結合使用來發射數據(一次或屢次),而且還能夠經過各自包含的方法來處理每次數據發射時要作的事情。數據庫

什麼是 Observable 和 Observer

val myObservable = Observable.just(1, 2, 3)
 
val myObserver = myObservable.subscribe { receivedNumber ->
    Log.d(LOG_TAG, "Received Number $receivedNumber")
}
複製代碼

Observable – 發射數據流並與接受改數據的 Observer 一塊兒工做的對象。 Observer – 訂閱一個 Observable 讓它開始發射數據,並處理接受數據時要作的事情。緩存

在上面的例子中,Observable.just(1, 2, 3) 將按照順序發射整數 1, 2, 3。 Observer 一旦訂閱 Observable,將以相同的順序接受這些數字。服務器

Received Number 1
Received Number 2
Received Number 3
複製代碼

Observable 的生命週期

Observable 有兩個重要的方法,來處理傳入的數據。機器學習

  • onNext - 每當發射新數據時調用,正如你在上面示例中的 lambda 函數看到的同樣(在 subscribe 以後調用)。異步

  • onComplete - 在沒有更多數據發射時調用。顧名思義,數據流徹底發射完畢。函數

不一樣類型的 Observable

最基本的 Observable 會發射連續的數據流,直到調用 onComplete 爲止,這並不老是你想要的。你可能想發射 單個值,或者發射一個 沒法接受該值的值,亦或是 在執行沒有返回值的異步任務後調用其餘函數工具

val mySingle = Single.just(1)
val singleObserver = mySingle.subscribe { data ->
    Log.d(LOG_TAG, "Received $data")
}
 
val myMaybe = Maybe.empty<Int>()
val maybeObserver = myMaybe
        .defaultIfEmpty(1)
        .subscribe { data ->
            Log.d(LOG_TAG, "Received $data")
        }
 
val myCompletable = Completable.complete()
val completableObserver = myCompletable
        .subscribe {
            Log.d(LOG_TAG, "Task completed")
        }
複製代碼

Single 僅發射一個值。onNext 只調用一次,而且onComplete 將當即被調用。性能

Mayble 發射一個或零個值,當發射零個值時,將跳過 onNext 並當即調用 onComplete。可使用 defalutIfEmpyty 函數發射默認值。

Completable 不發射任何值,你能夠像沒有返回值的回調同樣訂閱它。

Flowables 和背壓

還有一種類型的 Observable,它就是 Flowable。和 Observable 同樣,Flowable 也發射連續的數據流,直到完成爲止。但有一個關鍵的區別:

想象一下,上游 Observable 數據發射的速度,大於下游 Observer 處理數據的速度,這就是 背壓。在大部分狀況下,將會致使錯誤發生。Flowable 是一種包含背壓策略的 Observable,具備當背壓發生時,如何處理數據的能力。

val myFlowable = Observable.range(1, 100000).toFlowable(BackpressureStrategy.DROP)
val flowableObserver = myFlowable.subscribe {data ->
    Log.d(LOG_TAG, "Received $data")
}
複製代碼

有 5 種不一樣的背壓策略,咱們須要瞭解下:

  • Buffer - 在內存中緩存事件,直到 Observer 能夠處理它們。默認狀況下,在錯誤發生以前,緩衝區的大小爲 128 個 items。能夠修改緩衝區的大小,但請注意,這將會帶來性能上的開銷。
  • Drop - 丟棄 Observer沒法處理的事件。
  • Latest - 僅保留最新發射的值,直到 Observer 可使用它並丟棄其它的值。
  • Error - 若是發生背壓,將拋出異常。
  • Missing - 缺少背壓策咯,若是你想在客戶端處理背壓,你可使用它(由於背壓策咯是由 Observable 建立的)。未能說出策咯會在背壓下拋出異常。
Observable vs Flowable

已知的是,當你明確知道發射的數據,不會致使發生背壓。你應當使用 Observable,而不是 Flowable。老實說,我尚未發現使用 Flowable 的場景,也許 Flowables 將使用額外的內存?

錯誤處理

沒有任何代碼能夠避免錯誤,你已經知道,若是不處理背壓將致使錯誤發生。最重要的是,在 Observersubscribe 方法中,由本身的代碼發生的異常,都將被視爲由Observer 應處理的錯誤。

高興的是,RxJava 包含了幾種處理這些錯誤的方法。

  • doOnError - 當錯誤發生,只需執行該方法
val observer = myObservable
    .doOnError { Log.e(LOG_TAG, "ErrorOccurred") }
    .subscribe()
複製代碼
  • onErrorReturnItem - 若是發生錯誤,返回一個默認值
val observer = myObservable
    .onErrorReturnItem(0)
    .subscribe()
    }
複製代碼
  • onErrorReturn - 和 onErrorReturnItem 同樣,但接受一個返回所需數據類型的函數(對於動態默認值)
val observer = myObservable
        .onErrorReturn{ throwable -> throwable.message}
        .subscribe()
複製代碼
  • onErrorResumeNext - 若是發生錯誤,則返回一個默認數據流,也能夠爲動態數據採起功能。
val observer = myObservable
        .onErrorResumeNext(Observable.just(2, 4, 6))
        .subscribe()
複製代碼
  • retry - 若是發生錯誤,嘗試從新訂閱 Observable,你能夠設置最多的重試次數,或者設置空值以便無限次重試。
val observer = myObservable
        .retry(3)
        .subscribe()
複製代碼

你也能夠經過布爾值,來實現 重試條件

val observer = myObservable
        .retry{ integer, throwable -> integer > 0 }
        .subscribe()
複製代碼

若是沒有使用上述的操做符,在錯誤發生時,將致使程序崩潰。

結語

到目前爲止,此 博客 的大部份內容都是關於 Firebase的。有充分的理由相信,Google 經過強大的 雲數據庫機器學習雲服務,將推進 Firebase 成爲最優秀的雲解決方案之一,並也爲服務器開發打開了新的世界。

雖然我會回到 Firebase 上,但從如今開始,我將潛心學習 RxJava 並深刻了解它的細節。由於它還有更多的閃光點待我去學習:修復回調地獄、提升行能、線程調度.....

相關文章
相關標籤/搜索