RxJava經常使用操做符官方文檔翻譯及Kotlin示例(1)

Rxjava2 可謂是平常開發中的利器,特別是在異步任務中更能發揮做用。響應式編程以及流式api的良好支持,給予了更好的編碼體驗。愈來愈多開發者漸漸用起來了。學習rxjava2最好的地方無外乎官方文檔,詳細且完整。如下結合官方文檔和我本身的理解以及例子,解釋各個操做符的用法,給各位以及我本身做一篇參考。java

怎麼用Rxjava2

要使用RxJava,須要先建立Observables(發出數據項),以各類方式轉換這些Observable以獲取所須要的精確數據項(經過使用Observable運算符),而後觀察並響應這些須要的項目序列(經過實現觀察者) 或者訂閱者,而後將它們訂閱到最終的變換後的Observables)。react

Creating Observables 建立操做符

just

經過獲取預先存在的對象並在訂閱時將該特定對象發佈給下游使用者來構造反應類型。爲方便起見,存在2到9個參數的重載,這些對象(具備相同的常見類型)將按指定的順序發出。就像From相似,但請注意From將傳入一個數組或一個iterable或相似的東西來取出要發出的項目,而Just只是簡單地發出數組或者迭代器。git

請注意,若是將null傳遞給Just,它將返回一個Observable,它將null做爲項發出。不要錯誤地假設這將返回一個空的Observable(一個根本不發出任何項目)。爲此,須要使用Empty運算符。 github

just

fun testOpJust() {
        val arr = arrayOf("mary", "tom", "ben", "lisa", "ken")
        Observable.fromArray(arr).filter { it.size > 3 }.map { it + "s" }.subscribe(System.out::println)

        val list = arrayListOf("mary", "tom", "ben", "lisa", "ken")
        Observable.just(list).forEach { it -> System.out.println(it + "s") }

        list.stream().filter { it -> it.length > 3 }.map { "$it s" }.forEach(System.out::println)
    }
複製代碼

from

根據預先存在的源或生成器類型構造序列。當使用Observable時,若是使用的全部數據均可以表示爲Observables,而不是Observables和其餘類型的混合,則能夠更方便。這容許使用一組運算符來控制數據流的整個生命週期。例如,Iterables能夠被認爲是一種的Observable;做爲一種始終只發出單一項目的Observable。經過將這些對象顯式轉換爲Observable,能夠將它們做爲對等體與其餘Observable進行交互。所以,大多數ReactiveX實現都具備容許將特定於語言的對象和數據結構轉換爲Observable的方法。編程

注意:這些靜態方法使用後綴命名約定(即,在方法名稱中重複參數類型)以免重載解析模糊。api

from

fromIterable

從java.lang.Iterable源(例如Lists,Sets或Collections或custom Iterables)發出信號,而後完成序列。數組

可用於 Flowable ,Observablebash

fromArray

發信號通知給定數組的元素,而後完成序列。 可用於Flowable,Observable數據結構

注意:RxJava不支持原始數組,只支持(通用)引用數組。併發

fun testOpFrom(){
        val list = arrayListOf<Int>(1,2,3,4,5,6)
        Observable.fromIterable(list).subscribe(System.out::println)

        Observable.fromArray(1,2,3,4,5,6).subscribe(System.out::println)

    }
複製代碼
fromCallable

當消費者訂閱時,調用給定的java.util.concurrent.Callable並將其返回值(或拋出的異常)轉發給該使用者。

可用於:Observable,Flowable,Maybe,Single,Completable

備註:在Completable中,忽略實際返回值,而且Completable完成。

Observable.fromCallable<String> {
            "hello"
        }.subscribe(System.out::println)

        Completable.fromCallable{
            "complatable from callable"
        }.subscribe {
            System.out.println("complete")
        }
複製代碼

fromAction

當消費者訂閱時,調用給定的io.reactivex.function.Action而且消費者完成或接收Action拋出的異常。

可用於: Maybe,Completable

Maybe.fromAction<String>{
            System.out.println("maybe from action")
        }.subscribe(System.out::println)
複製代碼

如下標星先很少作解釋,用得很少

*fromRunnable

