博客主頁java
RxJava 的合併操做符主要包括以下幾個:segmentfault
RxJava 的鏈接操做符,主要是 ConnectableObservable 所使用的操做符和 Observable 所使用
的操做符:數組
合併多個 Observable 的發射物
merge 操做符能夠將多個 Observable 的輸出合井,使得它們就像是單個的 Observable 同樣。緩存
Observable.merge( Observable.just(1, 3, 5, 7, 9), Observable.just(2, 4, 6) ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next: 1 Next: 3 Next: 5 Next: 7 Next: 9 Next: 2 Next: 4 Next: 6 Complete.
merge 是按照時間線並行的。若是傳遞給 merge 的任何一個 Observable 發射了 onError 通知終止,則 merge 操做符生成的 Observable 也會當即以 onError 通知終止。若是想讓它繼續發射數據,直到最後才報告錯誤,則可使用 mergeDelayError 操做符.
merge 操做符最多隻能合併 4 個被觀察者,若是須要合併更多個被觀察者,則可使用 mergeArray 操做符.併發
經過一個函數將多個 Observable 的發射物結合到一塊兒,基於這個函數的結果爲每一個結合體發射單個數據項app
zip 操做符返回一個 Obversable ,它使用這個函數按順序結合兩個或多個 Observable 發射的數據項,而後發射這個函數返回的結果。它按照嚴格的順序應用這個函數,只發射與發射數據項最少的那個 Observable 同樣多的數據。ide
zip 的最後一個參數接收每一個 Observable 發射的一項數據,返回被壓縮後的數據,它能夠接收 1~9 個參數:一個 Observable 序列 或者一些發射 Observable 的 Observable函數
Observable.zip( Observable.just(1, 3, 5, 7), Observable.just(2, 4, 6), new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } } ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 Next: 3 Next: 7 Next: 11 Complete.
zip 操做符相對於 merge 操做符,除發射數據外,還會進行合併操做,並且 zip 發射的數據與數據項最少的 Observable 有關。spa
這裏 BiFunction 至關於一個合併函數,並不必定要返回 Integer 類型,能夠根據業務須要返回合適的類型。 BiFunction 的源碼以下:線程
public interface BiFunction<T1, T2, R> { /** * Calculate a value based on the input values. * @param t1 the first value * @param t2 the second value * @return the result value * @throws Exception on error */ @NonNull R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception; }
RxJava 2.x FuncN 遵循 Java 8 的命名規範。相對於 RxJava l.x, Func 更名成 Function, Func2 更名成 BiFunction, Func3~ Func9 更名成 Function3~ Function9,FuncN 由 Function 取代
combineLatest 操做符的行爲相似於 zip,可是隻有當原始的 Observable 中的每個都發射了一條數據時 zip 才發射數據,而 combineLatest 是當原始的 Observable 中任意一個發射了數據時就發射一條數據。當原始 Observable 的任何一個發射了一條數據時, combineLatest 使用一個函數結合它們最近發射的數據,而後發射這個函數的返回值。
Observable.combineLatest( Observable.just(1, 3, 5), Observable.just(2, 4, 6), new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { Log.d(TAG, "integer: " + integer + " ## integer2: " + integer2); return integer + integer2; } } ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 執行結果 integer: 5 ## integer2: 2 Next: 7 integer: 5 ## integer2: 4 Next: 9 integer: 5 ## integer2: 6 Next: 11 Complete.
join 操做符結合兩個 Observable 發射的數據,基於時間窗口(針對每條數據特定的原則)選擇待集合的數據項。將這些時間窗口實現爲一些 Observable ,它們的生命週期從任何一條 Observable 發射的每一條數據開始。當這個定義時間窗口的 Observable 發射了一條數據或者完成時,與這條數據關聯的窗口也會關閉。只要這條數據的窗口是打開的,它就繼續結合其餘 Observable 發射的任何數據項。
Observable<Integer> o1 = Observable.just(1, 2, 3); Observable<Integer> o2 = Observable.just(4, 5, 6); o1.join(o2, new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS); } }, new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS); } }, new BiFunction<Integer, Integer, String>() { @Override public String apply(Integer integer, Integer integer2) throws Exception { return integer + ":" + integer2; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: 1:4 Next: 2:4 Next: 3:4 Next: 1:5 Next: 2:5 Next: 3:5 Next: 1:6 Next: 2:6 Next: 3:6
join(Observable, Function, Function, BiFunction) 有四個參數:
的生命週期決定了源 Observable 發射數據的有效期。
的生命週期決定了目標 bservable 發射數據的有效期。
合後返回。
join 操做符的效果相似於排列組合,把第一個數據源 A 做爲基座窗口,它根據本身的節奏不斷髮射數據元素;第二個數據源 B,每發射一個數據,咱們都把它和第一個數據源 A 中己經發射的數據進行一對一匹配。舉例來講,若是某一時刻 B 發射了一個數據 "B",此時 A 己經發射了 a,b,c,d 共4個數據,那麼合併操做就是把 「B」 依次與 a,b,c,d 配對,獲得4組數據:[a, B], [b, B], [c, B], [d, B]
Observable<Integer> o1 = Observable.just(1, 2, 3).delay(200, TimeUnit.MILLISECONDS); Observable<Integer> o2 = Observable.just(4, 5, 6); o1.join(o2, new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS); } }, new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS); } }, new BiFunction<Integer, Integer, String>() { @Override public String apply(Integer integer, Integer integer2) throws Exception { return integer + ":" + integer2; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: 1:4 Next: 1:5 Next: 1:6 Next: 2:4 Next: 2:5 Next: 2:6 Next: 3:4 Next: 3:5 Next: 3:6
在數據序列的開頭插入一條指定的項
若是想讓 Observable 在發射數據以前先發射一個指定的數據序列,則可使用 startWith 操做符。若是想在一個 Observable 發射數據的末尾追加一個數據序列 ,則可使用 concat 操做符。
Observable.just("hello, java", "hello, kotlin") .startWith("hello, rxjava") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: hello, rxjava Next: hello, java Next: hello, kotlin
startWith 操做符支持傳遞 Iterable, 同時還有一個 startWithArray 的操做符
Observable.just("hello, java", "hello, kotlin") .startWithArray("hello, rxjava", "hello, flutter") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: hello, rxjava Next: hello, flutter Next: hello, java Next: hello, kotlin
使用了 startWithArray 操做符以後,能夠再使用 startWith 操做符。
Observable.just("hello, java", "hello, kotlin") .startWithArray("hello, rxjava", "hello, flutter") .startWith("hello, groovy") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: hello, groovy Next: hello, rxjava Next: hello, flutter Next: hello, java Next: hello, kotlin
startWith 還能夠傳遞一個 Observable ,它會將那個 Observable 的發射物插在原始 Observable 發射的數據序列以前,而後把這個看成本身的發射物集合。
Observable.just("hello, java", "hello, kotlin") .startWithArray("hello, rxjava", "hello, flutter") .startWith(Observable.just("hello, groovy")) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 執行結果 Next: hello, groovy Next: hello, rxjava Next: hello, flutter Next: hello, java Next: hello, kotlin
connect 和 refCount 是 ConnectableObservable 所使用的操做符。
ConnectableObservable 繼承自 Observable ,然而它並非在調用 subscribe() 的時候發射數據,而是隻有對其使用 connect 操做符時它纔會發射數據,因此能夠用來更靈活地控制數據發射的時機。另外, ConnectableObservable 是 Hot Observable
push 操做符是將普通的 Observable 轉換成 ConnectableObservable
connect 操做符是用來觸發 ConnectableObservable 發射數據的 。咱們能夠等全部的觀察者都訂閱了 ConnectableObservable 以後再發射數據。
final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(6); ConnectableObservable<Long> connectableObservable = observable.publish(); connectableObservable.subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#1: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#1."); } }); connectableObservable.delaySubscription(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#2: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#2."); } }); connectableObservable.connect(); // 執行結果 Next#1: 0->time:09:27:44 Next#1: 1->time:09:27:45 Next#1: 2->time:09:27:46 Next#1: 3->time:09:27:47 Next#2: 3->time:09:27:47 Next#1: 4->time:09:27:48 Next#2: 4->time:09:27:48 Next#1: 5->time:09:27:49 Next#2: 5->time:09:27:49 Complete#1. Complete#2.
refCount 操做符是將 ConnectableObservable 轉換成普通的 Observable,同時又保持了 Hot Observable 的特性。當出現第一個訂閱者時, refCount 會調用 connect()。 每一個訂閱者每次都會接收到一樣的數據,可是當全部訂閱者都取消訂閱(dispose)時, refCount 會自動 dispose 上游 Observable
全部的訂閱者都取消訂閱後,則數據流中止。若是從新訂閱則數據流從新開始。若是不是全部的訂閱者都取消了訂閱,而是隻取消了部分。則部分訂閱者/觀察者從新開始訂閱時,不會從頭開始數據流。
final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(6); ConnectableObservable<Long> connectableObservable = obs.publish(); Observable<Long> obsRefCount = connectableObservable.refCount(); obs.subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#1: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#1."); } }); obs.delaySubscription(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#2: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#2."); } }); obsRefCount.subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#3: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#3."); } }); obsRefCount.delaySubscription(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#4: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#4."); } }); // 執行結果 Next#1: 0->time:09:39:50 Next#3: 0->time:09:39:50 Next#1: 1->time:09:39:51 Next#3: 1->time:09:39:51 Next#1: 2->time:09:39:52 Next#3: 2->time:09:39:52 Next#1: 3->time:09:39:53 Next#3: 3->time:09:39:53 Next#4: 3->time:09:39:53 Next#2: 0->time:09:39:53 Next#1: 4->time:09:39:54 Next#3: 4->time:09:39:54 Next#4: 4->time:09:39:54 Next#2: 1->time:09:39:54 Next#3: 5->time:09:39:55 Next#4: 5->time:09:39:55 Complete#3. Next#1: 5->time:09:39:55 Complete#4. Complete#1. Next#2: 2->time:09:39:55 Next#2: 3->time:09:39:56 Next#2: 4->time:09:39:57 Next#2: 5->time:09:39:58 Complete#2.
保證全部的觀察者收到相同的數據序列,即便它們 Observable 開始發射數據以後才訂閱
replay 操做符返回一個 ConnectableObservable 對象,而且能夠緩存發射過的數據,這樣即便有訂閱者在發射數據以後進行訂閱,也能收到以前發射過的數據。不過使用 replay 操做符最好仍是先限定緩存的大小,不然緩存的數據太多時會佔用很大一塊內存。對緩存的控制能夠從空間和時間兩個方面來實現。
replay 操做符生成的 ConnectableObservable ,使得觀察者不管何時開始訂閱,都能收到 Observable 發送的全部數據
final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(6); ConnectableObservable<Long> connectableObservable = obs.replay(); connectableObservable.connect(); connectableObservable.subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#1: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#1."); } }); connectableObservable.delaySubscription(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#2: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#2."); } }); // 執行結果 Next#1: 0->time:09:59:01 Next#1: 1->time:09:59:02 Next#1: 2->time:09:59:03 Next#2: 0->time:09:59:03 Next#2: 1->time:09:59:03 Next#2: 2->time:09:59:03 Next#1: 3->time:09:59:04 Next#2: 3->time:09:59:04 Next#1: 4->time:09:59:05 Next#2: 4->time:09:59:05 Next#1: 5->time:09:59:06 Next#2: 5->time:09:59:06 Complete#1. Complete#2.
connect() 無須在觀察者訂閱以後調用也能執行.
replay 有多個接收不一樣參數的重載方法,有的能夠指定 replay 的最大緩存數量,有的能夠指定調度器。
ConnectableObservable 的線程切換隻能經過 replay 操做符實現,普通 Observable 的subscribeOn() 和 observerOn() 在 ConnectableObservable 中不起做用。 replay 操做符能夠經過指定線程的方式來切換線程。
若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)