Rxjava2 Observable的結合操做詳解及實例

簡要:

需求瞭解:java

在使用 RxJava 開發的過程當中,不少時候須要結合多個條件或者數據的邏輯判斷,好比登陸功能的表單驗證,實時數據比對等。這個時候咱們就須要使用 RxJava 的結合操做符來完成這一需求,Rx中提供了豐富的結合操做處理的操做方法。react

可用於組合多個Observables的操做方法:git

  • CombineLatest:當Observables中的任何一個發射了一個數據時,經過一個指定的函數組合每一個Observable發射的最新數據,而後發射這個函數的結果。
  • Join:只要在另外一個Observable發射的數據定義的時間窗口內,這個Observable發射了一條數據,就結合兩個Observable發射的數據。
  • Merge:合併多個Observables的發射物,能夠將多個Observables的輸出合併,就好像它們是一個單個的Observable同樣。
  • Zip:經過一個函數將多個Observables的發射物結合到一塊兒,基於這個函數的結果爲每一個結合體嚴格按照數量以及順序發射單個數據項。
  • StartWith:在數據序列的開頭插入一條指定的數據項或者數據序列。
  • SwitchOnNext:將一個發射多個Observables的Observable轉換成另外一個單獨的Observable,後者發射那些Observables最新發射的Observable的數據項。

1. CombineLatest

當 Observables 中的任何一個發射了數據時,使用一個函數結合每一個 Observable 發射的最近數據項,而且基於這個函數的結果發射數據。github

CombineLatest 操做符行爲相似於zip,可是隻有當原始的Observable中的每個都發射了一條數據時 zip 才發射數據。 CombineLatest 則在原始的Observable中任意一個發射了數據時發射一條數據。當原始Observables的任何一個發射了一條數據時, CombineLatest 使用一 個函數結合它們最近發射的數據,而後發射這個函數的返回值。api

img-CombineLatest
解析: combineLatest 操做符能夠結合多個Observable,能夠接收 2-9 個Observable對象, 在其中原始Observables的任何一個發射了一條數據時, CombineLatest 使用一個函數結合它們最近發射的數據,而後發射這個函數的返回值。此外combineLatest 操做符還有一些接收 Iterable , 數組方式的變體,以及其餘指定參數combiner、bufferSize、和combineLatestDelayError方法等變體,在此就不在詳細展開了,有興趣的能夠查看官方的相關API文檔瞭解。數組

實例代碼:緩存

// Observables 建立
    Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
    Observable<Long> observable2 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS);
    Observable<Long> observable3 = Observable.intervalRange(100, 5, 1, 1, TimeUnit.SECONDS);
    
    // 1. combineLatest(ObservableSource, ObservableSource [支持2-9個參數]...,  BiFunction)
    // 結合多個Observable, 當他們其中任意一個發射了數據時,使用函數結合他們最近發射的一項數據
    Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, String>() {
    
        @Override
        public String apply(Long t1, Long t2) throws Exception {
            System.out.println("--> apply(1) t1 = " + t1 + ", t2 = " + t2);
            if (t1 + t2 == 10) {
                return "Success";   // 知足必定條件,返回指定的字符串
            }
            return t1 + t2 + "";    // 計算全部數據的和並轉換爲字符串
        }
    }).subscribe(new Consumer<String>() {
    
        @Override
        public void accept(String t) throws Exception {
            System.out.println("----> accept combineLatest(1): " + t);
        }
    });
    
    System.out.println("--------------------------------------------------------");
    // 2. combineLatest(T1, T2, T3, Function)
    // Observables的結合
    Observable.combineLatest(observable1, observable2, observable3, new Function3<Long, Long, Long, String>() {
        @Override
        public String apply(Long t1, Long t2, Long t3) throws Exception {
            System.out.println("--> apply(2): t1 = " + t1 + ", t2 = " + t2 + ", t3 = " + t3);
            return t1 + t2 + t3 + "";   // 計算全部數據的和並轉換爲字符串
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept(2): " + t);
        }
    });

輸出:網絡

--> apply(1) t1 = 1, t2 = 1
----> accept combineLatest(1): 2
--> apply(1) t1 = 2, t2 = 1
----> accept combineLatest(1): 3
--> apply(1) t1 = 3, t2 = 1
----> accept combineLatest(1): 4
--> apply(1) t1 = 3, t2 = 2
----> accept combineLatest(1): 5
--> apply(1) t1 = 4, t2 = 2
----> accept combineLatest(1): 6
--> apply(1) t1 = 4, t2 = 3
----> accept combineLatest(1): 7
--> apply(1) t1 = 5, t2 = 3
----> accept combineLatest(1): 8
--> apply(1) t1 = 5, t2 = 4
----> accept combineLatest(1): 9
--> apply(1) t1 = 5, t2 = 5
----> accept combineLatest(1): Success
--------------------------------------------------------
--> apply(2): t1 = 1, t2 = 1, t3 = 100
--> accept(2): 102
--> apply(2): t1 = 2, t2 = 1, t3 = 100
--> accept(2): 103
--> apply(2): t1 = 2, t2 = 1, t3 = 101
--> accept(2): 104
--> apply(2): t1 = 2, t2 = 2, t3 = 101
--> accept(2): 105
--> apply(2): t1 = 3, t2 = 2, t3 = 101
--> accept(2): 106
--> apply(2): t1 = 3, t2 = 2, t3 = 102
--> accept(2): 107
--> apply(2): t1 = 4, t2 = 2, t3 = 102
--> accept(2): 108
--> apply(2): t1 = 4, t2 = 2, t3 = 103
--> accept(2): 109
--> apply(2): t1 = 5, t2 = 2, t3 = 103
--> accept(2): 110
--> apply(2): t1 = 5, t2 = 3, t3 = 103
--> accept(2): 111
--> apply(2): t1 = 5, t2 = 3, t3 = 104
--> accept(2): 112
--> apply(2): t1 = 5, t2 = 4, t3 = 104
--> accept(2): 113
--> apply(2): t1 = 5, t2 = 5, t3 = 104
--> accept(2): 114

Javadoc: combineLatest(T1, T2, T3... , T9, combiner)併發

2. Join

任什麼時候候,只要在另外一個Observable發射的數據定義的時間窗口內,這個Observable發射了一條數據,就結合兩個Observable發射的數據。

img-join
Join 操做符結合兩個Observable發射的數據,基於時間窗口(你定義的針對每條數據特定的原則)選擇待集合的數據項。你將這些時間窗口實現爲一些Observables,它們的生命週期從任何一條Observable發射的每一條數據開始。當這個定義時間窗口的Observable發射了一條數據或者完成時,與這條數據關聯的窗口也會關閉。只要這條數據的窗口是打開的,它將繼續結合其它Observable發射的任何數據項。你定義一個用於結合數據的函數。

解析: join(other, leftEnd, rightEnd, resultSelector) 相關參數的解析

  • other: 源Observable與其組合的目標Observable。
  • leftEnd: 接收一個源數據項,返回一個Observable,這個Observable的生命週期就是源Observable發射數據的有效期。
  • rightEnd: 接收一個源數據項,返回一個Observable,這個Observable的生命週期就是目標Observable發射數據的有效期。
  • resultSelector: 接收源Observable和目標Observable發射的數據項, 處理後的數據返回給觀察者對象。

注意: 這是源Observable和目標Observable發射數據在任意一個基於時間窗口的有效期內纔會接收到組合數據,這就意味着可能有數據丟失的狀況,在其中一個已經發射完全部數據,而且沒有處於時間窗口的數據狀況,另外一個Observable的數據發射將不會收到組合數據。

示例代碼:

// Observable的建立
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 1. join(other, leftEnd, rightEnd, resultSelector)
    // other: 目標組合的Observable
    // leftEnd: 接收一個源數據項,返回一個Observable,這個Observable的生命週期就是源Observable發射數據的有效期
    // rightEnd: 接收一個源數據項,返回一個Observable,這個Observable的生命週期就是目標Observable發射數據的有效期
    // resultSelector: 接收源Observable和目標Observable發射的數據項, 處理後的數據返回給觀察者對象
    sourceObservable.join(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable發射數據的有效期爲1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目標Observable發射數據的有效期爲1000毫秒
        }
    }, new BiFunction<Long, Long, String>() {
        @Override
        public String apply(Long t1, Long t2) throws Exception {
            return "t1 = " + t1 + ", t2 = " + t2;                   // 對數據進行組合後返回和觀察者
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept(1): " + t);
        }
    });

    System.in.read();

輸出:

-----> t1 is emitter: 1
-----> t2 is emitter: 10
--> accept(1): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> accept(1): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> accept(1): t1 = 3, t2 = 10
-----> t2 is emitter: 11
--> accept(1): t1 = 1, t2 = 11
--> accept(1): t1 = 2, t2 = 11
--> accept(1): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> accept(1): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> accept(1): t1 = 5, t2 = 11
-----> t2 is emitter: 12
--> accept(1): t1 = 3, t2 = 12
--> accept(1): t1 = 4, t2 = 12
--> accept(1): t1 = 5, t2 = 12
-----> t2 is emitter: 13
--> accept(1): t1 = 5, t2 = 13
-----> t2 is emitter: 14   // 此時源t1中已經沒有數據還處於時間窗口有效期內

Javadoc: join(other, leftEnd, rightEnd, resultSelector)

groupJoin

groupJoin 操做符與 join 相同,只是參數傳遞有所區別。groupJoin(other, leftEnd, rightEnd, resultSelector) 中的resultSelector 能夠將原始數據轉換爲 Observable 類型的數據發送給觀察者。

img-groupJoin

示例代碼:

// Observable的建立
    Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS);

    // 2. groupJoin(other, leftEnd, rightEnd, resultSelector)
    // groupJoin操做符與join相同,只是參數傳遞有所區別。
    // resultSelector能夠將原始數據轉換爲Observable類型的數據發送給觀察者。
    sourceObservable.groupJoin(targetObservable, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t1 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 源Observable發射數據的有效期爲1000毫秒
        }
    }, new Function<Long, ObservableSource<Long>>() {
        @Override
        public ObservableSource<Long> apply(Long t) throws Exception {
            System.out.println("-----> t2 is emitter: " + t);
            return Observable.timer(1000, TimeUnit.MILLISECONDS);   // 目標Observable發射數據的有效期爲1000毫秒
        }
    }, new BiFunction<Long, Observable<Long>, Observable<String>>() {
        @Override
        public Observable<String> apply(Long t1, Observable<Long> t2) throws Exception {
            System.out.println("--> apply(2) combine: " + t1);            // 結合操做
            return t2.map(new Function<Long, String>() {
                @Override
                public String apply(Long t) throws Exception {
                    System.out.println("-----> apply(2) operation: " + t);
                    return "t1 = " + t1 + ", t2 = " + t;
                }
            });
        }
    }).subscribe(new Consumer<Observable<String>>() {
        @Override
        public void accept(Observable<String> stringObservable) throws Exception {
            stringObservable.subscribe(new Consumer<String>() {
                @Override
                public void accept(String t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            });
        }
    });

輸出:

-----> t1 is emitter: 1
--> apply(2) combine: 1
-----> t2 is emitter: 10
-----> apply(2) operation: 10
--> accept(2): t1 = 1, t2 = 10
-----> t1 is emitter: 2
--> apply(2) combine: 2
-----> apply(2) operation: 10
--> accept(2): t1 = 2, t2 = 10
-----> t1 is emitter: 3
--> apply(2) combine: 3
-----> apply(2) operation: 10
--> accept(2): t1 = 3, t2 = 10
-----> t2 is emitter: 11
-----> apply(2) operation: 11
--> accept(2): t1 = 1, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 2, t2 = 11
-----> apply(2) operation: 11
--> accept(2): t1 = 3, t2 = 11
-----> t1 is emitter: 4
--> apply(2) combine: 4
-----> apply(2) operation: 11
--> accept(2): t1 = 4, t2 = 11
-----> t1 is emitter: 5
--> apply(2) combine: 5
-----> apply(2) operation: 11
--> accept(2): t1 = 5, t2 = 11
-----> t2 is emitter: 12
-----> apply(2) operation: 12
--> accept(2): t1 = 3, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 4, t2 = 12
-----> apply(2) operation: 12
--> accept(2): t1 = 5, t2 = 12
-----> t2 is emitter: 13
-----> apply(2) operation: 13
--> accept(2): t1 = 5, t2 = 13
-----> t2 is emitter: 14

Javadoc: groupJoin(other, leftEnd, rightEnd, resultSelector)

3. Merge

合併多個Observables的發射物。

img-Merge

使用 Merge 操做符你能夠將多個Observables的輸出合併,就好像它們是一個單個的 Observable 同樣。

3.1 merge

Merge 可能會讓合併的Observables發射的數據交錯(有一個相似的操做符 Concat 不會讓數據交錯,它會按順序一個接着一個發射多個Observables的發射物),任何一個原始Observable的 onError 通知會被當即傳遞給觀察者,並且會終止合併後的Observable。

img-merge

除了傳遞多個Observable給 merge ,你還能夠傳遞一個Observable列表 List ,數組,甚至是一個發射Observable序列的Observable, merge 將合併它們的輸出做爲單個Observable的輸出。

img-merge-observables

若是你傳遞一個發射Observables序列的Observable,你能夠指定 merge 應該同時訂閱的 Observable 的最大數量。一旦達到訂閱數的限制,它將再也不訂閱原始Observable發射的任何其它Observable,直到某個已經訂閱的Observable發射了 onCompleted 通知。

示例代碼:

// 建立Observable對象
    Observable<Integer> odd = Observable.just(1, 3, 5);
    Observable<Integer> even = Observable.just(2, 4, 6);
    Observable<Integer> big = Observable.just(188888, 688888, 888888);

    // 建立list對象
    List<Observable<Integer>> list = new ArrayList<>();
    list.add(odd);
    list.add(even);
    list.add(big);

    // 建立Array對象
    Observable<Integer>[] observables = new Observable[3];
    observables[0] = odd;
    observables[1] = even;
    observables[2] = big;

    // 建立發射Observable序列的Observable
    Observable<ObservableSource<Integer>> sources = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {
        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.just(1, 2));
            emitter.onNext(Observable.just(1, 2, 3));
            emitter.onNext(Observable.just(1, 2, 3, 4));
            emitter.onNext(Observable.just(1, 2, 3, 4, 5));
            emitter.onComplete();
        }
    });

    // 1. merge(ObservableSource source1, ObservableSource source2, ..., ObservableSource source4)
    // 可接受 2-4 個Observable對象進行merge
    Observable.merge(odd, even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 2. merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize)
    // 可選參數, maxConcurrency: 最大的併發處理數, bufferSize: 緩存的數量(從每一個內部觀察資源預取的項數)
    // 接受一個Observable的列表List
    Observable.merge(list)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 3. mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources)
    // 可選參數, maxConcurrency: 最大的併發處理數, bufferSize: 緩存的數量(從每一個內部觀察資源預取的項數)
    // 接受一個Observable的數組Array
    Observable.mergeArray(observables)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 4. merge(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency)
    // 可選參數, maxConcurrency: 最大的併發處理數
    // 接受一個發射Observable序列的Observable
    Observable.merge(sources)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

    System.out.println("-----------------------------------------------");
    // 5. mergeWith(other)
    // merge 是靜態方法, mergeWith 是對象方法: Observable.merge(odd,even) 等價於 odd.mergeWith(even)
    odd.mergeWith(even)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(5): " + integer);
                }
            });

輸出:

--> accept(1): 1
--> accept(1): 3
--> accept(1): 5
--> accept(1): 2
--> accept(1): 4
--> accept(1): 6
-----------------------------------------------
--> accept(2): 1
--> accept(2): 3
--> accept(2): 5
--> accept(2): 2
--> accept(2): 4
--> accept(2): 6
--> accept(2): 188888
--> accept(2): 688888
--> accept(2): 888888
-----------------------------------------------
--> accept(3): 1
--> accept(3): 3
--> accept(3): 5
--> accept(3): 2
--> accept(3): 4
--> accept(3): 6
--> accept(3): 188888
--> accept(3): 688888
--> accept(3): 888888
-----------------------------------------------
--> accept(4): 1
--> accept(4): 1
--> accept(4): 2
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3
--> accept(4): 4
--> accept(4): 5
-----------------------------------------------
--> accept(5): 1
--> accept(5): 3
--> accept(5): 5
--> accept(5): 2
--> accept(5): 4
--> accept(5): 6

Javadoc: merge(source1, ... , source4)
Javadoc: merge(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArray(int maxConcurrency, int bufferSize, ObservableSource... sources)
Javadoc: merge(ObservableSource sources, int maxConcurrency)

3.2 mergeDelayError

若是傳遞給 merge 的任何一個的Observable發射了 onError通知終止了, merge 操做符生成的Observable也會當即以onError通知終止。若是你想讓它繼續發射數據,在最後才報告錯誤,可使用 mergeDelayError

img-mergeDelayError

MergeDelayError 操做符,mergeDelayError 在合併與交錯輸出的使用上與 merge 相同,區別在於它會保留 onError 通知直到其餘沒有Error的Observable全部的數據發射完成,在那時它纔會把onError傳遞給觀察者。

注意: 若是有多個原始Observable出現了Error, 這些Error通知會被合併成一個 CompositeException ,保留在CompositeException 內部的 List<Throwable> exceptions 中,可是若是隻有一個原始Observable出現了Error,則不會生成 CompositeException ,只會發送這個Error通知。

因爲MergeDelayError使用上和merge相同 ,因此這裏就不作詳細分析了,這裏就簡單描述其中的一種的使用實例。

實例代碼:

// 建立有Error的Observable序列的Observable
    Observable<ObservableSource<Integer>> DelayErrorObservable = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() {

        @Override
        public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception {
            emitter.onNext(Observable.just(1));
            emitter.onNext(Observable.error(new Exception("Error Test1"))); // 發射一個Error的通知的Observable
            emitter.onNext(Observable.just(2, 3));
            emitter.onNext(Observable.error(new Exception("Error Test2"))); // 發射一個Error的通知的Observable
            emitter.onNext(Observable.just(4, 5, 6));
            emitter.onComplete();
        }
    });

    // 6. mergeDelayError
    // 保留onError通知直到合併後的Observable全部的數據發射完成,在那時它纔會把onError傳遞給觀察者
    Observable.mergeDelayError(DelayErrorObservable)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(6)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(6): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    // 判斷是不是CompositeException對象(發生多個Observable出現Error時會發送的對象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(6): " + exceptions);
                    } else {
                        System.out.println("--> onError(6): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(6)");
                }
            });

輸出:

--> onSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> onNext(6): 3
--> onNext(6): 4
--> onNext(6): 5
--> onNext(6): 6
--> onError(6): [java.lang.Exception: Error Test1, java.lang.Exception: Error Test2]

Javadoc: mergeDelayError(source1, … , source4)
Javadoc: mergeDelayError(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArrayDelayError(int maxConcurrency, int bufferSize, ObservableSource… sources)
Javadoc: mergeDelayError(ObservableSource sources, int maxConcurrency)

4. Zip

經過一個函數將多個Observables的發射物結合到一塊兒,基於這個函數的結果爲每一個 結合體 發射單個數據項。

img-Zip

Zip 操做符與 Merge 相似,都是合併多個Observables的數據,返回一個Obversable,主要不一樣的是它使用這個函數按順序結合兩個或多個Observables發射的數據項,而後它發射這個函數返回的結果。它按照嚴格的順序應用這個函數。 它只發射與發射數據項最少的那個Observable同樣多的數據。

img-Zip-Sources

解析:

  1. Zip 操做符與 Merge 的使用上基本一致,主要不一樣的是 zip 發射的數據取決於發射數據項最少的那個Observable而且按照嚴格的順序去結合數據。
  2. 一樣具有靜態方法 zip 與對象方法 zipWith,能夠傳遞一個Observable列表 List ,數組,甚至是一個發射Observable序列的Observable。

使用上在此就不作詳細的展開了,可參照上面的 Merge 使用方法,下面就針對 zip 的特性實現一個簡單的實例。

實例代碼:

// 建立Observable
    Observable<Integer> observable1 = Observable.just(1, 2, 3);
    Observable<Integer> observable2 = Observable.just(1, 2, 3, 4, 5, 6);
    
    // zip(sources)
    // 可接受2-9個參數的Observable,對其進行順序合併操做,最終合併的數據項取決於最少的數據項的Observable
    Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() {
        @Override
        public String apply(Integer t1, Integer t2) throws Exception {
            System.out.println("--> apply: t1 = " + t1 + ", t2 = " + t2);
            return t1 + t2 + "";
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println("--> accept: " + s);  // 最終接受observable1所有數據項與observable2相同數量順序部分數據
        }
    });

輸出:

--> apply: t1 = 1, t2 = 1
--> accept: 2
--> apply: t1 = 2, t2 = 2
--> accept: 4
--> apply: t1 = 3, t2 = 3
--> accept: 6

Javadoc: zip( source1, source2, ... , source9, zipper )
Javadoc: zip( Iterable sources, Function zipper )
Javadoc: zipIterable(Iterable<ObservableSource> sources, Function<Object[],R> zipper, boolean delayError, int bufferSize)
Javadoc: zipArray( Function<Object[]> zipper, boolean delayError, int bufferSize, ObservableSource... sources )
Javadoc: zip( ObservableSource<ObservableSource> sources, Function<Object[]> zipper )

5. StartWith

在數據序列的開頭插入一條指定的數據項或者數據序列。

img-StartWith
若是你想要一個Observable在發射數據以前先發射一個指定的數據或者數據序列(能夠是單個數據、數組、列表,Observable中的數據),可使 用 StartWith 操做符。(若是你想一個Observable發射的數據末尾追加一個數據序列可使用 Concat 操做符。)

img-StartWith-Items

實例代碼:

// 建立列表List
    List<Integer> lists = new ArrayList<>();
    lists.add(999);
    lists.add(9999);
    lists.add(99999);

    // 建立數組Array
    Integer[] arrays = new Integer[3];
    arrays[0] = 999;
    arrays[1] = 9999;
    arrays[2] = 9999;

    // 1. startWith(item)
    // 在Observable數據發射前發射item數據項
    Observable.just(1, 2, 3)
            .startWith(999)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 2. startWith(Iterable items)
    // 在Observable數據發射前發射items列表中的數據序列
    Observable.just(1, 2, 3)
            .startWith(lists)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(2): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 3. startWithArray(items)
    // 在Observable數據發射前發射items數組中的數據序列
    Observable.just(1, 2, 3)
            .startWithArray(arrays)
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(3): " + integer);
                }
            });

    System.out.println("-----------------------------------------");
    // 4. startWith(ObservableSource other)
    // 在Observable數據發射前發射other中的數據序列
    Observable.just(1, 2, 3)
            .startWith(Observable.just(999, 9999, 99999))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    System.out.println("--> accept(4): " + integer);
                }
            });

輸出:

--> accept(1): 999
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
-----------------------------------------
--> accept(2): 999
--> accept(2): 9999
--> accept(2): 99999
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
-----------------------------------------
--> accept(3): 999
--> accept(3): 9999
--> accept(3): 9999
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
-----------------------------------------
--> accept(4): 999
--> accept(4): 9999
--> accept(4): 99999
--> accept(4): 1
--> accept(4): 2
--> accept(4): 3

Javadoc: startWith(item)
Javadoc: startWith(Iterable items)
Javadoc: startWithArray(items)
Javadoc: startWith(ObservableSource other)

6. SwitchOnNext

將一個發射多個Observables的Observable轉換成另外一個單獨的Observable,後者發射那些 Observables最近發射的數據項。

6.1 switchOnNext

