RxJava 系列-1:一篇的比較全面的 RxJava2 方法總結

看了許多講解RxJava的文章,有些文章講解的內容是基於第一個版本的,有些文章的講解是經過比較經常使用的一些API和基礎的概念進行講解的。 可是每次看到RxJava的類中的幾十個方法的時候,老是感受內心沒底。因此,我打算本身去專門寫篇文章來從API的角度系統地梳理一下RxJava的各類方法和用法。java

一、RxJava 基本

1.1 RxJava 簡介

RxJava是一個在Java VM上使用可觀測的序列來組成異步的、基於事件的程序的庫。react

雖然,在Android中,咱們可使用AsyncTask來完成異步任務操做,可是當任務的梳理比較多的時候,咱們要爲每一個任務定義一個AsyncTask就變得很是繁瑣。 RxJava能幫助咱們在實現異步執行的前提下保持代碼的清晰。 它的原理就是建立一個Observable來完成異步任務,組合使用各類不一樣的鏈式操做,來實現各類複雜的操做,最終將任務的執行結果發射給Observer進行處理。 固然,RxJava不只適用於Android,也適用於服務端等各類場景。android

咱們總結如下RxJava的用途:算法

  1. 簡化異步程序的流程;
  2. 使用近似於Java8的流的操做進行編程:由於想要在Android中使用Java8的流編程有諸多的限制,因此咱們可使用RxJava來實現這個目的。

在使用RxJava以前,咱們須要先在本身的項目中添加以下的依賴:編程

compile 'io.reactivex.rxjava2:rxjava:2.2.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'
複製代碼

這裏咱們使用的是RxJava2,它與RxJava的第一個版本有些許不一樣。在本文中,咱們全部的關於RxJava的示例都將基於RxJava2.數組

注:若是想了解關於Java8的流編程的內容的內容,能夠參考我以前寫過的文章五分鐘學習Java8的流編程緩存

1.2 概要

下面是RxJava的一個基本的用例,這裏咱們定義了一個Observable,而後在它內部使用emitter發射了一些數據和信息(其實就至關於調用了被觀察對象內部的方法,通知全部的觀察者)。 而後,咱們用Consumer接口的實例做爲subscribe()方法的參數來觀察發射的結果。(這裏的接口的方法都已經被使用Lambda簡化過,應該學着適應它。)網絡

Observable<Integer> observable = Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
});
observable.subscribe(System.out::println);
複製代碼

這樣,咱們就完成了一個基本的RxJava的示例。從上面的例子中,你或許無法看出Observable中隱藏的流的概念,看下面的例子:app

Observable.range(0, 10).map(String::valueOf).forEach(System.out::println);
複製代碼

這裏咱們先用Observable.range()方法產生一個序列,而後用map方法將該整數序列映射成一個字符序列,最後將獲得的序列輸出來。從上面看出,這種操做和Java8裏面的Stream編程很像。可是二者之間是有區別的:異步

  1. 所謂的「推」和「拉」的區別:Stream中是經過從流中讀取數據來實現鏈式操做,而RxJava除了Stream中的功能以外,還能夠經過「發射」數據,來實現通知的功能,即RxJava在Stream之上又多了一個觀察者的功能。
  2. Java8中的Stream能夠經過parall()來實現並行,即基於分治算法將任務分解並計算獲得結果以後將結果合併起來;而RxJava只能經過subscribeOn()方法將全部的操做切換到某個線程中去。
  3. Stream只能被消費一次,可是Observable能夠被屢次進行訂閱;

RxJava除了爲咱們提供了Observable以外,在新的RxJava中還提供了適用於其餘場景的基礎類,它們之間的功能和主要區別以下:

  1. Flowable: 多個流,響應式流和背壓
  2. Observable: 多個流,無背壓
  3. Single: 只有一個元素或者錯誤的流
  4. Completable: 沒有任何元素,只有一個完成和錯誤信號的流
  5. Maybe: 沒有任何元素或者只有一個元素或者只有一個錯誤的流

除了上面的幾個基礎類以外,還有一個Disposable。當咱們監聽某個流的時候,就能獲取到一個Disposable對象。它提供了兩個方法,一個是isDisposed,能夠被用來判斷是否中止了觀察指定的流;另外一個是dispose方法,用來放棄觀察指定的流,咱們可使用它在任意的時刻中止觀察操做。

1.3 總結

上面咱們介紹了了關於RxJava的基本的概念和使用方式,在下面的文章中咱們會按照以上定義的順序從API的角度來說解如下RxJava各個模塊的使用方法。

二、RxJava 的使用

2.1 Observable

從上面的文章中咱們能夠得知,Observable和後面3種操做功能近似,區別在於Flowable加入了背壓的概念,Observable的大部分方法也適用於其餘3個操做和Flowable。 所以,咱們這裏先從Observable開始梳理,而後咱們再專門對Flowable和背壓的進行介紹。

Observable爲咱們提供了一些靜態的構造方法來建立一個Observable對象,還有許多鏈式的方法來完成各類複雜的功能。 這裏咱們按照功能將它的這些方法分紅各個類別並依次進行相關的說明。

2.1.1 建立操做

1.interval & intervalRange

下面的操做能夠每一個3秒的時間發送一個整數,整數從0開始:

Observable.interval(3, TimeUnit.SECONDS).subscribe(System.out::println);
複製代碼

若是想要設置從指定的數字開始也是能夠的,實際上interval提供了許多重載方法供咱們是使用。下面咱們連同與之功能相近的intervalRange方法也一同給出:

  1. public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
  2. public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
  3. public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

