史上最全的Rxjava2講解(使用篇)

1.前言

在好久以前就一直想整理一下rxjava,可是一直沒有時間,最近是由於離職了,總算有時間整理一下了。由於打算每篇博客都記錄一個框架。因此爲了描述清楚,本篇博客可能略長(包含rxjava的簡介,使用,背壓,原理等),但願大家能認真的讀完,收穫確定仍是有的,也會採用大量的圖來介紹,這樣能夠加深理解。也能夠當一個工具博客,須要的使用的話隨時查閱。java

後續還會繼續出背壓和原理篇,敬請期待
react

2.簡介

什麼是rxjava? 是一種事件驅動的基於異步數據流的編程模式,整個數據流就像一條河流,它能夠被觀測(監聽),過濾,操控或者與其餘數據流合併爲一條新的數據流。android

三要素git

  1. 被觀察者(Observable)
  2. 觀察者(Observer)
  3. 訂閱(subscribe)

好了,由於秉持着要有圖的思想,在介紹rxjava各個操做符的時候,會採用大量的圖示來表示,圖示來源於官方,這裏先給你們介紹一下怎麼看。
ok,進入到擼碼環節
github

3.簡單使用

1.首先要在 build.gradle 文件中添加依賴編程

implementation 'io.reactivex.rxjava2:rxjava:2.1.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'
複製代碼

2.依賴搭建完畢了,咱們先寫個最簡單的案例,一共3步走併發

  • 2.1 建立被觀察者
// 建立被觀察者
   Observable.create(new ObservableOnSubscribe<String>() {

       @Override
       public void subscribe(ObservableEmitter<String> emitter) throws Exception {

            emitter.onNext("你好呀");
            emitter.onNext("我愛中國");
            emitter.onNext("祝願祖國繁榮富強");
            emitter.onComplete();
        }
   });
複製代碼
  • 2.2 建立觀察者
// 建立觀察者
   Observer observer = new Observer<String>(){

       @Override
       public void onSubscribe(Disposable d) {

           Log.i("lybj", "準備監聽");
       }

       @Override
       public void onNext(String s) {

           Log.i("lybj", s);
       }

       @Override
       public void onError(Throwable e) {

           Log.i("lybj", "error");
       }

       @Override
       public void onComplete() {

           Log.i("lybj", "監聽完畢");
       }
   };
複製代碼
  • 2.3 訂閱(也就是將被觀察者和觀察者關聯)
// 訂閱
   observable.subscribe(observer);
複製代碼

這就完事了,看下結果app

是否是很簡單,幾個概念再介紹一下框架

  • onNext():當被觀察者(observable)經過調用onNext()發射數據的時候,觀察者(observer)調用onNext()接收數據
  • onError():當被觀察者(observable)調用該函數時,觀察者(observer)調用onError(),其餘事件將不會繼續發送
  • onComplete():當被觀察者(observable)調用該函數時,觀察者(observer)調用onComplete(),其餘事件將不會繼續發送

其實rxjava,打個比方,就相似花灑的頭,數據流就相似水流,它的被觀察者(observable)的各類操做符就是花灑的那個頭,能夠有各類模式,好比中間噴水的,周圍噴水的,噴水霧的等等。根據操做符的不一樣,能夠改變數據的各類樣式,根據花灑頭的不一樣,能夠把水流改爲各類樣式。 接下來,就來學習下observable的豐富的操做符。異步

先看看大綱


4.建立操做符

1.create()

1.1 作啥的?

建立被觀察者對象

1.2 如何用?

// 建立被觀察者
Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("你好呀");
                emitter.onNext("我愛中國");
                emitter.onNext("祝願祖國繁榮富強");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>(){ // 關聯觀察者

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj", "準備監聽");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj", s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "error");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "監聽完畢");
            }
        });
複製代碼

1.3 結果

能夠直接鏈式調用關聯觀察者


2 just()

2.1 作啥的?

經過上面的圖,應該很形象的說明了,主要做用就是建立一個被觀察者,併發送事件,可是發送的事件不能夠超過10個以上。