switchOnNext 訂閱一個發射多個Observables的Observable。它每次觀察那些Observables中的一個, switchOnNext 發射的這個新Observable並取消訂閱前一個發射數據的舊Observable,開始發射最新的Observable發射的數據。

img-switchOnNext

注意: 當原始Observables發射了一個新的Observable時(不是這個新的Observable發射了一條數據時),它將取消訂閱以前的那個Observable。這意味着,在 後來那個Observable產生以後到它開始發射數據以前的這段時間裏,前一個Observable發射 的數據將被丟棄(就像圖例上的那個黃色圓圈同樣)。

6.2 switchOnNextDelayError

Observables發射一個新的Observable後,則會取消訂閱前面的舊observable,直接開始接受新Observable的數據,若是Observables中的Observable有 Error 異常,將保留 onError 通知直到其餘沒有Error的Observable全部的數據發射完成,在那時它纔會把 onError 傳遞給觀察者。

img-switchOnNextDelayError

注意: 若是有多個原始Observable出現了Error, 這些Error通知會被合併成一個 CompositeException ,保留在CompositeException 內部的 List<Throwable> exceptions 中,可是若是隻有一個原始Observable出現了Error,則不會生成 CompositeException ,只會發送這個Error通知。

實例代碼:

// 建立Observable
    Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
    Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);

    // 建立發射Observable序列的Observable
    Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            Thread.sleep(1000);
            // 此時發射一個新的observable2,將會取消訂閱observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 建立發射含有Error通知的Observable序列的Observable
    Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

        @Override
        public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
            emitter.onNext(observable1);
            emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 發射一個發射Error通知的Observable
            emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 發射一個發射Error通知的Observable
            Thread.sleep(1000);
            // 此時發射一個新的observable2,將會取消訂閱observable1
            emitter.onNext(observable2);
            emitter.onComplete();
        }
    });

    // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
    // 可選參數 bufferSize: 緩存數據項大小
    // 接受一個發射Observable序列的Observable類型的sources,
    // 當sources發射一個新的Observable後,則會取消訂閱前面的舊observable,直接開始接受新Observable的數據
    Observable.switchOnNext(sources)
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long integer) throws Exception {
                    System.out.println("--> accept(1): " + integer);
                }
            });

    System.in.read();
    System.out.println("--------------------------------------------------------------------");
    // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
    // 可選參數 prefetch: 與讀取數據項大小
    // 當sources發射一個新的Observable後,則會取消訂閱前面的舊observable,直接開始接受新Observable的數據,
    // 保留onError通知直到合併後的Observable全部的數據發射完成,在那時它纔會把onError傳遞給觀察者
    Observable.switchOnNextDelayError(sourcesError)
            .subscribe(new Observer<Long>() {

                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(2)");
                }

                @Override
                public void onNext(Long t) {
                    System.out.println("--> onNext(2): " + t);
                }

                @Override
                public void onError(Throwable e) {
                    // 判斷是不是CompositeException對象(發生多個Observable出現Error時會發送的對象)
                    if (e instanceof CompositeException) {
                        CompositeException compositeException = (CompositeException) e;
                        List<Throwable> exceptions = compositeException.getExceptions();
                        System.out.println("--> onError(2): " + exceptions);
                    } else {
                        System.out.println("--> onError(2): " + e);
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onComplete(2)");
                }
            });

    System.in.read();

輸出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 10
--> accept(1): 11
--> accept(1): 12
--> accept(1): 13
--> accept(1): 14
--------------------------------------------------------------------
--> onSubscribe(2)
--> onNext(2): 10
--> onNext(2): 11
--> onNext(2): 12
--> onNext(2): 13
--> onNext(2): 14
--> onError(2): [java.lang.Exception: Error Test1!, java.lang.Exception: Error Test2!]

Javadoc: switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
Javadoc: switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)

小結

Rxjava 的合併操做符可以同時處理多個被觀察者,併發送相應的事件通知以及數據。經常應用於多業務合併處理場景,好比表單的聯動驗證,網絡交互性數據的校驗等,rxjava的合併操做符可以很好的去實現和處理。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例

實例代碼:

相關文章
相關標籤/搜索