*fromFuture

*from{reactive type}

將另外一種反應類型包裹或轉換爲目標反應類型。具備如下簽名模式的各類反應類型中提供如下組合:targetType.from {sourceType}()

*注意:並不是全部可能的轉換都是經過from {reactive type}方法系列實現的。查看to {reactive type}方法系列以得到進一步的轉換可能性。

注意:fromAction和fromRunnable之間的區別在於Action接口容許拋出已受檢的異常,而java.lang.Runnable則否則。

error

可用於Observable,Flowable,Maybe,Single,Completable

經過java.util.concurrent.Callable向消費者發出預先存在或生成的錯誤信號。

fun testOpError(){
        Observable.error<Throwable>(IOException(""))
                .subscribe({
                    System.out.print("不會打印吧")
                },{
                    it.printStackTrace()
                },{
                    System.out.println("也不會打印")
                })
    }
複製代碼

一個典型的用例是使用onErrorResumeNext有條件地映射或抑制鏈中的異常:

/**
     * 抑制鏈上發生的異常
     */
    @Test
    fun testOpOnErrorResumeNext() {
        val observable = Observable.fromCallable {
            if (Math.random() < 0.5f) {
                throw IllegalArgumentException()
            }
            throw IOException()
        }

        observable.onErrorResumeNext(Function {
            if (it is IllegalArgumentException) {
                Observable.empty()
            } else {
                Observable.error(it)
            }
        }).subscribe({
            System.out.println("nothing")
        },{
            it.printStackTrace()
        },{
            System.out.println("empty")
        })
    }
複製代碼

這個onErrorResumeNext 厲害了,能夠說以前一直不太明白怎麼很好的處理。經過此操做符能夠抑制錯誤的傳遞,原本若是subscribe發生了錯誤會觸發onError回調。事實上可能發生了錯誤,須要不處理或者抑制產生。在onErrorResumeNext的function參數中,能夠根據錯誤類型返回處理流程。

  • empty 這種類型的源在訂閱後當即表示完成。 可用於Observable,Flowable,Maybe,Single,Completable

示例可見onErrorResumeNext的例子

empty

empty發送直接表示完成,就是訂閱者直接調用onComplete回調。onNext 不會執行

  • never 這種類型的源不會發出任何onNext,onSuccess,onError或onComplete的信號。這種類型的反應源可用於測試或「禁用」組合子操做符中的某些源。

可用於Observable,Flowable,Maybe,Single,Completable

不會對訂閱者的任何回調進行調用。禁用也可理解,好比發送了錯誤,都不往下執行

  • interval 按期生成無限的,不斷增長的數字(Long類型)。intervalRange變體生成有限數量的此類數字。

可用於Observable,Flowable

interval

fun testOpInterval(){
        Observable.interval(1,TimeUnit.SECONDS)
                .onErrorResumeNext(Function { 
                    Observable.error(it)
                })
                .subscribe({
                    if (it.rem(5) == 0L) {
                        System.out.println("tick")
                    } else {
                        System.out.println("tock")
                    }
                },{
                    it.printStackTrace()
                },{
                    System.out.println("interval complete")
                })
    }
複製代碼
  • Timer運算符建立一個Observable,在指定的一段時間後發出一個特定項。
    Timer

也就是說在給定的時間以後發送事件

  • range 爲每一個消費者生成一系列值。range()方法生成Integers,rangeLong()生成Longs。Range運算符按順序發出一系列順序整數,您能夠在其中選擇範圍的起點及其長度。

可用於 Observable,Flowable

range

fun testOpRange(){
        val s = "test range operation now"
        Observable.range(0,s.length- 3)
                .map { "${s[it]} in range"}
                .subscribe {
                    System.out.println(it)
                }
    }
複製代碼

發出一系列值,參數爲起點,和長度。

  • generate 建立一個冷,同步和有狀態的值生成器。

可用於Observable,Flowable

create

@Test
    fun testOpGenerate(){
        val start = 1
        val increaseValue = 2
        Observable.generate<Int,Int>(Callable<Int> {
            start
        }, BiFunction<Int, Emitter<Int>,Int> {
            t1, t2 ->
            t2.onNext(t1 + increaseValue)
            t1 + increaseValue
        }).subscribe {
            System.out.println("generate value : $it")
        }
    }