2.2 如何用?

Observable.just("小明", "小紅", "小蘭").subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj", "準備監聽");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj", s+"來了");
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "Error");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "完畢");
            }
        });
複製代碼

2.3 結果


3 timer()

3.1 作啥的?

當到指定時間後就會發送一個 0 的值給觀察者。 在項目中,能夠作一些延時的處理,相似於Handler中的延時

3.2 如何用?

Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {

        @Override
        public void accept(Long aLong) throws Exception {

             Log.i("lybj", aLong+"");
        }
});
複製代碼

3.3 結果

延遲2秒後,將結果發送給觀察者,Consumer和Observer是建立觀察者的兩種寫法,至關於觀察者中的onNext方法。


4 interval()

4.1 作啥的?

每隔一段時間就會發送一個事件,這個事件是從0開始,不斷增1的數字。 相似於項目中的timer,作計時器

4.2 如何用?

Observable.interval(3,TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            
     @Override
     public void accept(Long aLong) throws Exception {

          Log.i("lybj", aLong+"");
     }
});
複製代碼

4.3 結果


5 intervalRange()

5.1 作啥的?

能夠指定發送事件的開始值和數量,其餘與 interval() 的功能同樣。

5.2 如何用?

Observable.intervalRange(100, 4, 0, 10, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
           
      @Override
      public void accept(Long aLong) throws Exception {

           Log.i("lybj", aLong+"");
      }
});
複製代碼

5.3 結果

參數依次是:開始值,循環執行的次數,初始延遲時間,執行間隔時間,時間單位


6 range()

6.1 作啥的?

同時發送必定範圍的事件序列。

6.2 如何用?

Observable.range(0,10).subscribe(new Consumer<Integer>() {
            
    @Override
    public void accept(Integer integer) throws Exception {

         Log.i("lybj", integer+"");
    }
});
複製代碼

6.3 結果


7 rangeLong()

7.1 作啥的?

做用與 range() 同樣,只是數據類型爲 Long

7.2 如何用?

Observable.rangeLong(0,10).subscribe(new Consumer<Long>() {
            
    @Override
    public void accept(Long aLong) throws Exception {

         Log.i("lybj", aLong+"");
    }
});
複製代碼

7.3 結果


8 empty() & never() & error()

8.1 作啥的?

  • never():不發送任何事件
  • error():發送 onError() 事件
  • empty() : 直接發送 onComplete() 事件

8.2 如何用?

private void empty_never_error(){

        Observable.empty().subscribe(new Observer(){
            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj", "準備監聽");
            }

            @Override
            public void onNext(Object o) {

                Log.i("lybj", o+"");
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "onError");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "onComplete");
            }
        });
複製代碼

8.3 結果

若是是empty() 則:

若是是error() 則:

若是是never()則:



5.轉換操做符

1 map()

1.1 作啥的?

map 能夠將被觀察者發送的數據類型轉變成其餘的類型

1.2 怎麼用?

Observable.just("中國", "祖國", "中國軍人")
                .map(new Function<String, String>() {

                    @Override
                    public String apply(String s) throws Exception {

                        return "我愛" + s;
                    }
                })
                .subscribe(new Consumer<String>() {

                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj", s);
                    }
                });
複製代碼

1.3 結果

簡單來說,就是能夠對發射過來的數據進行再加工,再傳給觀察者


2 flatMap()

2.1 作啥的?

這個方法能夠將事件序列中的元素進行整合加工,返回一個新的被觀察者。 flatMap() 其實與 map() 相似,可是 flatMap() 返回的是一個 Observerable,map()只是返回數據,若是在元素再加工的時候,想再使用上面的建立操做符的話,建議使用flatMap(),而非map()。

2.2 怎麼用?

Observable.just("中國", "祖國", "中國軍人", "貪官")
                .flatMap(new Function<String, ObservableSource<String>>() {

                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {

                        if(s.equals("貪官")){

                            return Observable.error(new Exception("貪官不能被喜歡"));
                        }
                        return Observable.just("我愛"+s);
                    }
                })
                .subscribe(new Consumer<String>() {

                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj", s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(Throwable throwable) throws Exception {

                        Log.i("lybj", throwable.getMessage());
                    }
                });