這裏的initialDelay參數用來指示開始發射第一個整數的以前要停頓的時間,時間的單位與peroid同樣,都是經過unit參數來指定的;period參數用來表示每一個發射之間停頓多少時間;unit表示時間的單位,是TimeUnit類型的;scheduler參數指定數據發射和等待時所在的線程。

intervalRange方法能夠用來將發射的整數序列限制在一個範圍以內,這裏的start用來表示發射的數據的起始值,count表示總共要發射幾個數字,其餘參數與上面的interval方法一致。

2.range & rangeLong

下面的操做能夠產生一個從5開始的連續10個整數構成的序列:

Observable.range(5, 10).subscribe(i -> System.out.println("1: " + i));
複製代碼

該方法須要傳入兩個參數,與之有相同功能的方法還有rangeLong

  1. public static Observable<Integer> range(final int start, final int count)
  2. public static Observable<Long> rangeLong(long start, long count)

這裏的兩個參數start用來指定用於生成的序列的開始值,count用來指示要生成的序列總共包含多少個數字,上面的兩個方法的主要區別在於一個是用來生成int型整數的,一個是用來生成long型整數的。

3.create

create方法用於從頭開始建立一個Observable,像下面顯示的那樣,你須要使用create方法並傳一個發射器做爲參數,在該發射器內部調用onNextonCompleteonError方法就能夠將數據發送給監聽者。

Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
    observableEmitter.onNext(1);
    observableEmitter.onNext(2);
    observableEmitter.onComplete();
}).subscribe(System.out::println);
複製代碼

4.defer

defer直到有觀察者訂閱時才建立Observable,而且爲每一個觀察者建立一個新的Observable。defer操做符會一直等待直到有觀察者訂閱它,而後它使用Observable工廠方法生成一個Observable。好比下面的代碼兩個訂閱輸出的結果是不一致的:

Observable<Long> observable = Observable.defer((Callable<ObservableSource<Long>>) () -> Observable.just(System.currentTimeMillis()));
observable.subscribe(System.out::print);
System.out.println();
observable.subscribe(System.out::print);
複製代碼

下面是該方法的定義,它接受一個Callable對象,能夠在該對象中返回一個Observable的實例:

public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)

5.empty & never & error

  1. public static <T> Observable<T> empty():建立一個不發射任何數據可是正常終止的Observable;
  2. public static <T> Observable<T> never():建立一個不發射數據也不終止的Observable;
  3. public static <T> Observable<T> error(Throwable exception):建立一個不發射數據以一個錯誤終止的Observable,它有幾個重載版本,這裏給出其中的一個。

測試代碼:

Observable.empty().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.never().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.error(new Exception()).subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
複製代碼

輸出結果:completeerror

6.from 系列

from系列的方法用來從指定的數據源中獲取一個Observable:

  1. public static <T> Observable<T> fromArray(T... items):從數組中獲取;
  2. public static <T> Observable<T> fromCallable(Callable<? extends T> supplier):從Callable中獲取;
  3. public static <T> Observable<T> fromFuture(Future<? extends T> future):從Future中獲取,有多個重載版本,能夠用來指定線程和超時等信息;
  4. public static <T> Observable<T> fromIterable(Iterable<? extends T> source):從Iterable中獲取;
  5. public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher):從Publisher中獲取。

7.just 系列

just系列的方法的一個參數的版本爲下面的形式:public static <T> Observable<T> just(T item),它還有許多個重載的版本,區別在於接受的參數的個數不一樣,最少1個,最多10個。

8.repeat

該方法用來表示指定的序列要發射多少次,下面的方法會將該序列無限次進行發送:

Observable.range(5, 10).repeat().subscribe(i -> System.out.println(i));
複製代碼

repeat方法有如下幾個類似方法:

  1. public final Observable<T> repeat()
  2. public final Observable<T> repeat(long times)
  3. public final Observable<T> repeatUntil(BooleanSupplier stop)
  4. public final Observable<T> repeatWhen(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)

第1個無參的方法會無限次地發送指定的序列(實際上內部調用了第2個方法並傳入了Long.MAX_VALUE),第2個方法會將指定的序列重複發射指定的次數;第3個方法會在知足指定的要求的時候中止重複發送,不然會一直髮送。

9.timer

timer操做符建立一個在給定的時間段以後返回一個特殊值的Observable,它在延遲一段給定的時間後發射一個簡單的數字0。好比下面的程序會在500毫秒以後輸出一個數字0

Observable.timer(500, TimeUnit.MILLISECONDS).subscribe(System.out::print);
複製代碼

下面是該方法及其重載方法的定義,重載方法還能夠指定一個調度器:

  1. public static Observable<Long> timer(long delay, TimeUnit unit)
  2. public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)

2.1.2 變換操做

1.map & cast

  1. map操做符對原始Observable發射的每一項數據應用一個你選擇的函數,而後返回一個發射這些結果的Observable。默認不在任何特定的調度器上執行。
  2. cast操做符將原始Observable發射的每一項數據都強制轉換爲一個指定的類型(多態),而後再發射數據,它是map的一個特殊版本:

下面的第一段代碼用於將生成的整數序列轉換成一個字符串序列以後並輸出;第二段代碼用於將Date類型轉換成Object類型並進行輸出,這裏若是前面的Class沒法轉換成第二個Class就會出現異常:

Observable.range(1, 5).map(String::valueOf).subscribe(System.out::println);
Observable.just(new Date()).cast(Object.class).subscribe(System.out::print);
複製代碼

這兩個方法的定義以下:

  1. public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
  2. public final <U> Observable<U> cast(Class<U> clazz)

這裏的mapper函數接受兩個泛型,一個表示原始的數據類型,一個表示要轉換以後的數據類型,轉換的邏輯寫在該接口實現的方法中便可。