複製代碼

不太明白乾啥的,具體應用場景。只是一直不間斷的產生值

Filtering Observables 過濾Observable

過濾操做是很是經常使用且重要的,並且相關的操做符也不少

Debounce

可用於Observable,Flowable

刪除響應源發出的項目,在給定的超時值到期以前,這些項目後面跟着更新的項目。計時器重置每次發射。此運算符會跟蹤最近發出的項目,而且僅在有足夠的時間過去而沒有源發出任何其餘項目時纔會發出此項目。

按照我得理解就是debounde傳入了超時值,在該時間以內若是屢次發射,取離超時值最近得值。既然又超時那麼也應該又開始時間,開始時間就是一組發射最開始值得時間,這一組發射得值的時的差是在debounce超時時間以內。

// Diagram:
// -A--------------B----C-D-------------------E-|---->
//  a---------1s
//                 b---------1s
//                      c---------1s
//                        d---------1s
//                                            e-|---->
// -----------A---------------------D-----------E-|-->

   fun testOpDebounce(){
        Observable.create<String>{
            it.onNext("A")
            Thread.sleep(1_500)
            it.onNext("B")
            Thread.sleep(500)
            it.onNext("C")
            Thread.sleep(250)
            it.onNext("D")
            Thread.sleep(2_000)
            it.onNext("E")
        }.debounce(1,TimeUnit.SECONDS)
                .subscribe(System.out::println)
    }
複製代碼

distinct

可用於Observable Flowable 經過僅發出與先前項目相比不一樣的項目來過濾反應源。能夠指定io.reactivex.functions.Function,將源發出的每一個項目映射到一個新值中,該值將用於與先前的映射值進行比較。Distinct運算符經過僅容許還沒有發出的項目來過濾Observable。在一些實現中,存在容許調整兩個項被視爲「不一樣」的標準的變體。在一些實施例中,存在操做符的變體,其僅將項目與其前一個項目進行比較以得到更精確的比較,從而僅過濾連續的重複項目,序列中的項目。

fun testOpDistinct(){
        Observable.fromArray(1,2,3,3,4,5)
                .distinct()
                .subscribe(System.out::println)

        // 用來過濾序列中一組值先後是否相同得值
        Observable.fromArray(1,1,2,3,2)
                .distinct { "呵呵" }
                .subscribe(System.out::println)
    }
複製代碼

重載的方法,傳入keySelectro ,做用是對每一個元素應用方法獲得得新得值,再決定怎麼去重

distinctUntilChanged

可用於Observable Flowable
經過僅發出與其前一個元素相比較不一樣的項目來過濾反應源。能夠指定io.reactivex.functions.Function,將源發出的每一個項目映射到一個新值中,該值將用於與先前的映射值進行比較。或者,能夠指定io.reactivex.functions.BiPredicate做爲比較器函數來比較前一個。

Observable.fromArray(1,2,3,3,4,5)
//                .distinctUntilChanged()
                .distinctUntilChanged { t1, t2 ->
                    t1 == t2
                }
                .subscribe(System.out::println)
複製代碼

能夠說是distinct的增強版,多了一個能夠傳入比較器的重載方法

elementAt

課用於Flowable,Observable 在來自反應源的一系列發射的數據項中,以指定的從零開始的索引起出單個項目。若是指定的索引不在序列中,則能夠指定將發出的默認項。

簡單說就是按照發出項的次序獲取指定的位置的元素

Observable.fromArray(1,2,3,3,4,5)
                .elementAt(2)
                .subscribe(System.out::println)
複製代碼

elementAtOrError

filter

可用於Observable,Flowable,Maybe,Single 經過僅發出知足指定函數的項來過濾由反應源發出的項。

過濾偶數
 Observable.fromArray(1,2,3,3,4,5)
                .filter {
                    it.rem(2) == 0
                }
                .subscribe(System.out::println)}
複製代碼

first

可用於Flowable,Observable 僅發出反應源發出的第一個項目,或者若是源完成而不發出項目則發出給定的默認項目。這與firstElement的不一樣之處在於此運算符返回Single,而firstElement返回Maybe。