複製代碼

2.3 結果

new Consumer方法監聽的是Observable.error()


3 concatMap()

3.1 作啥的?

concatMap() 和 flatMap() 基本上是同樣的,只不過 concatMap() 轉發出來的事件是有序的,而 flatMap() 是無序的。

3.2 怎麼用?

Observable.just("中國", "祖國", "中國軍人", "貪官")
                .concatMap(new Function<String, ObservableSource<String>>() {

                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {

                        if(s.equals("貪官")){

                            return Observable.error(new Exception("貪官不能被喜歡"));
                        }
                        return Observable.just("我愛"+s);
                    }
                })
                .subscribe(new Consumer<String>() {

                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj", s);
                    }
                }, new Consumer<Throwable>() {

                    @Override
                    public void accept(Throwable throwable) throws Exception {

                        Log.i("lybj", throwable.getMessage());
                    }
                });
複製代碼

3.3 結果


4 buffer()

4.1 作啥的?

從須要發送的事件當中獲取必定數量的事件,並將這些事件放到緩衝區當中一併發出。

4.2 怎麼用?

buffer 有兩個參數,一個是 count,另外一個 skip。count 緩衝區元素的數量,skip 就表明緩衝區滿了以後,發送下一次事件序列的時候要跳過多少元素。

Observable.just("1", "2", "3", "4", "5")
                .buffer(2,1)
                .subscribe(new Consumer<List<String>>() {

                    @Override
                    public void accept(List<String> strings) throws Exception {

                        Log.d("lybj", "緩衝區大小: " + strings.size());
                        for (String s : strings){
                            Log.d("lybj",  s);
                        }
                    }
                });
複製代碼

4.3 結果


5 scan()

5.1 作啥的?

將發射的數據經過一個函數進行變換,而後將變換後的結果做爲參數跟下一個發射的數據一塊兒繼續經過那個函數變換,這樣依次連續發射獲得最終結果。

5.2 怎麼用

Observable.just(1, 2, 3, 4, 5)
                .scan(new BiFunction<Integer, Integer, Integer>() {

                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {

                        Log.i("lybj", "integer01: " + integer + " integer02: "+ integer2);
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj", "accept: " + integer);
                    }
                });
複製代碼

5.3 結果

簡單來講,先將第一個元素返回給觀察者,而後將1,2的和返給觀察者,而後將上一次計算的和,當第一個元素,也就是3,第2個元素,是一直按順序取值,取第3個元素也就是3,那麼將,3+3 =6,返回給觀察者,以此類推,將6做爲第一個元素,第二個元素取值4,將6+4=10返回給觀察者。

sacn操做符是遍歷源Observable產生的結果,再按照自定義規則進行運算,依次輸出每次計算後的結果給訂閱者


6 window()

6.1 作啥的?

發送事件時,將這些事件分爲按數量從新分組。window 中的 count 的參數就是表明指定的數量,例如將 count 指定爲2,那麼每發2個數據就會將這2個數據分紅一組。

window與buffer區別:window是把數據分割成了Observable,buffer是把數據分割成List

6.2 如何用?

Observable.just("魯班", "孫尚香", "亞索","火女","蓋倫")
                .window(2)
                .subscribe(new Consumer<Observable<String>>() {

                    @Override
                    public void accept(Observable<String> stringObservable) throws Exception {

                        Log.i("lybj", "分組開始");
                        stringObservable.subscribe(new Consumer<String>() {

                            @Override
                            public void accept(String s) throws Exception {

                                Log.i("lybj", s);
                            }
                        });
                    }
                });
複製代碼

6.3 結果


6.組合操做符

1.concat()

1.1 作啥的?

能夠將多個觀察者組合在一塊兒,而後按照以前發送順序發送事件。須要注意的是,concat() 最多隻能夠發送4個事件。

1.2 怎麼用?

private void concat(){

        Observable.concat(
                Observable.just(1, 2, 3),
                Observable.just(4, 5),
                Observable.just(6, 7),
                Observable.just(8, 9))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj", integer+"");
                    }
                });
    }
複製代碼

1.3 結果


2.concatArray()

2.1 作啥的?

與 concat() 做用同樣,不過 concatArray() 能夠發送多於 4 個被觀察者。

2.2 怎麼用?

Observable.concatArray(Observable.just(1, 2, 3, 4),
                Observable.just(5, 6),
                Observable.just(7, 8, 9, 10),
                Observable.just(11, 12, 13),
                Observable.just(14, 15),
                Observable.just(16))
                .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj", integer+"");
            }
        });
複製代碼

2.3 結果


3.merge()

3.1 作啥的?

這個方法與 concat() 做用基本同樣,可是 concat() 是串行發送事件,而 merge() 並行發送事件,也是隻能發送4個。

3.2 怎麼用?

Observable.merge(Observable.just(1, 2, 3, 4),
                Observable.just(5, 6),
                Observable.just(7, 8, 9, 10),
                Observable.just(11, 12, 13))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj", integer+"");
                    }
                });
複製代碼

3.3 結果


4.zip()

4.1 作啥的?

zip操做符用於將多個數據源合併,並生成一個新的數據源。新生成的數據源嚴格按照合併前的數據源的數據發射順序,而且新數據源的數據個數等於合併前發射數據個數最少的那個數據源的數據個數。

4.2 怎麼用?

Observable.zip(Observable.just(1, 2, 3),
                Observable.just("A", "B", "C", "D", "E"),
                new BiFunction<Integer, String, String>(){

                    @Override
                    public String apply(Integer o1, String o2) throws Exception {

                        return o1 +"_"+ o2;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String o) throws Exception {

                        Log.i("lybj", o);
                    }
                });
複製代碼

4.3 結果


5.startWith() & startWithArray()

5.1 作啥的?

在發送事件以前追加事件,startWith() 追加一個事件,startWithArray() 能夠追加多個事件。追加的事件會先發出。

5.2 怎麼用?

Observable.just(1, 2, 3)
               .startWithArray(4, 5)
               .subscribe(new Consumer<Integer>() {

                   @Override
                   public void accept(Integer integer) throws Exception {

                       Log.i("lybj", integer+"");
                   }
               });
複製代碼

5.3 結果


6.count()

6.1 作啥的?

返回被觀察者發送事件的數量。

6.2 怎麼用?

Observable.just(2, 3, 4, 5, 6)
                .count()
                .subscribe(new Consumer<Long>() {

                    @Override
                    public void accept(Long aLong) throws Exception {

                        Log.i("lybj", "事件數量:" + aLong);
                    }
                });
複製代碼

6.3 結果


7.功能操做符

1.delay()

1.1 作啥的?

延遲一段事件發送事件。

1.2 怎麼用?