2.flatMap & contactMap

flatMap將一個發送事件的上游Observable變換爲多個發送事件的Observables,而後將它們發射的事件合併後放進一個單獨的Observable裏。須要注意的是, flatMap並不保證事件的順序,也就是說轉換以後的Observables的順序沒必要與轉換以前的序列的順序一致。好比下面的代碼用於將一個序列構成的整數轉換成多個單個的Observable,而後組成一個OBservable,並被訂閱。下面輸出的結果仍將是一個字符串數字序列,只是順序不必定是增序的。

Observable.range(1, 5)
        .flatMap((Function<Integer, ObservableSource<String>>) i -> Observable.just(String.valueOf(i)))
        .subscribe(System.out::println);
複製代碼

flatMap對應的方法是contactMap,後者可以保證最終輸出的順序與上游發送的順序一致。下面是這兩個方法的定義:

  1. public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
  2. public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

flatMap的重載方法數量過多,它們在數據源方面略有不一樣,有的支持錯誤等可選參數,具體能夠參考源代碼。

3.flatMapIterable

flatMapIterable能夠用來將上流的任意一個元素轉換成一個Iterable對象,而後咱們能夠對其進行消費。在下面的代碼中,咱們先生成一個整數的序列,而後將每一個整數映射成一個Iterable<string>類型,最後,咱們對其進行訂閱和消費:

Observable.range(1, 5)
        .flatMapIterable((Function<Integer, Iterable<String>>) integer -> Collections.singletonList(String.valueOf(integer)))
        .subscribe(s -> System.out.println("flatMapIterable : " + s));
複製代碼

下面是該方法及其重載方法的定義:

  1. public final <U> Observable<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper)
  2. public final <U, V> Observable<V> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends V> resultSelector)

4.buffer

該方法用於將整個流進行分組。如下面的程序爲例,咱們會先生成一個7個整數構成的流,而後使用buffer以後,這些整數會被3個做爲一組進行輸出,因此當咱們訂閱了buffer轉換以後的Observable以後獲得的是一個列表構成的OBservable

Observable.range(1, 7).buffer(3)
        .subscribe(integers -> System.out.println(Arrays.toString(integers.toArray())));
複製代碼

下面是這個方法及其重載方法的定義,它的重載方法太多,這裏咱們只給出其中的兩個,其餘的能夠參考RxJava的源碼。這裏的buffer應該理解爲一個緩衝區,當緩衝區滿了或者剩餘的數據不夠一個緩衝區的時候就將數據發射出去。

  1. public final Observable<List<T>> buffer(int count)
  2. public final Observable<List<T>> buffer(int count, int skip)
  3. ...

5.groupBy

groupBy用於分組元素,它能夠被用來根據指定的條件將元素分紅若干組。它將獲得一個Observable<GroupedObservable<T, M>>類型的Observable。以下面的程序所示,這裏咱們使用concat方法先將兩個Observable拼接成一個Observable,而後對其元素進行分組。這裏咱們的分組依據是整數的值,這樣咱們將獲得一個Observable<GroupedObservable<Integer, Integer>>類型的Observable。而後,咱們再將獲得的序列拼接成一個並進行訂閱輸出:

Observable<GroupedObservable<Integer, Integer>> observable = Observable.concat(
        Observable.range(1,4), Observable.range(1,6)).groupBy(integer -> integer);
Observable.concat(observable).subscribe(integer -> System.out.println("groupBy : " + integer));
複製代碼

該方法有多個重載版本,這裏咱們用到的一個的定義是:

public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)

6.scan

scan操做符對原始Observable發射的第一項數據應用一個函數,而後將那個函數的結果做爲本身的第一項數據發射。它將函數的結果同第二項數據一塊兒填充給這個函數來產生它本身的第二項數據。它持續進行這個過程來產生剩餘的數據序列。這個操做符在某些狀況下被叫作accumulator。

如下面的程序爲例,該程序的輸結果是2 6 24 120 720,能夠看出這裏的計算規則是,咱們把傳入到scan中的函數記爲f,序列記爲x,生成的序列記爲y,那麼這裏的計算公式是y(0)=x(0); y(i)=f(y(i-1), x(i)), i>0

Observable.range(2, 5).scan((i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));
複製代碼

除了上面的這種形式,scan方法還有一個重載的版本,咱們可使用這個版本的方法來在生成序列的時候指定一個初始值。如下面的程序爲例,它的輸出結果是3 6 18 72 360 2160,能夠看出它的輸出比上面的形式多了1個,這是由於當指定了初始值以後,生成的第一個數字就是那個初始值,剩下的按照咱們上面的規則進行的。因此,用一樣的函數語言來描述的話,那麼它就應該是下面的這種形式:y(0)=initialValue; y(i)=f(y(i-1), x(i)), i>0

Observable.range(2, 5).scan(3, (i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));
複製代碼

以上方法的定義是:

  1. public final Observable<T> scan(BiFunction<T, T, T> accumulator)
  2. public final <R> Observable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator)

7.window

windowWindow和Buffer相似,但不是發射來自原始Observable的數據包,它發射的是Observable,這些Observables中的每個都發射原始Observable數據的一個子集,最後發射一個onCompleted通知。

如下面的程序爲例,這裏咱們首先生成了一個由10個數字組成的整數序列,而後使用window函數將它們每3個做爲一組,每組會返回一個對應的Observable對象。 這裏咱們對該返回的結果進行訂閱並進行消費,由於10個數字,因此會被分紅4個組,每一個對應一個Observable:

Observable.range(1, 10).window(3).subscribe(
        observable -> observable.subscribe(integer -> System.out.println(observable.hashCode() + " : " + integer)));
複製代碼

除了對數據包進行分組,咱們還能夠根據時間來對發射的數據進行分組。該方法有多個重載的版本,這裏咱們給出其中的比較具備表明性的幾個:

  1. public final Observable<Observable<T>> window(long count)
  2. public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)
  3. public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary)
  4. public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary)

2.1.3 過濾操做

1.filter

filter用來根據指定的規則對源進行過濾,好比下面的程序用來過濾整數1到10中全部大於5的數字:

Observable.range(1,10).filter(i -> i > 5).subscribe(System.out::println);
複製代碼

下面是該方法的定義:

  1. public final Observable<T> filter(Predicate<? super T> predicate)

2.elementAt & firstElement & lastElement

elementAt用來獲取源中指定位置的數據,它有幾個重載方法,這裏咱們介紹一下最簡單的一個方法的用法。下面是elementAt的一個示例,它將獲取源數據中索引爲1的元素並交給觀察者訂閱。下面的程序將輸出1

Observable.range(1, 10).elementAt(0).subscribe(System.out::print);
複製代碼

這裏咱們給出elementAt及其相關的方法的定義,它們的使用類似。注意一下這裏的返回類型:

  1. public final Maybe<T> elementAt(long index)
  2. public final Single<T> elementAt(long index, T defaultItem)
  3. public final Single<T> elementAtOrError(long index)

除了獲取指定索引的元素的方法以外,RxJava中還有能夠用來直接獲取第一個和最後一個元素的方法,這裏咱們直接給出方法的定義:

  1. public final Maybe<T> firstElement()
  2. public final Single<T> first(T defaultItem)
  3. public final Single<T> firstOrError()
  4. public final Maybe<T> lastElement()
  5. public final Single<T> last(T defaultItem)
  6. public final Single<T> lastOrError()

3.distinct & distinctUntilChanged

distinct用來對源中的數據進行過濾,如下面的程序爲例,這裏會把重複的數字7過濾掉:

Observable.just(1,2,3,4,5,6,7,7).distinct().subscribe(System.out::print);
複製代碼

與之相似的還有distinctUntilChanged方法,與distinct不一樣的是,它只當相鄰的兩個元素相同的時候纔會將它們過濾掉。好比下面的程序會過濾掉其中的2和5,因此最終的輸出結果是12345676

Observable.just(1,2,2,3,4,5,5,6,7,6).distinctUntilChanged().subscribe(System.out::print);
複製代碼

該方法也有幾個功能類似的方法,這裏給出它們的定義以下:

  1. public final Observable<T> distinct()
  2. public final <K> Observable<T> distinct(Function<? super T, K> keySelector)
  3. public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier)
  4. public final Observable<T> distinctUntilChanged()
  5. public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector)
  6. public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer)

4.skip & skipLast & skipUntil & skipWhile

skip方法用於過濾掉數據的前n項,好比下面的程序將會過濾掉前2項,所以輸出結果是345

Observable.range(1, 5).skip(2).subscribe(System.out::print);
複製代碼

skip方法對應的是take方法,它用來表示只選擇數據源的前n項,該方法的示例就不給出了。這裏,咱們說一下與之類功能相似的重載方法。skip還有一個重載方法接受兩個參數,用來表示跳過指定的時間,也就是在指定的時間以後纔開始進行訂閱和消費。下面的程序會在3秒以後纔開始不斷地輸出數字:

Observable.range(1,5).repeat().skip(3, TimeUnit.SECONDS).subscribe(System.out::print);
複製代碼

skip功能相反的方法的還有skipLast,它用來表示過濾掉後面的幾項,以及最後的一段時間不進行發射等。好比下面的方法,咱們會在程序開始以前進行計時,而後會不斷重複輸出數字,直到5秒以後結束。而後,咱們用skipLast方法表示最後的2秒再也不進行發射。因此下面的程序會先不斷輸出數字3秒,3秒結束後中止輸出,並在2秒以後結束程序:

long current = System.currentTimeMillis();
Observable.range(1,5)
        .repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
        .skipLast(2, TimeUnit.SECONDS).subscribe(System.out::print);
複製代碼

與上面的這些方法相似的還有一些,這裏咱們再也不一一列舉。由於這些方法的重載方法比較多,下面咱們給出其中的具備表明性的一部分:

  1. public final Observable<T> skip(long count)
  2. public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler)
  3. public final Observable<T> skipLast(int count)
  4. public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
  5. public final <U> Observable<T> skipUntil(ObservableSource<U> other)
  6. public final Observable<T> skipWhile(Predicate<? super T> predicate)

5.take & takeLast & takeUntil & takeWhile

skip方法對應的是take方法,它表示按照某種規則進行選擇操做。咱們如下面的程序爲例,這裏第一段程序表示只發射序列中的前2個數據:

Observable.range(1, 5).take(2).subscribe(System.out::print);
複製代碼

下面的程序表示只選擇最後2秒中輸出的數據:

long current = System.currentTimeMillis();
Observable.range(1,5)
        .repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
        .takeLast(2, TimeUnit.SECONDS).subscribe(System.out::print);
複製代碼

下面是以上相關的方法的定義,一樣的,咱們只選擇其中比較有表明性的幾個:

  1. public final Observable<T> take(long count)
  2. public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
  3. public final <U> Observable<T> takeUntil(ObservableSource<U> other)
  4. public final Observable<T> takeUntil(Predicate<? super T> stopPredicate)
  5. public final Observable<T> takeWhile(Predicate<? super T> predicate)

6.ignoreElements

