Android RxJava使用介紹(四) RxJava的操做符

本篇文章繼續介紹下面類型的操做符java

  • Combining Observables(Observable的組合操做符)
  • Error Handling Operators(Observable的錯誤處理操做符)

Combining Observables(Observable的組合操做符)

combineLatest操做符

combineLatest操做符把兩個Observable產生的結果進行合併,合併的結果組成一個新的Observable。markdown

這兩個Observable中隨意一個Observable產生的結果,都和還有一個Observable最後產生的結果,依照必定的規則進行合併。ide

流程圖例如如下:
這裏寫圖片描寫敘述
調用樣例例如如下:函數

//產生0,5,10,15,20數列
        Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(5);

        //產生0,10,20,30,40數列
        Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);


        Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) {
                return aLong+aLong2;
            }
        }).subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("Next: " + aLong);
            }
        });

執行結果例如如下:
Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.post

join操做符

join操做符把類似於combineLatest操做符,也是兩個Observable產生的結果進行合併,合併的結果組成一個新的Observable。但是join操做符可以控制每個Observable產生結果的生命週期,在每個結果的生命週期內,可以與還有一個Observable產生的結果依照必定的規則進行合併。流程圖例如如下:
這裏寫圖片描寫敘述this

join方法的使用方法例如如下:
observableA.join(observableB,
observableA產生結果生命週期控制函數,
observableB產生結果生命週期控制函數。
observableA產生的結果與observableB產生的結果的合併規則)spa

調用樣例例如如下:3d

//產生0,5,10,15,20數列
        Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(5);

        //產生0,10,20,30,40數列
        Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);

        observable1.join(observable2, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                //使Observable延遲600毫秒執行
                return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
            }
        }, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                //使Observable延遲600毫秒執行
                return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
            }
        }, new Func2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) {
                return aLong + aLong2;
            }
        }).subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("Next: " + aLong);
            }
        });

執行結果例如如下:
Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.code

groupJoin操做符

groupJoin操做符很類似於join操做符,差異在於join操做符中第四個參數的傳入函數不一致。其流程圖例如如下:
這裏寫圖片描寫敘述blog

調用樣例例如如下:

//產生0,5,10,15,20數列
        Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(5);

        //產生0,10,20,30,40數列
        Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);

        observable1.groupJoin(observable2, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.just(aLong).delay(1600, TimeUnit.MILLISECONDS);
            }
        }, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
            }
        }, new Func2<Long, Observable<Long>, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong, Observable<Long> observable) {
                return observable.map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong2) {
                        return aLong + aLong2;
                    }
                });
            }
        }).subscribe(new Subscriber<Observable<Long>>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Observable<Long> observable) {
                observable.subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("Next: " + aLong);
                    }
                });
            }
        });

執行結果例如如下:
Next: 0
Next: 5
Next: 10
Next: 15
Next: 20
Next: 25
Next: 30
Next: 35
Next: 40
Next: 45
Next: 50
Next: 60
Next: 55
Sequence complete.

merge操做符

merge操做符是依照兩個Observable提交結果的時間順序,對Observable進行合併,如ObservableA每隔500毫秒產生數據爲0,5,10,15,20。而ObservableB每隔500毫秒產生數據0,10,20,30,40。當中第一個數據延遲500毫秒產生,最後合併結果爲:0,0,5,10,10,20,15,30,20,40;其流程圖例如如下:
這裏寫圖片描寫敘述

調用樣例例如如下:

//產生0,5,10,15,20數列
        Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(5);

        //產生0,10,20,30,40數列
        Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);

        Observable.merge(observable1, observable2)
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("Sequence complete.");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.err.println("Error: " + e.getMessage());
                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("Next:" + aLong);
                    }
                });

執行結果例如如下:
Next:0
Next:0
Next:5
Next:10
Next:10
Next:20
Next:15
Next:30
Next:20
Next:40
Sequence complete.

mergeDelayError操做符