Observable.just(1,2,3,4)
                .delay(3, TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
複製代碼

1.3 結果


2.周期函數

2.1 作啥的?

doOnEach(): 每次發送事件以前都會回調這個方法

doOnNext(): Observable 每發送 onNext() 以前都會先回調這個方法。

doAfterNext(): Observable 每發送 onNext() 以後都會回調這個方法。

doOnComplete(): Observable 每發送 onComplete() 以前都會回調這個方法。

doOnError(): Observable 每發送 onError() 以前都會回調這個方法。

doOnSubscribe(): Observable 每發送 onSubscribe()以前都會回調這個方法。

doOnDispose(): 當調用 Disposable 的 dispose() 以後回調該方法。

doOnTerminate(): 在 onError 或者 onComplete 發送以前回調。

doAfterTerminate(): onError 或者 onComplete 發送以後回調。

doFinally(): 在全部事件發送完畢以後回調該方法。若是取消訂閱以後doAfterTerminate()就不會被回調,而doFinally()不管怎麼樣都會被回調,且都會在事件序列的最後。

2.2 怎麼用?

Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).doOnEach(new Consumer<Notification<Integer>>() {

            @Override
            public void accept(Notification<Integer> integerNotification) throws Exception {

                Log.i("lybj",  "doOnEach 方法執行了, 結果:"+ integerNotification.getValue());
            }
        }).doOnNext(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj",  "doOnNext 方法執行了, 結果:"+ integer);
            }
        }).doAfterNext(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj",  "doAfterNext 方法執行了, 結果:"+ integer);
            }
        }).doOnComplete(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doOnComplete 方法執行了");
            }
        }).doOnError(new Consumer<Throwable>() {

            @Override
            public void accept(Throwable throwable) throws Exception {

                Log.i("lybj",  "doOnError 方法執行了");
            }
        }).doOnSubscribe(new Consumer<Disposable>() {

            @Override
            public void accept(Disposable disposable) throws Exception {

                Log.i("lybj",  "doOnSubscribe 方法執行了");
            }
        }).doOnDispose(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doOnDispose 方法執行了");
            }
        }).doOnTerminate(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doOnTerminate 方法執行了");
            }
        }).doAfterTerminate(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doAfterTerminate 方法執行了");
            }
        }).doFinally(new Action() {

            @Override
            public void run() throws Exception {

                Log.i("lybj",  "doFinally 方法執行了");
            }
        }).subscribe(new Observer<Integer>() {

            private Disposable disposable;

            @Override
            public void onSubscribe(Disposable d) {

                disposable = d;
                Log.i("lybj", "------觀察者onSubscribe()執行");
            }

            @Override
            public void onNext(Integer integer) {

                Log.i("lybj", "------觀察者onNext()執行:"+integer);
                if(integer == 2){
// disposable.dispose(); // 取消訂閱
                }
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj", "------觀察者onError()執行");
            }

            @Override
            public void onComplete() {

                Log.i("lybj", "------觀察者onComplete()執行");
            }
        });
複製代碼

2.3 結果


3.onErrorReturn()

3.1 作啥的?

當接受到一個 onError() 事件以後回調,返回的值會回調 onNext() 方法,並正常結束該事件序列。

3.2 怎麼用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("小明:到");
                emitter.onError(new IllegalStateException("error"));
                emitter.onNext("小方:到");
            }
        }).onErrorReturn(new Function<Throwable, String>() {

            @Override
            public String apply(Throwable throwable) throws Exception {

                Log.i("lybj",  "小紅請假了");
                return "小李:到";
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj",  e.getMessage());
            }

            @Override
            public void onComplete() {
            }
        });
複製代碼

3.3 結果


4.onErrorResumeNext()

4.1 作啥的?

當接收到 onError() 事件時,返回一個新的 Observable,並正常結束事件序列。

4.2 怎麼用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("小明");
                emitter.onNext("小方");
                emitter.onNext("小紅");
                emitter.onError(new NullPointerException("error"));
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() {

            @Override
            public ObservableSource<? extends String> apply(Throwable throwable) throws Exception {

                return Observable.just("1", "2", "3");
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj",  "準備監聽");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj",  e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i("lybj",  "onComplete");
            }
        });
複製代碼

4.3 結果


5.onExceptionResumeNext()

5.1 作啥的?

與 onErrorResumeNext() 做用基本一致,可是這個方法只能捕捉 Exception。

5.2 怎麼用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("小明");
                emitter.onNext("小方");
                emitter.onNext("小紅");
                emitter.onError(new Error("error"));
            }
        }).onExceptionResumeNext(new Observable<String>() {

            @Override
            protected void subscribeActual(Observer observer) {

                observer.onNext("小張");
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

                Log.i("lybj",  "準備監聽");
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {

                Log.i("lybj",  e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.i("lybj",  "onComplete");
            }
        });
複製代碼

5.3 結果


6.retry()

6.1 作啥的?

若是出現錯誤事件,則會從新發送全部事件序列。times 是表明從新發的次數。

6.2 怎麼用?

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {

                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onError(new IllegalStateException());
            }
        }).retry(2)
          .subscribe(new Observer<String>() {

              @Override
              public void onSubscribe(Disposable d) {

                  Log.i("lybj",  "準備監聽");
              }

              @Override
              public void onNext(String s) {

                  Log.i("lybj",  s);
              }

              @Override
              public void onError(Throwable e) {

                  Log.i("lybj",  e.getMessage());
              }

              @Override
              public void onComplete() {
                  Log.i("lybj",  "onComplete");
              }
          });
