RxJava(八):合併操做符和鏈接操做符

博客主頁java

RxJava 的合併操做符主要包括以下幾個:segmentfault

  • startWith :在數據序列的開頭增長一項數據
  • merge :將多個 Observable 合併爲一個
  • mergeDelayError :合併多個 Observable,讓沒有錯誤的 Observable 都完成後再發射錯誤通知
  • zip :使用一個函數組合多個 Observable 發射的數據集合,而後再發射這個結果
  • combineLatest :當兩個 Observable 中的任何一個發射了一個數據時,經過一個指定的函數組合每一個 Observable 發射的最新數據( 一共兩個數據),而後發射這個函數的結果
  • join and groupJoin :不管什麼時候,若是一個 Observable 發射了一個數據項,就須要在另外一個 Observable 發射的數據項定義的時間窗口內,將兩個 Observable 發射的數據合併發射
  • switchOnNext :將一個發射 Observable 的 Observable 轉換成另外一個 Observable,後者發射這些 Observable 最近發射的數據

RxJava 的鏈接操做符,主要是 ConnectableObservable 所使用的操做符和 Observable 所使用
的操做符:數組

  • ConnectableObservable.connect : 指示一個可鏈接的 Observable 開始發射數據
  • Observable.publish :將一個 Observable 轉換爲一個可鏈接的 Observable
  • Observable.replay :確保全部的訂閱者看到相同的數據序列,即便它們在 Observable 開始發射數據以後才訂閱
  • ConnectableObservable.refCount :讓一個可鏈接的 Observable 表現得像一個普通的 Observable

1. merge 和 zip

1.1 merge 操做符

合併多個 Observable 的發射物

merge 操做符能夠將多個 Observable 的輸出合井,使得它們就像是單個的 Observable 同樣。緩存

Observable.merge(
        Observable.just(1, 3, 5, 7, 9),
        Observable.just(2, 4, 6)
).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 執行結果
 Next: 1
 Next: 3
 Next: 5
 Next: 7
 Next: 9
 Next: 2
 Next: 4
 Next: 6
 Complete.

merge 是按照時間線並行的。若是傳遞給 merge 的任何一個 Observable 發射了 onError 通知終止,則 merge 操做符生成的 Observable 也會當即以 onError 通知終止。若是想讓它繼續發射數據,直到最後才報告錯誤,則可使用 mergeDelayError 操做符.

merge 操做符最多隻能合併 4 個被觀察者,若是須要合併更多個被觀察者,則可使用 mergeArray 操做符.併發

1.2 zip

經過一個函數將多個 Observable 的發射物結合到一塊兒,基於這個函數的結果爲每一個結合體發射單個數據項app

zip 操做符返回一個 Obversable ,它使用這個函數按順序結合兩個或多個 Observable 發射的數據項,而後發射這個函數返回的結果。它按照嚴格的順序應用這個函數,只發射與發射數據項最少的那個 Observable 同樣多的數據。ide

zip 的最後一個參數接收每一個 Observable 發射的一項數據,返回被壓縮後的數據,它能夠接收 1~9 個參數:一個 Observable 序列 或者一些發射 Observable 的 Observable函數

Observable.zip(
        Observable.just(1, 3, 5, 7),
        Observable.just(2, 4, 6),
        new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }
).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});

// 執行結果
 Next: 3
 Next: 7
 Next: 11
 Complete.

zip 操做符相對於 merge 操做符,除發射數據外,還會進行合併操做,並且 zip 發射的數據與數據項最少的 Observable 有關。spa

這裏 BiFunction 至關於一個合併函數,並不必定要返回 Integer 類型,能夠根據業務須要返回合適的類型。 BiFunction 的源碼以下:線程

public interface BiFunction<T1, T2, R> {