從merge操做符的流程圖可以看出,一旦合併的某一個Observable中出現錯誤,就會當即中止合併,並對訂閱者回調執行onError方法,而mergeDelayError操做符會把錯誤放到所有結果都合併完畢以後才執行,其流程圖例如如下:
這裏寫圖片描寫敘述

調用樣例例如如下:

//產生0,5,10數列,最後會產生一個錯誤
        Observable<Long> errorObservable = Observable.error(new Exception("this is end!"));
        Observable < Long > observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(3).mergeWith(errorObservable.delay(3500, TimeUnit.MILLISECONDS));

        //產生0,10,20,30,40數列
        Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);

        Observable.mergeDelayError(observable1, observable2)
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("Sequence complete.");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.err.println("Error: " + e.getMessage());
                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("Next:" + aLong);
                    }
                });

執行結果例如如下:
Next:0
Next:0
Next:5
Next:10
Next:10
Next:20
Next:30
Next:40
Error: this is end!

startWith操做符

startWith操做符是在源Observable提交結果以前。插入指定的某些數據。其流程圖例如如下:
這裏寫圖片描寫敘述

調用樣例例如如下:

Observable.just(10,20,30).startWith(2, 3, 4).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

執行結果例如如下:
Next:2
Next:3
Next:4
Next:10
Next:20
Next:30
Sequence complete.

switchOnNext操做符

switchOnNext操做符是把一組Observable轉換成一個Observable,轉換規則爲:對於這組Observable中的每個Observable所產生的結果,假設在同一個時間內存在兩個或多個Observable提交的結果,僅僅取最後一個Observable提交的結果給訂閱者,其流程圖例如如下:
這裏寫圖片描寫敘述

調用樣例例如如下:

//每隔500毫秒產生一個observable
        Observable<Observable<Long>> observable = Observable.timer(0, 500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                //每隔200毫秒產生一組數據(0,10,20,30,40)
                return Observable.timer(0, 200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);
            }
        }).take(2);

        Observable.switchOnNext(observable).subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("Next:" + aLong);
            }
        });

執行結果例如如下:
Next:0
Next:10
Next:20
Next:0
Next:10
Next:20
Next:30
Next:40
Sequence complete.

zip操做符

zip操做符是把兩個observable提交的結果,嚴格依照順序進行合併,其流程圖例如如下:
這裏寫圖片描寫敘述

調用樣例例如如下:

Observable<Integer> observable1 = Observable.just(10,20,30);
        Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
        Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

執行結果例如如下:
Next:14
Next:28
Next:42
Sequence complete.

Error Handling Operators(Observable的錯誤處理操做符)

onErrorReturn操做符

onErrorReturn操做符是在Observable錯誤發生或異常的時候(即將回調oError方法時),攔截錯誤並執行指定的邏輯,返回一個跟源Observable一樣類型的結果。最後回調訂閱者的onComplete方法。其流程圖例如如下:
這裏寫圖片描寫敘述
調用樣例例如如下:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循環輸出數字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error!

"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } }); observable.onErrorReturn(new Func1<Throwable, Integer>() { @Override public Integer call(Throwable throwable) { return 1004; } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

執行結果例如如下:
Next:0
Next:1
Next:2
Next:3
Next:1004
Sequence complete.

onErrorResumeNext操做符

onErrorResumeNext操做符跟onErrorReturn類似,僅僅只是onErrorReturn僅僅能在錯誤或異常發生時僅僅返回一個和源Observable一樣類型的結果,而onErrorResumeNext操做符是在錯誤或異常發生時返回一個Observable,也就是說可以返回多個和源Observable一樣類型的結果,其流程圖例如如下:
這裏寫圖片描寫敘述
調用樣例例如如下:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<?

super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循環輸出數字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error。"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(e); } } }); observable.onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() { @Override public Observable<? extends Integer> call(Throwable throwable) { return Observable.just(100,101, 102); } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

執行結果例如如下:
Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.

onExceptionResumeNext操做符

onExceptionResumeNext操做符和onErrorResumeNext操做符類似。不一樣的地方在於onErrorResumeNext操做符是當Observable錯誤發生或異常時觸發,而onExceptionResumeNext是當Observable發生異常時才觸發。