複製代碼

6.3 結果


7.retryUntil()

7.1 作啥的?

出現錯誤事件以後,能夠經過此方法判斷是否繼續發送事件。

7.2 怎麼用?

Observable.create(new ObservableOnSubscribe<String>() {

            public void subscribe(@NonNull ObservableEmitter<String> emitter){

                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onError(new NullPointerException("error"));
                emitter.onNext("4");
                emitter.onNext("5");
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {

                Log.i("lybj",  "getAsBoolean");
                return true;
            }
        }).subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {

                Log.i("lybj",  s);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });
複製代碼

7.3 結果


8.repeat()

8.1 作啥的?

重複發送被觀察者的事件,times 爲發送次數。

8.2 怎麼用?

Observable.just(1,2,3)
                .repeat(2)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
複製代碼

8.3 結果


9.subscribeOn() & observeOn()

9.1 作啥的?

subscribeOn(): 指定被觀察者的線程,若是屢次調用此方法,只有第一次有效。 observeOn(): 指定觀察者的線程

9.2 怎麼用?

Observable.create(new ObservableOnSubscribe<String>() {

            public void subscribe(@NonNull ObservableEmitter<String> emitter){

                emitter.onNext("1");
                Log.i("lybj",  Thread.currentThread().getName());
            }
        }).subscribeOn(Schedulers.io())
          .observeOn(Schedulers.newThread())
          .subscribe(new Consumer<String>() {

               @Override
               public void accept(String s) throws Exception {

                   Log.i("lybj",  s);
                   Log.i("lybj",  Thread.currentThread().getName());
               }
          });
複製代碼

9.3 結果


8.過濾操做符

1.filter()

1.1 作啥的?

若是返回 true 則會發送事件,不然不會發送

1.2 怎麼用?

Observable.just(1,2,3,4,5)
          .filter(new Predicate<Integer>() {
                    
                    @Override
                    public boolean test(Integer integer) throws Exception {

                        if(integer > 4){
                            return true;
                        }
                        return false;
                    }
           }).subscribe(new Consumer<Integer>() {
            
               @Override
               public void accept(Integer integer) throws Exception {

                   Log.i("lybj",  integer+"");
               }
          });
複製代碼

1.3 結果


2.ofType()

2.1 作啥的?

能夠過濾不符合該類型事件

2.2 怎麼用?

Observable.just(1, 2, 3, "小明", "小方")
                .ofType(String.class)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {

                        Log.i("lybj",  s+"");
                    }
                });
複製代碼

2.3 結果


3.skip()

3.1 作啥的?

跳過正序某些事件,count 表明跳過事件的數量

3.2 怎麼用?

Observable.just(1,2,3,4,5,6,7)
                .skip(2)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
複製代碼

3.3 結果


4.distinct()

4.1 作啥的?

過濾事件序列中的重複事件。

4.2 作啥的?

Observable.just(1,2,3,1,4,1,2)
                .distinct()
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
複製代碼

4.3 結果


5.distinctUntilChanged()

5.1 作啥的?

過濾掉連續重複的事件

5.2 作啥的?

Observable.just(1,2,3,3,1,5,6)
        .distinctUntilChanged()
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {

                Log.i("lybj",  integer+"");
            }
        });
複製代碼

5.3 結果


6.take()

6.1 作啥的?

控制觀察者接收的事件的數量。

6.2 怎麼用?

Observable.just(1,2,3,4,5,6)
                .take(3)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
複製代碼

6.3 結果


7.debounce()

7.1 作啥的?

若是兩件事件發送的時間間隔小於設定的時間間隔則前一件事件就不會發送給觀察者。 簡單來講就是防抖動,好比按鈕控制快速點擊等。

7.2 怎麼用?