該方法用來過濾全部源Observable產生的結果,只會把Observable的onComplete和onError事件通知給訂閱者。下面是該方法的定義:

  1. public final Completable ignoreElements()

7.throttleFirst & throttleLast & throttleLatest & throttleWithTimeout

這些方法用來對輸出的數據進行限制,它們是經過時間的」窗口「來進行限制的,你能夠理解成按照指定的參數對時間進行分片,而後根據各個方法的要求選擇第一個、最後一個、最近的等進行發射。下面是throttleLast方法的用法示例,它會輸出每一個500毫秒之間的數字中最後一個數字:

Observable.interval(80, TimeUnit.MILLISECONDS)
        .throttleLast(500, TimeUnit.MILLISECONDS)
        .subscribe(i -> System.out.print(i + " "));
複製代碼

其餘的幾個方法的功能大體列舉以下:

  1. throttleFirst只會發射指定的Observable在指定的事件範圍內發射出來的第一個數據;
  2. throttleLast只會發射指定的Observable在指定的事件範圍內發射出來的最後一個數據;
  3. throttleLatest用來發射距離指定的時間分片最近的那個數據;
  4. throttleWithTimeout僅在過了一段指定的時間還沒發射數據時才發射一個數據,若是在一個時間片達到以前,發射的數據以後又緊跟着發射了一個數據,那麼這個時間片以內以前發射的數據會被丟掉,該方法底層是使用debounce方法實現的。若是數據發射的頻率老是快過這裏的timeout參數指定的時間,那麼將不會再發射出數據來。

下面是這些方法及其重載方法的定義(選擇其中一部分):

  1. public final Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler)
  2. public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler)
  3. public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast)
  4. public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler)

8.debounce

debounce也是用來限制發射頻率過快的,它僅在過了一段指定的時間還沒發射數據時才發射一個數據。咱們經過下面的圖來講明這個問題:

debounce

這裏紅、綠、藍三個球發射出來的緣由都是由於當反射了這個球以後的必定的時間內沒有其餘的球發射出來,這個時間是咱們能夠經過參數來指定的。

該方法的用法與throttle之類的方法相似,上面也說過throttle那些方法底層用了debounce實現,因此,這裏咱們再也不爲該方法專門編寫相關的測試代碼。

9.sample

實際上throttleLast的實現中內部調用的就是sample

2.1.4 組合操做

1.startWith & startWithArray

startWith方法能夠用來在指定的數據源的以前插入幾個數據,它的功能相似的方法有startWithArray,另外還有幾個重載方法。這裏咱們給出一個基本的用法示例,下面的程序會在原始的數字流1-5的前面加上0,因此最終的輸出結果是012345

Observable.range(1,5).startWith(0).subscribe(System.out::print);
複製代碼

下面是startWith及其幾個功能相關的方法的定義:

  1. public final Observable<T> startWith(Iterable<? extends T> items)
  2. public final Observable<T> startWith(ObservableSource<? extends T> other)
  3. public final Observable<T> startWith(T item)
  4. public final Observable<T> startWithArray(T... items)

2.merge & mergeArray

merge可讓多個數據源的數據合併起來進行發射,固然它可能會讓merge以後的數據交錯發射。下面是一個示例,這個例子中,咱們使用merge方法將兩個Observable合併到了一塊兒進行監聽:

Observable.merge(Observable.range(1,5), Observable.range(6,5)).subscribe(System.out::print);
複製代碼

鑑於merge方法及其功能相似的方法太多,咱們這裏挑選幾個比較有表明性的方法,具體的能夠查看RxJava的源代碼:

  1. public static <T> Observable<T> merge(Iterable<? extends ObservableSource<? extends T>> sources)
  2. public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources)
  3. public static <T> Observable<T> mergeDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
  4. public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)

這裏的mergeError方法與merge方法的表現一致,只是在處理由onError觸發的錯誤的時候有所不一樣。mergeError方法會等待全部的數據發射完畢以後才把錯誤發射出來,即便多個錯誤被觸發,該方法也只會發射出一個錯誤信息。而若是使用merger方法,那麼當有錯誤被觸發的時候,該錯誤會直接被拋出來,並結束髮射操做。下面是該方法的一個使用的示例,這裏咱們主線程停頓4秒,而後全部merge的Observable中的一個會在線程開始的第2秒的時候觸發一個錯誤,該錯誤最終會在全部的數據發射完畢以後被髮射出來:

Observable.mergeDelayError(Observable.range(1,5),
        Observable.range(1,5).repeat(2),
        Observable.create((ObservableOnSubscribe<String>) observableEmitter -> {
            Thread.sleep(2000);
            observableEmitter.onError(new Exception("error"));
        })
).subscribe(System.out::print, System.out::print);
Thread.sleep(4000);
複製代碼

3.concat & concatArray & concatEager

該方法也是用來將多個Observable拼接起來,可是它會嚴格按照傳入的Observable的順序進行發射,一個Observable沒有發射完畢以前不會發射另外一個Observable裏面的數據。下面是一個程序示例,這裏傳入了兩個Observable,會按照順序輸出12345678910

Observable.concat(Observable.range(1, 5), Observable.range(6, 5)).subscribe(System.out::print);
複製代碼

下面是該方法的定義,鑑於該方法及其重載方法太多,這裏咱們選擇幾個比較有表明性的說明:

  1. public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)
  2. public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
  3. public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
  4. public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
  5. public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources)
  6. public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources)

對於concat方法,咱們以前已經介紹過它的用法;這裏的conactArray的功能與之相似;對於concatEager方法,當一個觀察者訂閱了它的結果,那麼就至關於訂閱了它拼接的全部ObservableSource,而且會先緩存這些ObservableSource發射的數據,而後再按照順序將它們發射出來。而對於這裏的concatDelayError方法的做用和前面的mergeDelayError相似,只有當全部的數據都發射完畢纔會處理異常。

4.zip & zipArray & zipIterable

zip操做用來將多個數據項進行合併,能夠經過一個函數指定這些數據項的合併規則。好比下面的程序的輸出結果是6 14 24 36 50,顯然這裏的合併的規則是相同索引的兩個數據的乘積。不過仔細看下這裏的輸出結果,能夠看出,若是一個數據項指定的位置沒有對應的值的時候,它是不會參與這個變換過程的:

Observable.zip(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
        .subscribe(i -> System.out.print(i + " "));
複製代碼

zip 除了用做兩個 Observable 的合併,它還能夠用來指定兩個 Observable 的順序:

Observable<String> a = // ... A 請求
Observable<Integer> b =  // ... B 請求
Observable.zip(a, b, new BiFunction<String, Integer, Object>(){
    @Override
    public Object apply(@NonNull String s, @NonNull Integer integer) throws Exception {
        // 拿到了 A 請求和 B 請求的第 n 次執行的結果
        return new Object();
    }
}).subscribe();
複製代碼

A 和 B 會並行在各自的子線程當中, 而且會合併到 apply() 方法中。它能保證 B 操做在 A 操做以前執行。咱們可使用這種方式來實現線程的控制。即當一個任務完成以後才執行另外一個任務,同時它們的任務的結果能夠被合併。那麼合併的規則是什麼呢?即那麼若是 A 和 B 屢次發送結果,也就是屢次調用 onNext() 方法。此時,A 和 B 發送的結果會按照前後順序配對,並回調上述的 BiFunction 函數。

zip方法有多個重載的版本,同時也有功能近似的方法,這裏咱們挑選有表明性的幾個進行說明:

  1. public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
  2. ublic static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource... sources)
  3. public static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize)

實際上上面幾個方法的用法和功能基本相似,區別在於傳入的ObservableSource的參數的形式。

5.combineLastest

zip操做相似,可是這個操做的輸出結果與zip大相徑庭,如下面的程序爲例,它的輸出結果是36 42 48 54 60

Observable.combineLatest(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
        .subscribe(i -> System.out.print(i + " "));
複製代碼

利用下面的這張圖能夠比較容易來講明這個問題:

combineLastest

上圖中的上面的兩條橫線表明用於拼接的兩個數據項,下面的一條橫線是拼接以後的結果。combineLatest的做用是拼接最新發射的兩個數據。下面咱們用上圖的過程來講明該方法是如何執行的:開始第一條只有1的時候沒法拼接,;當第二條出現A的時候,此時最新的數據是1和A,故組合成一個1A;第二個數據項發射了B,此時最新的數據是1和B,故組合成1B;第一條橫線發射了2,此時最新的數據是2和B,所以獲得了2B,依次類推。而後再回到咱們上面的問題,第一個數據項連續發射了5個數據的時候,第二個數據項一個都沒有發射出來,所以沒有任何輸出;而後第二個數據項開始發射數據,當第二個數據項發射了6的時候,此時最新的數據組合是6和6,故得36;而後,第二個數據項發射了7,此時最新的數據組合是6和7,故得42,依次類推。

該方法也有對應的combineLatestDelayError方法,用途也是隻有當全部的數據都發射完畢的時候纔去處理錯誤邏輯。

2.1.5 輔助操做

1.delay

delay方法用於在發射數據以前停頓指定的時間,好比下面的程序會在真正地發射數據以前停頓1秒:

Observable.range(1, 5).delay(1000, TimeUnit.MILLISECONDS).subscribe(System.out::print);
Thread.sleep(1500);
複製代碼

一樣delay方法也有幾個重載的方法,能夠供咱們用來指定觸發的線程等信息,這裏給出其中的兩個,其餘的能夠參考源碼和文檔:

  1. public final Observable<T> delay(long delay, TimeUnit unit)
  2. public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)

2.do系列

RxJava中還有一系列的方法能夠供咱們使用,它們共同的特色是都是以do開頭,下面咱們列舉一下這些方法並簡要說明一下它們各自的用途:

  1. public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext),會在onNext方法以後觸發;
  2. public final Observable<T> doAfterTerminate(Action onFinally),會在Observable終止以後觸發;
  3. public final Observable<T> doFinally(Action onFinally),當onComplete或者onError的時候觸發;
  4. public final Observable<T> doOnDispose(Action onDispose),當被dispose的時候觸發;
  5. public final Observable<T> doOnComplete(Action onComplete),當complete的時候觸發;
  6. public final Observable<T> doOnEach(final Observer<? super T> observer),當每一個onNext調用的時候觸發;
  7. public final Observable<T> doOnError(Consumer<? super Throwable> onError),當調用onError的時候觸發;
  8. public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
  9. public final Observable<T> doOnNext(Consumer<? super T> onNext),,會在onNext的時候觸發;
  10. public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe),會在訂閱的時候觸發;
  11. public final Observable<T> doOnTerminate(final Action onTerminate),當終止以前觸發。

這些方法能夠看做是對操做執行過程的一個監聽,當指定的操做被觸發的時候會同時觸發這些監聽方法:

Observable.range(1, 5)
        .doOnEach(integerNotification -> System.out.println("Each : " + integerNotification.getValue()))
        .doOnComplete(() -> System.out.println("complete"))
        .doFinally(() -> System.out.println("finally"))
        .doAfterNext(i -> System.out.println("after next : " + i))
        .doOnSubscribe(disposable -> System.out.println("subscribe"))
        .doOnTerminate(() -> System.out.println("terminal"))
        .subscribe(i -> System.out.println("subscribe : " + i));