    /**
     * Calculate a value based on the input values.
     * @param t1 the first value
     * @param t2 the second value
     * @return the result value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception;
}

RxJava 2.x FuncN 遵循 Java 8 的命名規範。相對於 RxJava l.x, Func 更名成 Function, Func2 更名成 BiFunction, Func3~ Func9 更名成 Function3~ Function9,FuncN 由 Function 取代

2. combineLatest 和 join

2.1 combineLatest 操做符

combineLatest 操做符的行爲相似於 zip,可是隻有當原始的 Observable 中的每個都發射了一條數據時 zip 才發射數據,而 combineLatest 是當原始的 Observable 中任意一個發射了數據時就發射一條數據。當原始 Observable 的任何一個發射了一條數據時, combineLatest 使用一個函數結合它們最近發射的數據,而後發射這個函數的返回值。

Observable.combineLatest(
        Observable.just(1, 3, 5),
        Observable.just(2, 4, 6),
        new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                Log.d(TAG, "integer: " + integer + " ## integer2: " + integer2);
                return integer + integer2;
            }
        }
).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Log.d(TAG, "Next: " + integer);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Log.d(TAG, "Error: " + throwable);
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Log.d(TAG, "Complete.");
    }
});


// 執行結果
 integer: 5 ## integer2: 2
 Next: 7
 integer: 5 ## integer2: 4
 Next: 9
 integer: 5 ## integer2: 6
 Next: 11
 Complete.

2.2 join 操做符

join 操做符結合兩個 Observable 發射的數據,基於時間窗口(針對每條數據特定的原則)選擇待集合的數據項。將這些時間窗口實現爲一些 Observable ,它們的生命週期從任何一條 Observable 發射的每一條數據開始。當這個定義時間窗口的 Observable 發射了一條數據或者完成時,與這條數據關聯的窗口也會關閉。只要這條數據的窗口是打開的,它就繼續結合其餘 Observable 發射的任何數據項。

Observable<Integer> o1 = Observable.just(1, 2, 3);
Observable<Integer> o2 = Observable.just(4, 5, 6);

o1.join(o2, new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS);
    }
}, new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS);
    }
}, new BiFunction<Integer, Integer, String>() {
    @Override
    public String apply(Integer integer, Integer integer2) throws Exception {
        return integer + ":" + integer2;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next: " + s);
    }
});

// 執行結果
 Next: 1:4
 Next: 2:4
 Next: 3:4
 Next: 1:5
 Next: 2:5
 Next: 3:5
 Next: 1:6
 Next: 2:6
 Next: 3:6

join(Observable, Function, Function, BiFunction) 有四個參數:

  • Observable :源 Observable 須要組合 Observable ,這裏能夠稱之爲目標 Observable
  • Function: 接收從源 Observable 發射來的數據,井返回一個 Observable ,這個 Observable

的生命週期決定了源 Observable 發射數據的有效期。

  • Function :接收目標 Observable 發射的數據,井返回一個 Observable ,這個 Observable

的生命週期決定了目標 bservable 發射數據的有效期。

  • BiFunction :接收從源 Observable 和目 Observable 發射的數據,並將這兩個數據組

合後返回。

join 操做符的效果相似於排列組合,把第一個數據源 A 做爲基座窗口,它根據本身的節奏不斷髮射數據元素;第二個數據源 B,每發射一個數據,咱們都把它和第一個數據源 A 中己經發射的數據進行一對一匹配。舉例來講,若是某一時刻 B 發射了一個數據 "B",此時 A 己經發射了 a,b,c,d 共4個數據,那麼合併操做就是把 「B」 依次與 a,b,c,d 配對,獲得4組數據:[a, B], [b, B], [c, B], [d, B]

Observable<Integer> o1 = Observable.just(1, 2, 3).delay(200, TimeUnit.MILLISECONDS);
Observable<Integer> o2 = Observable.just(4, 5, 6);

o1.join(o2, new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS);
    }
}, new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS);
    }
}, new BiFunction<Integer, Integer, String>() {
    @Override
    public String apply(Integer integer, Integer integer2) throws Exception {
        return integer + ":" + integer2;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "Next: " + s);
    }
});

// 執行結果
 Next: 1:4
 Next: 1:5
 Next: 1:6
 Next: 2:4
 Next: 2:5
 Next: 2:6
 Next: 3:4
 Next: 3:5
 Next: 3:6

3. startWith

在數據序列的開頭插入一條指定的項

若是想讓 Observable 在發射數據以前先發射一個指定的數據序列,則可使用 startWith 操做符。若是想在一個 Observable 發射數據的末尾追加一個數據序列 ,則可使用 concat 操做符。

Observable.just("hello, java", "hello, kotlin")
        .startWith("hello, rxjava")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 執行結果
 Next: hello, rxjava
 Next: hello, java
 Next: hello, kotlin

startWith 操做符支持傳遞 Iterable, 同時還有一個 startWithArray 的操做符

Observable.just("hello, java", "hello, kotlin")
        .startWithArray("hello, rxjava", "hello, flutter")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 執行結果
 Next: hello, rxjava
 Next: hello, flutter
 Next: hello, java
 Next: hello, kotlin

使用了 startWithArray 操做符以後,能夠再使用 startWith 操做符。

Observable.just("hello, java", "hello, kotlin")
        .startWithArray("hello, rxjava", "hello, flutter")
        .startWith("hello, groovy")
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 執行結果
 Next: hello, groovy
 Next: hello, rxjava
 Next: hello, flutter
 Next: hello, java
 Next: hello, kotlin

startWith 還能夠傳遞一個 Observable ,它會將那個 Observable 的發射物插在原始 Observable 發射的數據序列以前,而後把這個看成本身的發射物集合。

Observable.just("hello, java", "hello, kotlin")
        .startWithArray("hello, rxjava", "hello, flutter")
        .startWith(Observable.just("hello, groovy"))
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "Next: " + s);
            }
        });

// 執行結果
 Next: hello, groovy
 Next: hello, rxjava
 Next: hello, flutter
 Next: hello, java
 Next: hello, kotlin

4. connect、push 和 refCount

connect 和 refCount 是 ConnectableObservable 所使用的操做符。

ConnectableObservable 繼承自 Observable ,然而它並非在調用 subscribe() 的時候發射數據,而是隻有對其使用 connect 操做符時它纔會發射數據,因此能夠用來更靈活地控制數據發射的時機。另外, ConnectableObservable 是 Hot Observable

push 操做符是將普通的 Observable 轉換成 ConnectableObservable

connect 操做符是用來觸發 ConnectableObservable 發射數據的 。咱們能夠等全部的觀察者都訂閱了 ConnectableObservable 以後再發射數據。

final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");

Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(6);

ConnectableObservable<Long> connectableObservable = observable.publish();

connectableObservable.subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "Next#1: " + aLong + "->time:" + format.format(new Date()));
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "Error: " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "Complete#1.");
    }
});

connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.d(TAG, "Next#2: " + aLong + "->time:" + format.format(new Date()));
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Error: " + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Complete#2.");
            }
        });

connectableObservable.connect();

// 執行結果
 Next#1: 0->time:09:27:44
 Next#1: 1->time:09:27:45
 Next#1: 2->time:09:27:46
 Next#1: 3->time:09:27:47
 Next#2: 3->time:09:27:47
 Next#1: 4->time:09:27:48
 Next#2: 4->time:09:27:48
 Next#1: 5->time:09:27:49
 Next#2: 5->time:09:27:49
 Complete#1.
 Complete#2.

refCount 操做符是將 ConnectableObservable 轉換成普通的 Observable,同時又保持了 Hot Observable 的特性。當出現第一個訂閱者時, refCount 會調用 connect()。 每一個訂閱者每次都會接收到一樣的數據,可是當全部訂閱者都取消訂閱(dispose)時, refCount 會自動 dispose 上游 Observable

全部的訂閱者都取消訂閱後,則數據流中止。若是從新訂閱則數據流從新開始。若是不是全部的訂閱者都取消了訂閱,而是隻取消了部分。則部分訂閱者/觀察者從新開始訂閱時,不會從頭開始數據流。

final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");

Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(6);

ConnectableObservable<Long> connectableObservable = obs.publish();

Observable<Long> obsRefCount = connectableObservable.refCount();

obs.subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "Next#1: " + aLong + "->time:" + format.format(new Date()));
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "Error: " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "Complete#1.");
    }
});

obs.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.d(TAG, "Next#2: " + aLong + "->time:" + format.format(new Date()));
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Error: " + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Complete#2.");
            }
        });

obsRefCount.subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "Next#3: " + aLong + "->time:" + format.format(new Date()));
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "Error: " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "Complete#3.");
    }
});

obsRefCount.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.d(TAG, "Next#4: " + aLong + "->time:" + format.format(new Date()));
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Error: " + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Complete#4.");
            }
        });


// 執行結果
 Next#1: 0->time:09:39:50
 Next#3: 0->time:09:39:50
 Next#1: 1->time:09:39:51
 Next#3: 1->time:09:39:51
 Next#1: 2->time:09:39:52
 Next#3: 2->time:09:39:52
 Next#1: 3->time:09:39:53
 Next#3: 3->time:09:39:53
 Next#4: 3->time:09:39:53
 Next#2: 0->time:09:39:53
 Next#1: 4->time:09:39:54
 Next#3: 4->time:09:39:54
 Next#4: 4->time:09:39:54
 Next#2: 1->time:09:39:54
 Next#3: 5->time:09:39:55
 Next#4: 5->time:09:39:55
 Complete#3.
 Next#1: 5->time:09:39:55
 Complete#4.
 Complete#1.
 Next#2: 2->time:09:39:55
 Next#2: 3->time:09:39:56
 Next#2: 4->time:09:39:57
 Next#2: 5->time:09:39:58
 Complete#2.

5. replay

保證全部的觀察者收到相同的數據序列,即便它們 Observable 開始發射數據以後才訂閱

replay 操做符返回一個 ConnectableObservable 對象,而且能夠緩存發射過的數據,這樣即便有訂閱者在發射數據以後進行訂閱,也能收到以前發射過的數據。不過使用 replay 操做符最好仍是先限定緩存的大小,不然緩存的數據太多時會佔用很大一塊內存。對緩存的控制能夠從空間和時間兩個方面來實現。

replay 操做符生成的 ConnectableObservable ,使得觀察者不管何時開始訂閱,都能收到 Observable 發送的全部數據

final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss");

Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(6);

ConnectableObservable<Long> connectableObservable = obs.replay();

connectableObservable.connect();

connectableObservable.subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {

    }

    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "Next#1: " + aLong + "->time:" + format.format(new Date()));
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "Error: " + e);
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "Complete#1.");
    }
});

connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.d(TAG, "Next#2: " + aLong + "->time:" + format.format(new Date()));
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "Error: " + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Complete#2.");
            }
        });


// 執行結果
 Next#1: 0->time:09:59:01
 Next#1: 1->time:09:59:02
 Next#1: 2->time:09:59:03
 Next#2: 0->time:09:59:03
 Next#2: 1->time:09:59:03
 Next#2: 2->time:09:59:03
 Next#1: 3->time:09:59:04
 Next#2: 3->time:09:59:04
 Next#1: 4->time:09:59:05
 Next#2: 4->time:09:59:05
 Next#1: 5->time:09:59:06
 Next#2: 5->time:09:59:06
 Complete#1.
 Complete#2.

connect() 無須在觀察者訂閱以後調用也能執行.

replay 有多個接收不一樣參數的重載方法,有的能夠指定 replay 的最大緩存數量,有的能夠指定調度器。

ConnectableObservable 的線程切換隻能經過 replay 操做符實現,普通 Observable 的subscribeOn() 和 observerOn() 在 ConnectableObservable 中不起做用。 replay 操做符能夠經過指定線程的方式來切換線程。

若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)