Observable.just(1,2,3,4,5)
                .map(new Function<Integer, Integer>() {

                    @Override
                    public Integer apply(Integer integer) throws Exception {

                        Thread.sleep(900);
                        return integer;
                    }
                })
                .debounce(1,TimeUnit.SECONDS)
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
複製代碼

7.3 結果


8.firstElement() && lastElement() && elementAt()

8.1 作啥的?

firstElement(): 取事件序列的第一個元素。

lastElement(): 取事件序列的最後一個元素。

elementAt(): 以指定取出事件序列中事件,可是輸入的 index 超出事件序列的總數的話就不會出現任何結果。

8.2 怎麼用?

Observable.just(1,2,3,4)
                .firstElement()
                .subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
複製代碼

8.3 結果


9.條件操做符

1.all()

1.1 作啥的?

判斷事件序列是否所有知足某個事件,若是都知足則返回 true,反之則返回 false。

1.2 怎麼用?

Observable.just(1, 2, 3, 4, 5)
                .all(new Predicate<Integer>() {

                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer <= 4;
                    }
                }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {

                Log.i("lybj",  aBoolean+"");
            }
        });
複製代碼

1.3 結果


2.takeWhile() & takeUntil()

2.1 作啥的?

takeWhile(): 從左邊開始,將知足條件的元素取出來,直到遇到第一個不知足條件的元素,則終止 takeUntil(): 從左邊開始,將知足條件的元素取出來,直到遇到第一個知足條件的元素,則終止 filter(): 是將全部知足條件的數據都取出。

2.2 怎麼用?

Observable.just(1, 2, 3, 4, 5)
                .takeWhile(new Predicate<Integer>() {

                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer < 3;
                    }
                }).subscribe(new Consumer<Integer>() {
            
                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
複製代碼

2.3 結果


3.skipWhile

3.1 作啥的?

從左邊開始,根據條件跳過元素

3.2 怎麼用?

Observable.just(1,2,3,4,5,3,2,1,7)
                .skipWhile(new Predicate<Integer>() {

                    @Override
                    public boolean test(Integer integer) throws Exception {

                        return integer < 3;
                    }
                }).subscribe(new Consumer<Integer>() {

                    @Override
                    public void accept(Integer integer) throws Exception {

                        Log.i("lybj",  integer+"");
                    }
                });
複製代碼

3.3 結束


4.isEmpty() & defaultIfEmpty()

4.1 作啥的?

isEmpty(): 判斷事件序列是否爲空。

defaultIfEmpty(): 若是觀察者只發送一個 onComplete() 事件,則能夠利用這個方法發送一個值。

4.2 怎麼用?

Observable.create(new ObservableOnSubscribe<String>() {

            public void subscribe(@NonNull ObservableEmitter<String> emitter){

                emitter.onComplete();
            }
        }).isEmpty()
          .subscribe(new Consumer<Boolean>() {

              @Override
              public void accept(Boolean aBoolean) throws Exception {

                  Log.i("lybj",  aBoolean+"");
              }
          });
複製代碼

4.3 結果


5.contains()

5.1 作啥的?

判斷事件序列中是否含有某個元素,若是有則返回 true,若是沒有則返回 false。

5.2 怎麼用?

在Observable.just(1,2,3,4,5,6)
                .contains(2)
                .subscribe(new Consumer<Boolean>() {

                    @Override
                    public void accept(Boolean aBoolean) throws Exception {

                        Log.i("lybj",  aBoolean+"");
                    }
                });
複製代碼

5.3 結果


6.sequenceEqual()

6.1 作啥的?

判斷兩個 Observable 發送的事件是否相同。

6.2 怎麼用?

Observable.sequenceEqual(Observable.just("小明", "小方", "小李"),
                Observable.just("小明", "小方", "小李", "小張"))
                .subscribe(new Consumer<Boolean>() {

                    @Override
                    public void accept(Boolean aBoolean) throws Exception {

                        Log.i("lybj",  aBoolean+"");
                    }
                });
複製代碼

6.3 結果


10.源碼

demo下載

相關文章
相關標籤/搜索