複製代碼

3.subscribeOn & observeOn

subscribeOn用於指定Observable自身運行的線程,observeOn用於指定發射數據所處的線程,好比Android中的異步任務須要用subscribeOn指定發射數據所在的線程是非主線程,而後執行完畢以後將結果發送給主線程,就須要用observeOn來指定。好比下面的程序,咱們用這兩個方法來指定所在的線程:

Observable.create(new ObservableOnSubscribe<T>() {
    @Override
    public void subscribe(ObservableEmitter<T> emitter) throws Exception {
        // do nothing
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
複製代碼

4.timeout

用來設置一個超時時間,若是指定的時間以內沒有任何數據被髮射出來,那麼就會執行咱們指定的數據項。以下面的程序所示,咱們先爲設置了一個間隔200毫秒的數字產生器,開始發射數據以前要停頓1秒鐘,由於咱們設置的超時時間是500毫秒,於是在第500毫秒的時候會執行咱們傳入的數據項:

Observable.interval(1000, 200, TimeUnit.MILLISECONDS)
        .timeout(500, TimeUnit.MILLISECONDS, Observable.rangeLong(1, 5))
        .subscribe(System.out::print);
Thread.sleep(2000);
複製代碼

timeout方法有多個重載方法,能夠爲其指定線程等參數,能夠參考源碼或者文檔瞭解詳情。

2.1.6 錯誤處理操做符

錯誤處理操做符主要用來提供給Observable,用來對錯誤信息作統一的處理,經常使用的兩個是catchretry

1.catch

catch操做會攔截原始的Observable的onError通知,將它替換爲其餘數據項或者數據序列,讓產生的Observable可以正常終止或者根本不終止。在RxJava中該操做有3終類型:

  1. onErrorReturn:這種操做會在onError觸發的時候返回一個特殊的項替換錯誤,並調用觀察者的onCompleted方法,而不會將錯誤傳遞給觀察者;
  2. onErrorResumeNext:會在onError觸發的時候發射備用的數據項給觀察者;
  3. onExceptionResumeNext:若是onError觸發的時候onError收到的Throwable不是Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable。

下面是onErrorReturnonErrorResumeNext的程序示例,這裏第一段代碼會在出現錯誤的時候輸出666,而第二段會在出現錯誤的時候發射數字12345

Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
        observableEmitter.onError(null);
        observableEmitter.onNext(0);
    }).onErrorReturn(throwable -> 666).subscribe(System.out::print);

    Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
        observableEmitter.onError(null);
        observableEmitter.onNext(0);
    }).onErrorResumeNext(Observable.range(1,5)).subscribe(System.out::print);
複製代碼

2.retry

retry使用了一種錯誤重試機制,它能夠在出現錯誤的時候進行重試,咱們能夠經過參數指定重試機制的條件。如下面的程序爲例,這裏咱們設置了當出現錯誤的時候會進行2次重試,所以,第一次的時候出現錯誤會調用onNext,重試2次又會調用2次onNext,第二次重試的時候由於重試又出現了錯誤,所以此時會觸發onError方法。也就是說,下面這段代碼會觸發onNext3次,觸發onError()1次:

Observable.create(((ObservableOnSubscribe<Integer>) emitter -> {
        emitter.onNext(0);
        emitter.onError(new Throwable("Error1"));
        emitter.onError(new Throwable("Error2"));
    })).retry(2).subscribe(i -> System.out.println("onNext : " + i), error -> System.out.print("onError : " + error));
複製代碼

retry有幾個重載的方法和功能相近的方法,下面是這些方法的定義(選取部分):

  1. public final Observable<T> retry():會進行無限次地重試;
  2. public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate)
  3. public final Observable<T> retry(long times):指定重試次數;
  4. public final Observable<T> retry(long times, Predicate<? super Throwable> predicate)
  5. public final Observable<T> retryUntil(final BooleanSupplier stop)
  6. public final Observable<T> retryWhen(Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler)

2.1.7 條件操做符和布爾操做符

1.all & any

  1. all用來判斷指定的數據項是否所有知足指定的要求,這裏的「要求」可使用一個函數來指定;
  2. any用來判斷指定的Observable是否存在知足指定要求的數據項。

在下面的程序中,咱們用該函數來判斷指定的數據項是否所有知足大於5的要求,顯然是不知足的,所以下面的程序將會輸出false

Observable.range(5, 5).all(i -> i>5).subscribe(System.out::println); // false
Observable.range(5, 5).any(i -> i>5).subscribe(System.out::println); // true
複製代碼

如下是該方法的定義:

  1. public final Single<Boolean> all(Predicate<? super T> predicate)
  2. public final Single<Boolean> any(Predicate<? super T> predicate)

2.contains & isEmpty

這兩個方法分別用來判斷數據項中是否包含咱們指定的數據項,已經判斷數據項是否爲空:

Observable.range(5, 5).contains(4).subscribe(System.out::println); // false
Observable.range(5, 5).isEmpty().subscribe(System.out::println); // false
複製代碼

如下是這兩個方法的定義:

  1. public final Single<Boolean> isEmpty()
  2. public final Single<Boolean> contains(final Object element)

3.sequenceEqual

sequenceEqual用來判斷兩個Observable發射出的序列是不是相等的。好比下面的方法用來判斷兩個序列是否相等:

Observable.sequenceEqual(Observable.range(1,5), Observable.range(1, 5)).subscribe(System.out::println);
複製代碼

4.amb

amb做用的兩個或多個Observable,可是隻會發射最早發射數據的那個Observable的所有數據:

Observable.amb(Arrays.asList(Observable.range(1, 5), Observable.range(6, 5))).subscribe(System.out::print)
複製代碼

該方法及其功能近似的方法的定義,這裏前兩個是靜態的方法,第二個屬於實例方法:

  1. public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
  2. public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
  3. public final Observable<T> ambWith(ObservableSource<? extends T> other)

5.defaultIfEmpty

defaultIfEmpty用來當指定的序列爲空的時候指定一個用於發射的值。下面的程序中,咱們直接調用發射器的onComplete方法,所以序列是空的,結果輸出一個整數6

Observable.create((ObservableOnSubscribe<Integer>) Emitter::onComplete).defaultIfEmpty(6).subscribe(System.out::print);
複製代碼

下面是該方法的定義:

  1. public final Observable<T> defaultIfEmpty(T defaultItem)

2.1.8 轉換操做符

1.toList & toSortedList

toListtoSortedList用於將序列轉換成列表,後者相對於前者增長了排序的功能:

Observable.range(1, 5).toList().subscribe(System.out::println);
Observable.range(1, 5).toSortedList(Comparator.comparingInt(o -> -o)).subscribe(System.out::println);
複製代碼

下面是它們的定義,它們有多個重載版本,這裏選擇其中的兩個進行說明:

  1. public final Single<List<T>> toList()
  2. public final Single<List<T>> toSortedList(final Comparator<? super T> comparator)

注意一下,這裏的返回結果是Single類型的,不過這並不妨礙咱們繼續使用鏈式操做,由於Single的方法和Observable基本一致。 另外還要注意這裏的Single中的參數是一個List<T>,也就是說,它把整個序列轉換成了一個列表對象。所以,上面的兩個示例程序的輸出是:

[1, 2, 3, 4, 5]
[5, 4, 3, 2, 1]
複製代碼

2.toMap & toMultimap

toMap用於將發射的數據轉換成另外一個類型的值,它的轉換過程是針對每個數據項的。如下面的代碼爲例,它會將原始的序列中的每一個數字轉換成對應的十六進制。可是,toMap轉換的結果不必定是按照原始的序列的發射的順序來的:

Observable.range(8, 10).toMap(Integer::toHexString).subscribe(System.out::print);
複製代碼

toMap近似的是toMultimap方法,它能夠將原始序列的每一個數據項轉換成一個集合類型:

Observable.range(8, 10).toMultimap(Integer::toHexString).subscribe(System.out::print);
複製代碼

上面的兩段程序的輸出結果是:

{11=17, a=10, b=11, c=12, d=13, e=14, f=15, 8=8, 9=9, 10=16}
{11=[17], a=[10], b=[11], c=[12], d=[13], e=[14], f=[15], 8=[8], 9=[9], 10=[16]}
複製代碼

上面的兩個方法的定義是(多個重載,選擇部分):

  1. public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector)
  2. public final <K> Single<Map<K, Collection<T>>> toMultimap(Function<? super T, ? extends K> keySelector)

3.toFlowable

該方法用於將一個Observable轉換成Flowable類型,下面是該方法的定義,顯然這個方法使用了策略模式,這裏面涉及背壓相關的內容,咱們後續再詳細介紹。

public final Flowable<T> toFlowable(BackpressureStrategy strategy)
複製代碼

4.to

相比於上面的方法,to方法的限制更加得寬泛,你能夠將指定的Observable轉換成任意你想要的類型(若是你能夠作到的話),下面是一個示例代碼,用來將指定的整數序列轉換成另外一個整數類型的Observable,只不過這裏的每一個數據項都是原來的列表中的數據總數的值:

Observable.range(1, 5).to(Observable::count).subscribe(System.out::println);
複製代碼

下面是該方法的定義:

public final <R> R to(Function<? super Observable<T>, R> converter)

2.2 線程控制

以前有提到過RxJava的線程控制是經過subscribeOnobserveOn兩個方法來完成的。 這裏咱們梳理一下RxJava提供的幾種線程調度器以及RxAndroid爲Android提供的調度器的使用場景和區別等。

  1. Schedulers.io():表明適用於io操做的調度器,增加或縮減來自適應的線程池,一般用於網絡、讀寫文件等io密集型的操做。重點須要注意的是線程池是無限制的,大量的I/O調度操做將建立許多個線程並佔用內存。
  2. Schedulers.computation():計算工做默認的調度器,表明CPU計算密集型的操做,與I/O操做無關。它也是許多RxJava方法,好比buffer(),debounce(),delay(),interval(),sample(),skip(),的默認調度器。
  3. Schedulers.newThread():表明一個常規的新線程。
  4. Schedulers.immediate():這個調度器容許你當即在當前線程執行你指定的工做。它是timeout(),timeInterval()以及timestamp()方法默認的調度器。
  5. Schedulers.trampoline():當咱們想在當前線程執行一個任務時,並非當即,咱們能夠用trampoline()將它入隊。這個調度器將會處理它的隊列而且按序運行隊列中每個任務。它是repeat()retry()方法默認的調度器。

以及RxAndroid提供的線程調度器:

AndroidSchedulers.mainThread()用來指代Android的主線程

2.3 總結

上面的這些操做也基本適用於FlowableSingleCompletableMaybe

咱們花費了不少的時間和精力來梳理了這些方法,按照上面的內容,使用RxJava實現一些基本的或者高級的操做都不是什麼問題。

可是,Observable更適用於處理一些數據規模較小的問題,當數據規模比較多的時候可能會出現MissingBackpressureException異常。 所以,咱們還須要瞭解背壓和Flowable的相關內容才能更好地理解和應用RxJava.

RxJava 系列文章:

相關文章
相關標籤/搜索