這裏要普及一個概念就是,java的異常分爲錯誤(error)和異常(exception)兩種。它們都是繼承於Throwable類。

錯誤(error)一般是比較嚴重的系統問題,比方咱們經常遇到的OutOfMemoryError、StackOverflowError等都是錯誤。錯誤通常繼承於Error類,而Error類又繼承於Throwable類,假設需要捕獲錯誤,需要使用try..catch(Error e)或者try..catch(Throwable e)句式。

使用try..catch(Exception e)句式沒法捕獲錯誤

異常(Exception)也是繼承於Throwable類。一般是依據實際處理業務拋出的異常。分爲執行時異常(RuntimeException)和普通異常。普通異常直接繼承於Exception類。假設方法內部沒有經過try..catch句式進行處理。必須經過throws關鍵字把異常拋出外部進行處理(即checked異常);而執行時異常繼承於RuntimeException類,假設方法內部沒有經過try..catch句式進行處理,不需要顯式經過throws關鍵字拋出外部。如IndexOutOfBoundsException、NullPointerException、ClassCastException等都是執行時異常。固然RuntimeException也是繼承於Exception類,所以是可以經過try..catch(Exception e)句式進行捕獲處理的。
onExceptionResumeNext流程圖例如如下:
這裏寫圖片描寫敘述

調用樣例例如如下:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<?

super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循環輸出數字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error!"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Throwable e) { subscriber.onError(e); } } }); observable.onExceptionResumeNext(Observable.just(100, 101, 102)).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

執行結果例如如下:
Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.

retry操做符

retry操做符是當Observable錯誤發生或者異常時,又一次嘗試執行Observable的邏輯。假設通過n次又一次嘗試執行後仍然出現錯誤或者異常,則最後回調執行onError方法。固然假設源Observable沒有錯誤或者異常出現,則依照正常流程執行。

其流程圖例如如下:
這裏寫圖片描寫敘述

調用樣例例如如下:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<?

super Integer> subscriber) { if (subscriber.isUnsubscribed()) return; //循環輸出數字 try { for (int i = 0; i < 10; i++) { if (i == 4) { throw new Exception("this is number 4 error!"); } subscriber.onNext(i); } subscriber.onCompleted(); } catch (Throwable e) { subscriber.onError(e); } } }); observable.retry(2).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

執行結果例如如下:
Next:0
Next:1
Next:2
Next:3

Next:0
Next:1
Next:2
Next:3

Next:0
Next:1
Next:2
Next:3
Error: this is number 4 error!

retryWhen操做符

retryWhen操做符類似於retry操做符,都是在源observable出現錯誤或者異常時,又一次嘗試執行源observable的邏輯,不一樣在於retryWhen操做符是在源Observable出現錯誤或者異常時,經過回調第二個Observable來推斷是否又一次嘗試執行源Observable的邏輯。假設第二個Observable沒有錯誤或者異常出現。則就會又一次嘗試執行源Observable的邏輯,不然就會直接回調執行訂閱者的onError方法。其流程圖例如如下:
這裏寫圖片描寫敘述

調用樣例例如如下:

Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { System.out.println("subscribing"); subscriber.onError(new RuntimeException("always fails")); } }); observable.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() { @Override public Integer call(Throwable throwable, Integer integer) { return integer; } }).flatMap(new Func1<Integer, Observable<?

>>() { @Override public Observable<?

> call(Integer integer) { System.out.println("delay retry by " + integer + " second(s)"); //每一秒中執行一次 return Observable.timer(integer, TimeUnit.SECONDS); } }); } }).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("Sequence complete."); } @Override public void onError(Throwable e) { System.err.println("Error: " + e.getMessage()); } @Override public void onNext(Integer value) { System.out.println("Next:" + value); } });

執行結果例如如下:
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
Sequence complete.

好了,先介紹這麼多。下回繼續介紹其它的操做符。敬請期待!

相關文章
相關標籤/搜索