Observable.fromArray(1,2,3,3,4,5)
                .first(-1)
                .subscribe(Consumer<Int> {
                    System.out.println("onNext :$it")
                })
                
                      Observable.fromArray(1,2,3,3,4,5)
                .firstElement()
                .subscribe {
                    System.out.println("onNext :$it")
                }
複製代碼

firstOrError

僅發出響應源發出的第一個項目,或者若是源完成而不發出項目則發出java.util.NoSuchElementException信號。

ignoreElement

可用於Maybe Single 忽略Single或Maybe源發出的單個項目,並返回一個Completable,它僅從源中發出錯誤或完成事件的信號。

ignoreElement

Maybe.timer(1L,TimeUnit.SECONDS)
                .ignoreElement()
                .doOnComplete {
                    System.out.println("done")
                }
                .blockingAwait()
複製代碼

ignoreElements

忽略Single或Maybe源發出的單個項目,並返回一個Completable,它僅從源中發出錯誤或完成事件的信號。

Observable.timer(1L,TimeUnit.SECONDS)
                .ignoreElements()
                .doOnComplete {
                    System.out.println("completed")
                }
                .blockingAwait()
複製代碼

last

可用於Observable,Flowable

僅發出反應源發出的最後一個項目,或者若是源完成而不發出項目則發出給定的默認項目。這與lastElement的不一樣之處在於此運算符返回Single,而lastElement返回Maybe。

Observable.fromArray(1,2,3,3,4,5)
                .last(-1)
                .subscribe(Consumer<Int>{
                    System.out.println("last $it")
                })
複製代碼

lastElement

Observable.fromArray(1,2,3,3,4,5)
                .lastElement()
                .subscribe(Consumer<Int>{
                    System.out.println("last $it")
                })
複製代碼

lastOnError

僅發出響應源發出的最後一項,或者若是源完成而不發出項,則發出java.util.NoSuchElementException信號。

ofType

可用於Flowable,Observable,Maybe 經過僅發出指定類型的項目來過濾反應源發出的項目。

Observable.fromArray(1,2.1f,3,3,4,5)
                .ofType(Int::class.java)
                .subscribe(Consumer<Int>{
                    System.out.println("last $it")
                })
複製代碼

sample

可用於Observable Flowable 經過僅在週期性時間間隔內發出最近發出的項目來過濾反應源發出的項目。

Observable.create<String> {
            it.onNext("A")
            Thread.sleep(1_000)

            it.onNext("B")
            Thread.sleep(300)

            it.onNext("C")
            Thread.sleep(700)

            it.onNext("D")
            it.onComplete()
        }.sample(1,TimeUnit.SECONDS)
                .blockingSubscribe(System.out::println)
複製代碼

skip

刪除響應源發出的前n個項目,併發出剩餘項目。您能夠經過使用Skip運算符修改Observable來忽略Observable發出的前n個項目,並僅參加以後的項目。

Observable.fromArray("hehe",2.1f,3,3,4,5)
//                .ofType(String::class.java)
                .skip(3)
                .subscribe {
                    System.out.println(it)
                }
複製代碼

skipLast

丟棄反應源發出的最後n個項目,併發出剩餘的項目。

take

可用於Flowable Observable 僅發出反應源發出的前n項。

Observable.fromArray("hehe",2.1f,3,3,4,5)
                .take(2)
                .subscribe(System.out::println)
複製代碼

takeLast

可用於Flowable Observable 僅發出反應源發出的最後n個項目。

throttleFirst

可用於Flowable Observable

跟debounce有些類似,是取時間範圍內第一個,在點擊事件過濾很經常使用

在指定持續時間的連續時間窗口期間僅發出由反應源發出的第一個項目。

Observable.create<String> {
            it.onNext("A")
            Thread.sleep(300)

            it.onNext("B")
            Thread.sleep(400)
        }.throttleFirst(1,TimeUnit.SECONDS)
                .subscribe(System.out::println)
複製代碼

throttleLast

可用於Observable,Flowable 在指定持續時間的連續時間期間僅發出由反應源發出的最後一個項目。跟throttleFirst相反,取最後一個值

throttleWithTimeout

跟debounce的別名

public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
        return debounce(timeout, unit);
    }
複製代碼

timeout

從Observable或Flowable源發出項目,但若是在從上一項開始的指定超時持續時間內未發出下一項,則以java.util.concurrent.TimeoutException終止。對於Maybe,Single和Completable,指定的超時持續時間指定等待成功或完成事件到達的最長時間。若是Maybe,Single或Completable在給定時間內沒有完成,將發出java.util.concurrent.TimeoutException。

Observable.create<String>{
            it.onNext("A")
            Thread.sleep(600)

            it.onNext("B")
            Thread.sleep(1_500)

            it.onNext("C")
            Thread.sleep(500)
        }.subscribeOn(Schedulers.io())
                .subscribe({
                    System.out.println(it)
                },{
                    it.printStackTrace()
                })
複製代碼

捕獲處理

一下爲Kotlin編寫的代碼,能夠看到在發生錯誤的狀況下,經過onError() 拋出了錯誤,而且須要在訂閱者,第二個參數傳入,處理錯誤的回調。

fun testErrorHandle() {
        Observable.create<String> {
            it.onNext("start")
            Thread {
                try {
                    System.out.println("start open ...")
                    it.onNext("start open ...")
                    val stream = URL("https://www.baidu.com").openStream()
                    System.out.println("after url ...")
                    it.onNext("after url")
                    val br = stream.bufferedReader()
                    if (!it.isDisposed) {
                        var text = br.readText()
                        it.onNext(text)
                    }
                    stream.close()
                    br.close()
                    it.onNext("after open ...")
                    if (!it.isDisposed) {
                        it.onComplete()
                    }
                }catch (e : java.lang.Exception) {
                    System.out.println(e)
                    e.printStackTrace()
                    it.onError(e)
                }
            }.start()
        }.subscribe(System.out::println) {
            it.printStackTrace()
            System.out.println("what the fuck")
        }
    }
複製代碼

Observable一般不會拋出異常。相反,它會經過使用onError通知終止Observable序列來通知任何觀察者發生了不可恢復的錯誤。

這有一些例外。例如,若是onError()調用自己失敗,Observable將不會嘗試經過再次調用onError來通知觀察者,但會拋出RuntimeException,OnErrorFailedException或OnErrorNotImplementedException。

從onError通知中恢復的技術

所以,不是捕獲異常,而是觀察者或操做者應該更一般地響應異常的onError通知。還有各類Observable運算符可用於對來自Observable的onError通知做出反應或從中恢復。例如,可使用運算符:

  1. 吞下錯誤並切換到備份Observable以繼續序列
  2. 吞下錯誤併發出默認項
  3. 吞下錯誤並當即嘗試重啓失敗的Observable
  4. 吞下錯誤並嘗試在一些退避間隔後從新啓動失敗的Observable

可使用錯誤處理運算符中描述的運算符來實現這些策略。

吞下的意思,應該是不處理異常

RxJava特定的異常以及如何處理它們

CompositeException 這代表發生了多個異常。可使用異常的getExceptions()方法來檢索構成組合的各個異常。

MissingBackpressureException 這表示試圖將過多發出數據項應用於它的Observable。有關背壓(github.com/ReactiveX/R…)的Observable的解決方法,請參閱Backpressure。

OnErrorFailedException 這代表Observable試圖調用其觀察者的onError()方法,但該方法自己引起了異常。

OnErrorNotImplementedException 這代表Observable試圖調用其觀察者的onError()方法,可是沒有這樣的方法存在。能夠經過修復Observable以使其再也不達到錯誤條件,經過在觀察者中實現onError處理程序,或經過使用本頁其餘地方描述的其中一個運算符到達觀察者以前截獲onError通知來消除此問題。。

OnErrorThrowable 觀察者將這種類型的throwable傳遞給他們的觀察者的onError()處理程序。此變量的Throwable包含有關錯誤的更多信息以及錯誤發生時系統的Observable特定狀態,而不是標準Throwable。

參考資料

官網文檔

相關文章
相關標籤/搜索