目錄html
需求瞭解:java
在使用 RxJava
開發的過程當中,不少時候須要結合多個條件或者數據的邏輯判斷,好比登陸功能的表單驗證,實時數據比對等。這個時候咱們就須要使用 RxJava 的結合操做符來完成這一需求,Rx中提供了豐富的結合操做處理的操做方法。react
可用於組合多個Observables的操做方法:git
當 Observables 中的任何一個發射了數據時,使用一個函數結合每一個 Observable 發射的最近數據項,而且基於這個函數的結果發射數據。github
CombineLatest
操做符行爲相似於zip
,可是隻有當原始的Observable中的每個都發射了一條數據時 zip 才發射數據。 CombineLatest
則在原始的Observable中任意一個發射了數據時發射一條數據。當原始Observables的任何一個發射了一條數據時, CombineLatest 使用一 個函數結合它們最近發射的數據,而後發射這個函數的返回值。api
解析: combineLatest
操做符能夠結合多個Observable,能夠接收 2-9 個Observable對象, 在其中原始Observables的任何一個發射了一條數據時, CombineLatest 使用一個函數結合它們最近發射的數據,而後發射這個函數的返回值。此外combineLatest
操做符還有一些接收 Iterable , 數組方式的變體,以及其餘指定參數combiner、bufferSize、和combineLatestDelayError方法等變體,在此就不在詳細展開了,有興趣的能夠查看官方的相關API文檔瞭解。數組
實例代碼:緩存
// Observables 建立 Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS); Observable<Long> observable2 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS); Observable<Long> observable3 = Observable.intervalRange(100, 5, 1, 1, TimeUnit.SECONDS); // 1. combineLatest(ObservableSource, ObservableSource [支持2-9個參數]..., BiFunction) // 結合多個Observable, 當他們其中任意一個發射了數據時,使用函數結合他們最近發射的一項數據 Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, String>() { @Override public String apply(Long t1, Long t2) throws Exception { System.out.println("--> apply(1) t1 = " + t1 + ", t2 = " + t2); if (t1 + t2 == 10) { return "Success"; // 知足必定條件,返回指定的字符串 } return t1 + t2 + ""; // 計算全部數據的和並轉換爲字符串 } }).subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("----> accept combineLatest(1): " + t); } }); System.out.println("--------------------------------------------------------"); // 2. combineLatest(T1, T2, T3, Function) // Observables的結合 Observable.combineLatest(observable1, observable2, observable3, new Function3<Long, Long, Long, String>() { @Override public String apply(Long t1, Long t2, Long t3) throws Exception { System.out.println("--> apply(2): t1 = " + t1 + ", t2 = " + t2 + ", t3 = " + t3); return t1 + t2 + t3 + ""; // 計算全部數據的和並轉換爲字符串 } }).subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(2): " + t); } });
輸出:網絡
--> apply(1) t1 = 1, t2 = 1 ----> accept combineLatest(1): 2 --> apply(1) t1 = 2, t2 = 1 ----> accept combineLatest(1): 3 --> apply(1) t1 = 3, t2 = 1 ----> accept combineLatest(1): 4 --> apply(1) t1 = 3, t2 = 2 ----> accept combineLatest(1): 5 --> apply(1) t1 = 4, t2 = 2 ----> accept combineLatest(1): 6 --> apply(1) t1 = 4, t2 = 3 ----> accept combineLatest(1): 7 --> apply(1) t1 = 5, t2 = 3 ----> accept combineLatest(1): 8 --> apply(1) t1 = 5, t2 = 4 ----> accept combineLatest(1): 9 --> apply(1) t1 = 5, t2 = 5 ----> accept combineLatest(1): Success -------------------------------------------------------- --> apply(2): t1 = 1, t2 = 1, t3 = 100 --> accept(2): 102 --> apply(2): t1 = 2, t2 = 1, t3 = 100 --> accept(2): 103 --> apply(2): t1 = 2, t2 = 1, t3 = 101 --> accept(2): 104 --> apply(2): t1 = 2, t2 = 2, t3 = 101 --> accept(2): 105 --> apply(2): t1 = 3, t2 = 2, t3 = 101 --> accept(2): 106 --> apply(2): t1 = 3, t2 = 2, t3 = 102 --> accept(2): 107 --> apply(2): t1 = 4, t2 = 2, t3 = 102 --> accept(2): 108 --> apply(2): t1 = 4, t2 = 2, t3 = 103 --> accept(2): 109 --> apply(2): t1 = 5, t2 = 2, t3 = 103 --> accept(2): 110 --> apply(2): t1 = 5, t2 = 3, t3 = 103 --> accept(2): 111 --> apply(2): t1 = 5, t2 = 3, t3 = 104 --> accept(2): 112 --> apply(2): t1 = 5, t2 = 4, t3 = 104 --> accept(2): 113 --> apply(2): t1 = 5, t2 = 5, t3 = 104 --> accept(2): 114
Javadoc: combineLatest(T1, T2, T3... , T9, combiner)併發
任什麼時候候,只要在另外一個Observable發射的數據定義的時間窗口內,這個Observable發射了一條數據,就結合兩個Observable發射的數據。
Join
操做符結合兩個Observable發射的數據,基於時間窗口(你定義的針對每條數據特定的原則)選擇待集合的數據項。你將這些時間窗口實現爲一些Observables,它們的生命週期從任何一條Observable發射的每一條數據開始。當這個定義時間窗口的Observable發射了一條數據或者完成時,與這條數據關聯的窗口也會關閉。只要這條數據的窗口是打開的,它將繼續結合其它Observable發射的任何數據項。你定義一個用於結合數據的函數。
解析: join(other, leftEnd, rightEnd, resultSelector)
相關參數的解析
注意: 這是源Observable和目標Observable發射數據在任意一個基於時間窗口的有效期內纔會接收到組合數據,這就意味着可能有數據丟失的狀況,在其中一個已經發射完全部數據,而且沒有處於時間窗口的數據狀況,另外一個Observable的數據發射將不會收到組合數據。
示例代碼:
// Observable的建立 Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS); Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS); // 1. join(other, leftEnd, rightEnd, resultSelector) // other: 目標組合的Observable // leftEnd: 接收一個源數據項,返回一個Observable,這個Observable的生命週期就是源Observable發射數據的有效期 // rightEnd: 接收一個源數據項,返回一個Observable,這個Observable的生命週期就是目標Observable發射數據的有效期 // resultSelector: 接收源Observable和目標Observable發射的數據項, 處理後的數據返回給觀察者對象 sourceObservable.join(targetObservable, new Function<Long, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Long t) throws Exception { System.out.println("-----> t1 is emitter: " + t); return Observable.timer(1000, TimeUnit.MILLISECONDS); // 源Observable發射數據的有效期爲1000毫秒 } }, new Function<Long, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Long t) throws Exception { System.out.println("-----> t2 is emitter: " + t); return Observable.timer(1000, TimeUnit.MILLISECONDS); // 目標Observable發射數據的有效期爲1000毫秒 } }, new BiFunction<Long, Long, String>() { @Override public String apply(Long t1, Long t2) throws Exception { return "t1 = " + t1 + ", t2 = " + t2; // 對數據進行組合後返回和觀察者 } }).subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(1): " + t); } }); System.in.read();
輸出:
-----> t1 is emitter: 1 -----> t2 is emitter: 10 --> accept(1): t1 = 1, t2 = 10 -----> t1 is emitter: 2 --> accept(1): t1 = 2, t2 = 10 -----> t1 is emitter: 3 --> accept(1): t1 = 3, t2 = 10 -----> t2 is emitter: 11 --> accept(1): t1 = 1, t2 = 11 --> accept(1): t1 = 2, t2 = 11 --> accept(1): t1 = 3, t2 = 11 -----> t1 is emitter: 4 --> accept(1): t1 = 4, t2 = 11 -----> t1 is emitter: 5 --> accept(1): t1 = 5, t2 = 11 -----> t2 is emitter: 12 --> accept(1): t1 = 3, t2 = 12 --> accept(1): t1 = 4, t2 = 12 --> accept(1): t1 = 5, t2 = 12 -----> t2 is emitter: 13 --> accept(1): t1 = 5, t2 = 13 -----> t2 is emitter: 14 // 此時源t1中已經沒有數據還處於時間窗口有效期內
groupJoin
groupJoin
操做符與 join
相同,只是參數傳遞有所區別。groupJoin(other, leftEnd, rightEnd, resultSelector) 中的resultSelector
能夠將原始數據轉換爲 Observable 類型的數據發送給觀察者。
示例代碼:
// Observable的建立 Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS); Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS); // 2. groupJoin(other, leftEnd, rightEnd, resultSelector) // groupJoin操做符與join相同,只是參數傳遞有所區別。 // resultSelector能夠將原始數據轉換爲Observable類型的數據發送給觀察者。 sourceObservable.groupJoin(targetObservable, new Function<Long, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Long t) throws Exception { System.out.println("-----> t1 is emitter: " + t); return Observable.timer(1000, TimeUnit.MILLISECONDS); // 源Observable發射數據的有效期爲1000毫秒 } }, new Function<Long, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Long t) throws Exception { System.out.println("-----> t2 is emitter: " + t); return Observable.timer(1000, TimeUnit.MILLISECONDS); // 目標Observable發射數據的有效期爲1000毫秒 } }, new BiFunction<Long, Observable<Long>, Observable<String>>() { @Override public Observable<String> apply(Long t1, Observable<Long> t2) throws Exception { System.out.println("--> apply(2) combine: " + t1); // 結合操做 return t2.map(new Function<Long, String>() { @Override public String apply(Long t) throws Exception { System.out.println("-----> apply(2) operation: " + t); return "t1 = " + t1 + ", t2 = " + t; } }); } }).subscribe(new Consumer<Observable<String>>() { @Override public void accept(Observable<String> stringObservable) throws Exception { stringObservable.subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(2): " + t); } }); } });
輸出:
-----> t1 is emitter: 1 --> apply(2) combine: 1 -----> t2 is emitter: 10 -----> apply(2) operation: 10 --> accept(2): t1 = 1, t2 = 10 -----> t1 is emitter: 2 --> apply(2) combine: 2 -----> apply(2) operation: 10 --> accept(2): t1 = 2, t2 = 10 -----> t1 is emitter: 3 --> apply(2) combine: 3 -----> apply(2) operation: 10 --> accept(2): t1 = 3, t2 = 10 -----> t2 is emitter: 11 -----> apply(2) operation: 11 --> accept(2): t1 = 1, t2 = 11 -----> apply(2) operation: 11 --> accept(2): t1 = 2, t2 = 11 -----> apply(2) operation: 11 --> accept(2): t1 = 3, t2 = 11 -----> t1 is emitter: 4 --> apply(2) combine: 4 -----> apply(2) operation: 11 --> accept(2): t1 = 4, t2 = 11 -----> t1 is emitter: 5 --> apply(2) combine: 5 -----> apply(2) operation: 11 --> accept(2): t1 = 5, t2 = 11 -----> t2 is emitter: 12 -----> apply(2) operation: 12 --> accept(2): t1 = 3, t2 = 12 -----> apply(2) operation: 12 --> accept(2): t1 = 4, t2 = 12 -----> apply(2) operation: 12 --> accept(2): t1 = 5, t2 = 12 -----> t2 is emitter: 13 -----> apply(2) operation: 13 --> accept(2): t1 = 5, t2 = 13 -----> t2 is emitter: 14
Javadoc: groupJoin(other, leftEnd, rightEnd, resultSelector)
合併多個Observables的發射物。
使用 Merge
操做符你能夠將多個Observables的輸出合併,就好像它們是一個單個的 Observable 同樣。
Merge 可能會讓合併的Observables發射的數據交錯(有一個相似的操做符 Concat
不會讓數據交錯,它會按順序一個接着一個發射多個Observables的發射物),任何一個原始Observable的 onError
通知會被當即傳遞給觀察者,並且會終止合併後的Observable。
除了傳遞多個Observable給 merge ,你還能夠傳遞一個Observable列表 List ,數組,甚至是一個發射Observable序列的Observable, merge 將合併它們的輸出做爲單個Observable的輸出。
若是你傳遞一個發射Observables序列的Observable,你能夠指定 merge 應該同時訂閱的 Observable 的最大數量。一旦達到訂閱數的限制,它將再也不訂閱原始Observable發射的任何其它Observable,直到某個已經訂閱的Observable發射了 onCompleted 通知。
示例代碼:
// 建立Observable對象 Observable<Integer> odd = Observable.just(1, 3, 5); Observable<Integer> even = Observable.just(2, 4, 6); Observable<Integer> big = Observable.just(188888, 688888, 888888); // 建立list對象 List<Observable<Integer>> list = new ArrayList<>(); list.add(odd); list.add(even); list.add(big); // 建立Array對象 Observable<Integer>[] observables = new Observable[3]; observables[0] = odd; observables[1] = even; observables[2] = big; // 建立發射Observable序列的Observable Observable<ObservableSource<Integer>> sources = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() { @Override public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception { emitter.onNext(Observable.just(1)); emitter.onNext(Observable.just(1, 2)); emitter.onNext(Observable.just(1, 2, 3)); emitter.onNext(Observable.just(1, 2, 3, 4)); emitter.onNext(Observable.just(1, 2, 3, 4, 5)); emitter.onComplete(); } }); // 1. merge(ObservableSource source1, ObservableSource source2, ..., ObservableSource source4) // 可接受 2-4 個Observable對象進行merge Observable.merge(odd, even) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.out.println("-----------------------------------------------"); // 2. merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize) // 可選參數, maxConcurrency: 最大的併發處理數, bufferSize: 緩存的數量(從每一個內部觀察資源預取的項數) // 接受一個Observable的列表List Observable.merge(list) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(2): " + integer); } }); System.out.println("-----------------------------------------------"); // 3. mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources) // 可選參數, maxConcurrency: 最大的併發處理數, bufferSize: 緩存的數量(從每一個內部觀察資源預取的項數) // 接受一個Observable的數組Array Observable.mergeArray(observables) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(3): " + integer); } }); System.out.println("-----------------------------------------------"); // 4. merge(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency) // 可選參數, maxConcurrency: 最大的併發處理數 // 接受一個發射Observable序列的Observable Observable.merge(sources) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(4): " + integer); } }); System.out.println("-----------------------------------------------"); // 5. mergeWith(other) // merge 是靜態方法, mergeWith 是對象方法: Observable.merge(odd,even) 等價於 odd.mergeWith(even) odd.mergeWith(even) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(5): " + integer); } });
輸出:
--> accept(1): 1 --> accept(1): 3 --> accept(1): 5 --> accept(1): 2 --> accept(1): 4 --> accept(1): 6 ----------------------------------------------- --> accept(2): 1 --> accept(2): 3 --> accept(2): 5 --> accept(2): 2 --> accept(2): 4 --> accept(2): 6 --> accept(2): 188888 --> accept(2): 688888 --> accept(2): 888888 ----------------------------------------------- --> accept(3): 1 --> accept(3): 3 --> accept(3): 5 --> accept(3): 2 --> accept(3): 4 --> accept(3): 6 --> accept(3): 188888 --> accept(3): 688888 --> accept(3): 888888 ----------------------------------------------- --> accept(4): 1 --> accept(4): 1 --> accept(4): 2 --> accept(4): 1 --> accept(4): 2 --> accept(4): 3 --> accept(4): 1 --> accept(4): 2 --> accept(4): 3 --> accept(4): 4 --> accept(4): 1 --> accept(4): 2 --> accept(4): 3 --> accept(4): 4 --> accept(4): 5 ----------------------------------------------- --> accept(5): 1 --> accept(5): 3 --> accept(5): 5 --> accept(5): 2 --> accept(5): 4 --> accept(5): 6
Javadoc: merge(source1, ... , source4)
Javadoc: merge(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArray(int maxConcurrency, int bufferSize, ObservableSource... sources)
Javadoc: merge(ObservableSourcesources, int maxConcurrency)
若是傳遞給 merge 的任何一個的Observable發射了 onError
通知終止了, merge 操做符生成的Observable也會當即以onError
通知終止。若是你想讓它繼續發射數據,在最後才報告錯誤,可使用 mergeDelayError
。
MergeDelayError
操做符,mergeDelayError 在合併與交錯輸出的使用上與 merge
相同,區別在於它會保留 onError
通知直到其餘沒有Error的Observable全部的數據發射完成,在那時它纔會把onError
傳遞給觀察者。
注意: 若是有多個原始Observable出現了Error
, 這些Error通知會被合併成一個 CompositeException
,保留在CompositeException 內部的 List<Throwable> exceptions
中,可是若是隻有一個原始Observable出現了Error,則不會生成 CompositeException ,只會發送這個Error通知。
因爲MergeDelayError
使用上和merge
相同 ,因此這裏就不作詳細分析了,這裏就簡單描述其中的一種的使用實例。
實例代碼:
// 建立有Error的Observable序列的Observable Observable<ObservableSource<Integer>> DelayErrorObservable = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() { @Override public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception { emitter.onNext(Observable.just(1)); emitter.onNext(Observable.error(new Exception("Error Test1"))); // 發射一個Error的通知的Observable emitter.onNext(Observable.just(2, 3)); emitter.onNext(Observable.error(new Exception("Error Test2"))); // 發射一個Error的通知的Observable emitter.onNext(Observable.just(4, 5, 6)); emitter.onComplete(); } }); // 6. mergeDelayError // 保留onError通知直到合併後的Observable全部的數據發射完成,在那時它纔會把onError傳遞給觀察者 Observable.mergeDelayError(DelayErrorObservable) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(6)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(6): " + integer); } @Override public void onError(Throwable e) { // 判斷是不是CompositeException對象(發生多個Observable出現Error時會發送的對象) if (e instanceof CompositeException) { CompositeException compositeException = (CompositeException) e; List<Throwable> exceptions = compositeException.getExceptions(); System.out.println("--> onError(6): " + exceptions); } else { System.out.println("--> onError(6): " + e); } } @Override public void onComplete() { System.out.println("--> onComplete(6)"); } });
輸出:
--> onSubscribe(6) --> onNext(6): 1 --> onNext(6): 2 --> onNext(6): 3 --> onNext(6): 4 --> onNext(6): 5 --> onNext(6): 6 --> onError(6): [java.lang.Exception: Error Test1, java.lang.Exception: Error Test2]
Javadoc: mergeDelayError(source1, … , source4)
Javadoc: mergeDelayError(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArrayDelayError(int maxConcurrency, int bufferSize, ObservableSource… sources)
Javadoc: mergeDelayError(ObservableSource sources, int maxConcurrency)
經過一個函數將多個Observables的發射物結合到一塊兒,基於這個函數的結果爲每一個 結合體 發射單個數據項。
Zip
操做符與 Merge
相似,都是合併多個Observables的數據,返回一個Obversable,主要不一樣的是它使用這個函數按順序結合兩個或多個Observables發射的數據項,而後它發射這個函數返回的結果。它按照嚴格的順序應用這個函數。 它只發射與發射數據項最少的那個Observable同樣多的數據。
解析:
Zip
操做符與 Merge
的使用上基本一致,主要不一樣的是 zip 發射的數據取決於發射數據項最少的那個Observable而且按照嚴格的順序去結合數據。zip
與對象方法 zipWith
,能夠傳遞一個Observable列表 List ,數組,甚至是一個發射Observable序列的Observable。使用上在此就不作詳細的展開了,可參照上面的 Merge
使用方法,下面就針對 zip
的特性實現一個簡單的實例。
實例代碼:
// 建立Observable Observable<Integer> observable1 = Observable.just(1, 2, 3); Observable<Integer> observable2 = Observable.just(1, 2, 3, 4, 5, 6); // zip(sources) // 可接受2-9個參數的Observable,對其進行順序合併操做,最終合併的數據項取決於最少的數據項的Observable Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() { @Override public String apply(Integer t1, Integer t2) throws Exception { System.out.println("--> apply: t1 = " + t1 + ", t2 = " + t2); return t1 + t2 + ""; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("--> accept: " + s); // 最終接受observable1所有數據項與observable2相同數量順序部分數據 } });
輸出:
--> apply: t1 = 1, t2 = 1 --> accept: 2 --> apply: t1 = 2, t2 = 2 --> accept: 4 --> apply: t1 = 3, t2 = 3 --> accept: 6
Javadoc: zip( source1, source2, ... , source9, zipper )
Javadoc: zip( Iterable sources, Function zipper )
Javadoc: zipIterable(Iterable<ObservableSource> sources, Function<Object[],R> zipper, boolean delayError, int bufferSize)
Javadoc: zipArray( Function<Object[]> zipper, boolean delayError, int bufferSize, ObservableSource... sources )
Javadoc: zip( ObservableSource<ObservableSource> sources, Function<Object[]> zipper )
在數據序列的開頭插入一條指定的數據項或者數據序列。
若是你想要一個Observable在發射數據以前先發射一個指定的數據或者數據序列(能夠是單個數據、數組、列表,Observable中的數據),可使 用 StartWith
操做符。(若是你想一個Observable發射的數據末尾追加一個數據序列可使用 Concat
操做符。)
實例代碼:
// 建立列表List List<Integer> lists = new ArrayList<>(); lists.add(999); lists.add(9999); lists.add(99999); // 建立數組Array Integer[] arrays = new Integer[3]; arrays[0] = 999; arrays[1] = 9999; arrays[2] = 9999; // 1. startWith(item) // 在Observable數據發射前發射item數據項 Observable.just(1, 2, 3) .startWith(999) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.out.println("-----------------------------------------"); // 2. startWith(Iterable items) // 在Observable數據發射前發射items列表中的數據序列 Observable.just(1, 2, 3) .startWith(lists) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(2): " + integer); } }); System.out.println("-----------------------------------------"); // 3. startWithArray(items) // 在Observable數據發射前發射items數組中的數據序列 Observable.just(1, 2, 3) .startWithArray(arrays) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(3): " + integer); } }); System.out.println("-----------------------------------------"); // 4. startWith(ObservableSource other) // 在Observable數據發射前發射other中的數據序列 Observable.just(1, 2, 3) .startWith(Observable.just(999, 9999, 99999)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(4): " + integer); } });
輸出:
--> accept(1): 999 --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 ----------------------------------------- --> accept(2): 999 --> accept(2): 9999 --> accept(2): 99999 --> accept(2): 1 --> accept(2): 2 --> accept(2): 3 ----------------------------------------- --> accept(3): 999 --> accept(3): 9999 --> accept(3): 9999 --> accept(3): 1 --> accept(3): 2 --> accept(3): 3 ----------------------------------------- --> accept(4): 999 --> accept(4): 9999 --> accept(4): 99999 --> accept(4): 1 --> accept(4): 2 --> accept(4): 3
Javadoc: startWith(item)
Javadoc: startWith(Iterable items)
Javadoc: startWithArray(items)
Javadoc: startWith(ObservableSource other)
將一個發射多個Observables的Observable轉換成另外一個單獨的Observable,後者發射那些 Observables最近發射的數據項。
switchOnNext
訂閱一個發射多個Observables的Observable。它每次觀察那些Observables中的一個, switchOnNext
發射的這個新Observable並取消訂閱前一個發射數據的舊Observable,開始發射最新的Observable發射的數據。
注意: 當原始Observables發射了一個新的Observable時(不是這個新的Observable發射了一條數據時),它將取消訂閱以前的那個Observable。這意味着,在 後來那個Observable產生以後到它開始發射數據以前的這段時間裏,前一個Observable發射 的數據將被丟棄(就像圖例上的那個黃色圓圈同樣)。
當Observables
發射一個新的Observable後,則會取消訂閱前面的舊observable,直接開始接受新Observable的數據,若是Observables中的Observable有 Error
異常,將保留 onError
通知直到其餘沒有Error的Observable全部的數據發射完成,在那時它纔會把 onError 傳遞給觀察者。
注意: 若是有多個原始Observable出現了Error
, 這些Error通知會被合併成一個 CompositeException
,保留在CompositeException 內部的 List<Throwable> exceptions
中,可是若是隻有一個原始Observable出現了Error,則不會生成 CompositeException ,只會發送這個Error通知。
實例代碼:
// 建立Observable Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS); Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS); // 建立發射Observable序列的Observable Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() { @Override public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception { emitter.onNext(observable1); Thread.sleep(1000); // 此時發射一個新的observable2,將會取消訂閱observable1 emitter.onNext(observable2); emitter.onComplete(); } }); // 建立發射含有Error通知的Observable序列的Observable Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() { @Override public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception { emitter.onNext(observable1); emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 發射一個發射Error通知的Observable emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 發射一個發射Error通知的Observable Thread.sleep(1000); // 此時發射一個新的observable2,將會取消訂閱observable1 emitter.onNext(observable2); emitter.onComplete(); } }); // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize) // 可選參數 bufferSize: 緩存數據項大小 // 接受一個發射Observable序列的Observable類型的sources, // 當sources發射一個新的Observable後,則會取消訂閱前面的舊observable,直接開始接受新Observable的數據 Observable.switchOnNext(sources) .subscribe(new Consumer<Long>() { @Override public void accept(Long integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.in.read(); System.out.println("--------------------------------------------------------------------"); // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch) // 可選參數 prefetch: 與讀取數據項大小 // 當sources發射一個新的Observable後,則會取消訂閱前面的舊observable,直接開始接受新Observable的數據, // 保留onError通知直到合併後的Observable全部的數據發射完成,在那時它纔會把onError傳遞給觀察者 Observable.switchOnNextDelayError(sourcesError) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Long t) { System.out.println("--> onNext(2): " + t); } @Override public void onError(Throwable e) { // 判斷是不是CompositeException對象(發生多個Observable出現Error時會發送的對象) if (e instanceof CompositeException) { CompositeException compositeException = (CompositeException) e; List<Throwable> exceptions = compositeException.getExceptions(); System.out.println("--> onError(2): " + exceptions); } else { System.out.println("--> onError(2): " + e); } } @Override public void onComplete() { System.out.println("--> onComplete(2)"); } }); System.in.read();
輸出:
--> accept(1): 1 --> accept(1): 2 --> accept(1): 10 --> accept(1): 11 --> accept(1): 12 --> accept(1): 13 --> accept(1): 14 -------------------------------------------------------------------- --> onSubscribe(2) --> onNext(2): 10 --> onNext(2): 11 --> onNext(2): 12 --> onNext(2): 13 --> onNext(2): 14 --> onError(2): [java.lang.Exception: Error Test1!, java.lang.Exception: Error Test2!]
Javadoc: switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
Javadoc: switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
Rxjava 的合併操做符可以同時處理多個被觀察者,併發送相應的事件通知以及數據。經常應用於多業務合併處理場景,好比表單的聯動驗證,網絡交互性數據的校驗等,rxjava的合併操做符可以很好的去實現和處理。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介紹與講解及完整目錄參考:Rxjava2 介紹與詳解實例
實例代碼: