Rxjava2-Android-Samlpes(一)

  • 最近工做不那麼忙,計劃從新搭建一個MVP架構!基於Rxjava2-Rxandroid首先掌握RxJava2的使用的方式!java

  • 此文章是根據老外amitshekhariitbhuRxJava2-Android-SamplesDemo 改裝而成,是個翻譯版本,足夠應付對RxJava2的所有姿式!android

  • GitHub地址: RxJava2-Android-Samplesgit

    • Map - >經過對每一個項應用函數來變換Observable發出的項
    • Zip - >經過指定的函數將多個Observable的排放組合在一塊兒,並根據此函數的結果爲每一個組合發出單個項目
    • Filter - >僅發出經過謂詞測試的Observable中的那些項
    • FlatMap - >將Observable發出的項目轉換爲Observables,而後將這些項目的排放量變爲單個Observable
    • Take - >僅發出Observable發出的前n項
    • Reduce - >按順序將一個函數應用於Observable發出的每一個項目,併發出最終值
    • Skip - >抑制Observable發出的前n項
    • Buffer - >按期將Observable發出的項目收集到bundle中併發出這些bundle而不是一次發送一個項目
    • Concat - >從兩個或多個Observable發射發射而不交錯
    • Replay - >確保全部觀察者看到相同的發射物品序列,即便他們在Observable開始發射物品後訂閱
    • Merge - >經過合併它們的排放將多個Observable組合成一個
    • SwitchMap - >將Observable發出的項目變換爲Observables,並鏡像最近轉換的Observable發出的項目

Operators

  • 一、 簡單的一個順序執行的Demo
/*
     * 一個一個地發出兩個值的簡單例子
     */
    private void doSomeWork() {
        getObservable()
                // 在後臺線程上運行
                .subscribeOn(Schedulers.io())
                // 在主線程上被通知
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }

    private Observable<String> getObservable() {
        return Observable.just("1", "2","3","4","5","6");
    }
複製代碼
  • d.dispose(); todo 若是這個方法放開的話,就不會往下面走了
private Observer<String> getObserver() {
        return new Observer<String>() {
            /**
             *
             *爲觀察者提供取消(處理)的方法。
             *鏈接(通道)和可觀察到的兩個
             *同步(從{{Link Lang-OnNeXT(object)})和異步方式。
             */
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
                //處理資源,操做應該是冪等的。
                //d.dispose();  todo 若是這個方法放開的話,就不會往下面走了

            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出結果
11-30 10:03:17.883 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:  onSubscribe : false
11-30 10:03:17.923 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:  onNext : value : 1
11-30 10:03:17.931 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:  onNext : value : 2
11-30 10:03:17.938 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:  onNext : value : 3
11-30 10:03:17.944 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:  onNext : value : 4
11-30 10:03:17.950 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:  onNext : value : 5
11-30 10:03:17.955 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:  onNext : value : 6
11-30 10:03:17.961 16586-16586/com.rxjava2.android.samples D/SimpleExampleActivity:  onComplete
複製代碼
  • 二、經過map運算符 處理網咯請求的Demo,就好比說我去網絡上請求個ApiUser回來,而後轉化成我想要的User
private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<List<ApiUser>, List<User>>() {
                     //經過前面的東西,如何獲取後面的東西
                    @Override
                    public List<User> apply(List<ApiUser> apiUsers) {
                        return Utils.convertApiUserListToUserList(apiUsers);
                    }
                })
                .subscribe(getObserver());
    }

    private Observable<List<ApiUser>> getObservable() {
        return Observable.create(new ObservableOnSubscribe<List<ApiUser>>() {
            @Override
            public void subscribe(ObservableEmitter<List<ApiUser>> e) {
                if (!e.isDisposed()) {
                    // List<ApiUser> 獲得這個 對象
                    e.onNext(Utils.getApiUserList());
                    e.onComplete();
                }
            }
        });
    }

複製代碼
  • 處理獲得的List<User>
//處理獲得的 List<User>
    private Observer<List<User>> getObserver() {
        return new Observer<List<User>>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(List<User> userList) {
                Log.d(TAG, " onNext : " + userList.size());
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, " onComplete");
            }
        };
複製代碼
  • 輸出結果
11-30 10:22:26.566 16586-16586/com.rxjava2.android.samples D/MapExampleActivity:  onSubscribe : false
11-30 10:22:26.619 16586-16586/com.rxjava2.android.samples D/MapExampleActivity:  onNext : 3
11-30 10:22:26.624 16586-16586/com.rxjava2.android.samples D/MapExampleActivity:  onComplete

複製代碼
  • 三、兩個複雜的操做放到子線程中去,而後在主線程中去處理 兩隊球迷的問題 zip
  • 獲取數據源
private Observable<List<User>> getCricketFansObservable() {
        return Observable.create(new ObservableOnSubscribe<List<User>>() {
            @Override
            public void subscribe(ObservableEmitter<List<User>> e) {
                if (!e.isDisposed()) {
                    e.onNext(Utils.getUserListWhoLovesCricket());
                    e.onComplete();
                }
            }
        });
    }

    private Observable<List<User>> getFootballFansObservable() {
        return Observable.create(new ObservableOnSubscribe<List<User>>() {
            @Override
            public void subscribe(ObservableEmitter<List<User>> e) {
                if (!e.isDisposed()) {
                    e.onNext(Utils.getUserListWhoLovesFootball());
                    e.onComplete();
                }
            }
        });
    }
複製代碼
  • 處理
private void doSomeWork() {
        // 獲取喜歡足球名單的人員   獲取板球球迷的名單
        Observable.zip(getCricketFansObservable(), getFootballFansObservable(),
                // 有點kotlin的啊 第一個對應的是沙面位置的第一個,最後是指望
                new BiFunction<List<User>, List<User>, List<User>>() {
                    @Override
                    public List<User> apply(List<User> cricketFans, List<User> footballFans) {
                        return Utils.filterUserWhoLovesBoth(cricketFans, footballFans);
                    }
                })
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }
複製代碼
  • 結果監聽
private Observer<List<User>> getObserver() {
        return new Observer<List<User>>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(List<User> userList) {
                textView.append(" onNext");
                textView.append(AppConstant.LINE_SEPARATOR);
                for (User user : userList) {
                    textView.append(" firstname : " + user.firstname);
                    textView.append(AppConstant.LINE_SEPARATOR);
                }
                Log.d(TAG, " onNext : " + userList.size());
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
複製代碼
  • 輸出,過程就是找出兩個類的共性,而後放到主線程操做它的結果
11-30 10:28:16.326 16586-16586/com.rxjava2.android.samples D/ZipExampleActivity:  onSubscribe : false
11-30 10:28:16.353 16586-16586/com.rxjava2.android.samples D/ZipExampleActivity:  onNext : 1
11-30 10:28:16.358 16586-16586/com.rxjava2.android.samples D/ZipExampleActivity:  onComplete
複製代碼
  • 四、對一些耗時操做的問題,可使用使用容器 Disposable ,在活動被破壞後不要發送事件
//一次性容器,能夠容納多個其餘一次性物品,並提供O(1)添加和移除複雜性。
    private final CompositeDisposable disposables = new CompositeDisposable();
  @Override
    protected void onDestroy() {
        super.onDestroy();
        //在活動被破壞後不要發送事件
        disposables.clear(); // do not send event after activity has been destroyed
    }
複製代碼
disposables.add(sampleObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableObserver<String>() {
                    @Override
                    public void onComplete() {
                        textView.append(" onComplete");
                        textView.append(AppConstant.LINE_SEPARATOR);
                        Log.d(TAG, " onComplete");
                    }

                    @Override
                    public void onError(Throwable e) {
                        textView.append(" onError : " + e.getMessage());
                        textView.append(AppConstant.LINE_SEPARATOR);
                        Log.d(TAG, " onError : " + e.getMessage());
                    }

                    @Override
                    public void onNext(String value) {
                        textView.append(" onNext : value : " + value);
                        textView.append(AppConstant.LINE_SEPARATOR);
                        Log.d(TAG, " onNext value : " + value);
                    }
                }));

複製代碼
  • sampleObservable
static Observable<String> sampleObservable() {
        return Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() {
                // Do some long running operation
                // 作一些長時間運行的操做
                SystemClock.sleep(2000);
                return Observable.just("one", "two", "three", "four", "five");
            }
        });
    }
複製代碼
  • 輸出結果
11-30 10:32:47.735 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:  onNext value : one
11-30 10:32:47.748 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:  onNext value : two
11-30 10:32:47.755 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:  onNext value : three
11-30 10:32:47.762 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:  onNext value : four
11-30 10:32:47.770 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:  onNext value : five
11-30 10:32:47.775 16586-16586/com.rxjava2.android.samples D/DisposableExampleActivity:  onComplete
複製代碼
  • 五、使用取算符,它只發出所需數量的值。這裏只有5箇中的3個
private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .take(3)
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.just(1, 2, 3, 4, 5);
    }
複製代碼
  • 監聽
private Observer<Integer> getObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出結果
11-30 10:33:43.235 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:  onSubscribe : false
11-30 10:33:43.254 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:  onNext value : 1
11-30 10:33:43.259 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:  onNext value : 2
11-30 10:33:43.265 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:  onNext value : 3
11-30 10:33:43.271 16586-16586/com.rxjava2.android.samples D/TakeExampleActivity:  onComplete

複製代碼
  • 六、延遲兩秒運行
private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getObserver());
    }

    private Observable<? extends Long> getObservable() {
        return Observable.timer(2, TimeUnit.SECONDS);
    }
複製代碼
  • 七、定時器 不斷重複的運行 使用RxJava運行 使用間隔2秒的間隔運行任務的簡單示例
private final CompositeDisposable disposables = new CompositeDisposable();
 @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear(); // clearing it : do not emit after destroy
    }
複製代碼
  • 使用間隔2秒的間隔運行任務的簡單示例當即開始,initialDelay的開始的時間爲0
private void doSomeWork() {
        disposables.add(getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(getObserver()));
    }

    private Observable<? extends Long> getObservable() {
        return Observable.interval(0, 2, TimeUnit.SECONDS);
    }
複製代碼
  • 八、使用單觀察者的簡單例子
/*
     * simple example using SingleObserver
     *使用單觀察者的簡單例子
     */
    private void doSomeWork() {
        Single.just("Amit")
                .subscribe(getSingleObserver());
    }
複製代碼
  • 觀察者
private SingleObserver<String> getSingleObserver() {
        return new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onSuccess(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }
        };
    }
複製代碼
  • 輸出結果 :注意這裏沒有onNext的方法了,這個比較特殊
11-30 11:11:00.612 16586-16586/com.rxjava2.android.samples D/SingleObserverExampleActivity:  onSubscribe : true
11-30 11:11:00.615 16586-16586/com.rxjava2.android.samples D/SingleObserverExampleActivity:  onNext value : Amit

複製代碼
  • 九、使用徹底觀測器的簡單示例,延遲1s纔去自行
Completable completable = Completable.timer(1000, TimeUnit.MILLISECONDS);

        completable
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(getCompletableObserver());

 private CompletableObserver getCompletableObserver() {
        return new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }
        };
    }
複製代碼
  • 輸出結果 :延遲1s
11-30 11:12:41.248 16586-16586/com.rxjava2.android.samples D/CompletableObserverExampleActivity:  onSubscribe : false
11-30 11:12:42.270 16586-16586/com.rxjava2.android.samples D/CompletableObserverExampleActivity:  onComplete
複製代碼
  • 十、使用流動性的簡單示例,說白了 就是累加的過程 1+2+3+4+5 ==== 使用Rxjava管理 這個帶了一個初始值
Flowable<Integer> observable = Flowable.just(100, 2, 3, 4);

        observable.reduce(50+1, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer t1, Integer t2) {
                Log.d(TAG, " t1 : " + t1);
                Log.d(TAG, " t2 : " + t2);
                return t1 + t2;
            }
        }).subscribe(getObserver());

 private SingleObserver<Integer> getObserver() {

        return new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onSuccess(Integer value) {
                textView.append(" onSuccess : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onSuccess : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }
        };
    }
複製代碼
  • 輸出結果:51+100+2+3+4=160
11-30 11:14:40.489 16586-16586/com.rxjava2.android.samples D/FlowableExampleActivity:  onSubscribe : false
11-30 11:14:40.490 16586-16586/com.rxjava2.android.samples D/FlowableExampleActivity:   t1 : 51
      t2 : 100
      t1 : 151
      t2 : 2
      t1 : 153
      t2 : 3
      t1 : 156
11-30 11:14:40.491 16586-16586/com.rxjava2.android.samples D/FlowableExampleActivity:   t2 : 4
11-30 11:14:40.496 16586-16586/com.rxjava2.android.samples D/FlowableExampleActivity:  onSuccess : value : 160
複製代碼
  • 十一、說白了 就是累加的過程 1+2+3+4+5 ==== 使用Rxjava管理 這個沒有帶一個初始值
/*
     * simple example using reduce to add all the number
     *
     * 簡單的例子,用以減小全部數字的添加
     */
    private void doSomeWork() {
        getObservable()
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer t1, Integer t2) {
                        return t1 + t2;
                    }
                })
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.just(1, 2, 3, 4);
    }

    private MaybeObserver<Integer> getObserver() {
        return new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onSuccess(Integer value) {
                textView.append(" onSuccess : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onSuccess : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出結果
11-30 11:16:17.035 16586-16586/com.rxjava2.android.samples D/ReduceExampleActivity:  onSubscribe : false
11-30 11:16:17.039 16586-16586/com.rxjava2.android.samples D/ReduceExampleActivity:  onSuccess : value : 10
複製代碼
  • 12 、一個buffer 緩衝取 ,我要從緩衝區中取數據,並且是跳着取數據,就要這樣作
/*
     * simple example using buffer operator - bundles all emitted values into a list
     *
     * 使用緩衝運算符的簡單示例-將全部發出的值捆綁到列表中
     */
    private void doSomeWork() {

        Observable<List<String>> buffered = getObservable().buffer(3, 1);

        // 3 means,  從開始索引和建立列表中最多須要三個
        // 1 means, 每次跳一步
        // so the it gives the following list
        // 1 - one, two, three
        // 2 - two, three, four
        // 3 - three, four, five
        // 4 - four, five
        // 5 - five

        buffered.subscribe(getObserver());
    }

    private Observable<String> getObservable() {
        return Observable.just("one", "two", "three", "four", "five");
    }

    private Observer<List<String>> getObserver() {
        return new Observer<List<String>>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(List<String> stringList) {
                textView.append(" onNext size : " + stringList.size());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : size :" + stringList.size());
                for (String value : stringList) {
                    textView.append(" value : " + value);
                    textView.append(AppConstant.LINE_SEPARATOR);
                    Log.d(TAG, " : value :" + value);
                }

            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出結果:以此取出3個數字,同時角標移動1.把這個buffer中的值去完成就ok
11-30 11:17:25.002 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  onSubscribe : false
11-30 11:17:25.009 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  onNext : size :3
11-30 11:17:25.012 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :one
11-30 11:17:25.014 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :two
11-30 11:17:25.016 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :three
11-30 11:17:25.018 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  onNext : size :3
11-30 11:17:25.019 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :two
11-30 11:17:25.021 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :three
11-30 11:17:25.022 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :four
11-30 11:17:25.025 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  onNext : size :3
11-30 11:17:25.027 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :three
11-30 11:17:25.029 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :four
11-30 11:17:25.031 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :five
11-30 11:17:25.033 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  onNext : size :2
11-30 11:17:25.035 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :four
11-30 11:17:25.037 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :five
11-30 11:17:25.039 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  onNext : size :1
11-30 11:17:25.040 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  : value :five
11-30 11:17:25.041 16586-16586/com.rxjava2.android.samples D/BufferExampleActivity:  onComplete
複製代碼
  • 1三、 對數據中特定的信息 作處理 使用過濾器操做符只發出偶數值的簡單示例
/*
     * simple example by using filter operator to emit only even value
     * 使用過濾器操做符只發出偶數值的簡單示例
     */
    private void doSomeWork() {
        Observable.just(1, 2, 3, 4, 5, 6)
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) {
                        return integer % 2 == 0;
                    }
                })
                .subscribe(getObserver());
    }


    private Observer<Integer> getObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : ");
                textView.append(AppConstant.LINE_SEPARATOR);
                textView.append(" value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext ");
                Log.d(TAG, " value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出結果,簡單的意思 就是取偶數
11-30 11:20:24.775 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:  onSubscribe : false
11-30 11:20:24.779 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:  onNext 
11-30 11:20:24.780 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:  value : 2
11-30 11:20:24.782 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:  onNext 
     value : 4
11-30 11:20:24.786 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:  onNext 
     value : 6
11-30 11:20:24.788 16586-16586/com.rxjava2.android.samples D/FilterExampleActivity:  onComplete
複製代碼
  • 1四、使用跳過操做符,它不會發出前2個值。 對前面兩個值不會操做
/* Using skip operator, it will not emit
    * the first 2 values.
    * 使用跳過操做符,它不會發出前2個值。
    */
    private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .skip(2)
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.just(1, 2, 3, 4, 5);
    }

    private Observer<Integer> getObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出結果:跳出前面的兩個值
11-30 11:21:22.224 16586-16586/com.rxjava2.android.samples D/SkipExampleActivity:  onSubscribe : false
11-30 11:21:22.235 16586-16586/com.rxjava2.android.samples D/SkipExampleActivity:  onNext value : 3
11-30 11:21:22.236 16586-16586/com.rxjava2.android.samples D/SkipExampleActivity:  onNext value : 4
     onNext value : 5
11-30 11:21:22.237 16586-16586/com.rxjava2.android.samples D/SkipExampleActivity:  onComplete
複製代碼
  • 1五、 使用掃描算子,它也發送先前的結果。 意思就是 我關心每次運算的結果 ,是每次運算的結果 這個有個關鍵的地方 subscribe 裏面的 觀察者 onNext的方法是先行執行的
/* Using scan operator, it sends also the previous result
    * 使用掃描算子,它也發送先前的結果。
    * */
    private void doSomeWork() {
        getObservable()
                // Run on a background thread
                .subscribeOn(Schedulers.io())
                // Be notified on the main thread
                .observeOn(AndroidSchedulers.mainThread())
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer int1, Integer int2) {
                        Log.d(TAG, " int1 : " + int1);
                        Log.d(TAG, " int2 : " + int2);
                        return int1 + int2;
                    }
                })
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.just(1, 2, 3, 4, 5);
    }

    private Observer<Integer> getObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出結果:三角形數
11-30 11:22:24.396 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:  onSubscribe : false
11-30 11:22:24.409 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:  onNext value : 1
      int1 : 1
      int2 : 2
11-30 11:22:24.411 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:  onNext value : 3
      int1 : 3
      int2 : 3
11-30 11:22:24.414 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:  onNext value : 6
      int1 : 6
      int2 : 4
11-30 11:22:24.417 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:  onNext value : 10
      int1 : 10
      int2 : 5
11-30 11:22:24.419 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:  onNext value : 15
11-30 11:22:24.420 16586-16586/com.rxjava2.android.samples D/ScanExampleActivity:  onComplete
複製代碼
  • 1六、PublishSubject 我我的理解的話,就是一堆數據我要發送給別人,可是呢最後幾個數字我又要發送給其餘人,因此就須要使用到這個,使用重放操做符,重放確保全部觀察者看到相同的序列。發射項目,即便它們訂閱後,可觀測已經開始發射項目。我我的理解的是,發送一個buffer,給第一個觀察者,同時我要把這個buffer的尾部長度爲4的在發送給第二個觀察者。
private void doSomeWork() {
        PublishSubject<Integer> source = PublishSubject.create();
        ConnectableObservable<Integer> connectableObservable = source.replay(4); 
         //鏈接可鏈接的可觀察的
        connectableObservable.connect();
        connectableObservable.subscribe(getFirstObserver());
        source.onNext(1);
        source.onNext(2);
        source.onNext(3);
        source.onNext(4);
        source.onNext(5);
        source.onNext(6);
        source.onNext(7);
        source.onComplete();
        connectableObservable.subscribe(getSecondObserver());
    }


    private Observer<Integer> getFirstObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " First onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" First onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " First onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" First onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " First onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" First onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " First onComplete");
            }
        };
    }

    private Observer<Integer> getSecondObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                textView.append(" Second onSubscribe : isDisposed :" + d.isDisposed());
                Log.d(TAG, " Second onSubscribe : " + d.isDisposed());
                textView.append(AppConstant.LINE_SEPARATOR);
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" Second onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " Second onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" Second onError : " + e.getMessage());
                Log.d(TAG, " Second onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" Second onComplete");
                Log.d(TAG, " Second onComplete");
            }
        };
    }
複製代碼
  • 輸出結果
11-30 11:39:02.882 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  First onSubscribe : false
11-30 11:39:02.903 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  First onNext value : 1
11-30 11:39:02.911 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  First onNext value : 2
11-30 11:39:02.919 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  First onNext value : 3
11-30 11:39:02.926 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  First onNext value : 4
11-30 11:39:02.932 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  First onNext value : 5
11-30 11:39:02.938 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  First onNext value : 6
11-30 11:39:02.944 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  First onNext value : 7
11-30 11:39:02.950 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  First onComplete
11-30 11:39:02.954 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  Second onSubscribe : false
11-30 11:39:02.963 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  Second onNext value : 4
11-30 11:39:02.968 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  Second onNext value : 5
11-30 11:39:02.972 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  Second onNext value : 6
11-30 11:39:02.977 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  Second onNext value : 7
11-30 11:39:02.979 19690-19690/com.rxjava2.android.samples D/ReplayExampleActivity:  Second onComplete
複製代碼
  • 17 、依次的發送兩個數組,並且裏面兩個數組的是有序的輸出的,因此就要使用到這個裏面的
/**
     * 使用CONTAT運算符組合可觀察性:CONTAT維護
     * 可觀察的順序。
     * 將按順序發射全部7個值
     * 這裏第一個「A1」,「A2」,「A3」,「A4」,而後是「B1」,「B2」,「B3」。
     * 首先從第一個觀察到而後
     * 全部從第二可觀察到的全部順序
     */
    private void doSomeWork() {
        final String[] aStrings = {"A1", "A2", "A3", "A4"};
        final String[] bStrings = {"B1", "B2", "B3"};

        final Observable<String> aObservable = Observable.fromArray(aStrings);
        final Observable<String> bObservable = Observable.fromArray(bStrings);

        Observable.concat(aObservable, bObservable)
                .subscribe(getObserver());
    }


    private Observer<String> getObserver() {
        return new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出結果
11-30 11:42:43.359 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:  onSubscribe : false
11-30 11:42:43.369 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:  onNext : value : A1
11-30 11:42:43.375 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:  onNext : value : A2
11-30 11:42:43.382 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:  onNext : value : A3
11-30 11:42:43.388 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:  onNext : value : A4
11-30 11:42:43.393 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:  onNext : value : B1
11-30 11:42:43.399 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:  onNext : value : B2
11-30 11:42:43.404 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:  onNext : value : B3
11-30 11:42:43.409 19690-19690/com.rxjava2.android.samples D/ConcatExampleActivity:  onComplete
複製代碼
  • 1八、依次的發送兩個數組,並且裏面兩個數組的不是有序的輸出的,可是我始終沒有測出來結果,哎哎 難受的很 日了狗! RxJava 合併組合兩個(或多個)Observable數據源
private void doSomeWork() {
        final String[] aStrings = {"A1", "A2", "A3", "A4","1", "2", "3", "4","5", "6", "7", "8","9", "10", "11", "12",};
        final String[] bStrings = {"B1", "B2", "B3","B1", "B2", "B3","B1", "B2", "B3","B1", "B2", "B3","B1", "B2", "B3","B1", "B2", "B3","B1", "B2", "B3"};

        final Observable<String> aObservable = Observable.fromArray(aStrings);
        final Observable<String> bObservable = Observable.fromArray(bStrings);

        Observable.merge(aObservable, bObservable)
                .subscribe(getObserver());
    }


    private Observer<String> getObserver() {
        return new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出結果:合併多個Observables的發射物, Merge 可能會讓合併的Observables發射的數據交錯(有一個相似的操做符 Concat 不會讓數 據交錯,它會按順序一個接着一個發射多個Observables的發射物,雖然個人結果沒有測試出來,可是呢?真的有可能數據會交叉!!!!
11-30 11:46:44.466 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onSubscribe : false
11-30 11:46:44.477 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : A1
11-30 11:46:44.484 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : A2
11-30 11:46:44.490 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : A3
11-30 11:46:44.495 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : A4
11-30 11:46:44.499 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 1
11-30 11:46:44.503 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 2
11-30 11:46:44.508 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 3
11-30 11:46:44.512 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 4
11-30 11:46:44.515 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 5
11-30 11:46:44.517 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 6
11-30 11:46:44.519 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 7
11-30 11:46:44.522 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 8
11-30 11:46:44.525 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 9
11-30 11:46:44.528 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 10
11-30 11:46:44.532 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 11
11-30 11:46:44.535 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : 12
11-30 11:46:44.537 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B1
11-30 11:46:44.539 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B2
11-30 11:46:44.540 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B3
11-30 11:46:44.542 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B1
11-30 11:46:44.543 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B2
11-30 11:46:44.546 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B3
11-30 11:46:44.548 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B1
11-30 11:46:44.551 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B2
11-30 11:46:44.553 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B3
11-30 11:46:44.556 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B1
11-30 11:46:44.559 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B2
11-30 11:46:44.561 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B3
11-30 11:46:44.564 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B1
11-30 11:46:44.567 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B2
11-30 11:46:44.570 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B3
11-30 11:46:44.573 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B1
11-30 11:46:44.575 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B2
11-30 11:46:44.577 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B3
11-30 11:46:44.578 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B1
11-30 11:46:44.580 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B2
11-30 11:46:44.581 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onNext : value : B3
11-30 11:46:44.582 19690-19690/com.rxjava2.android.samples D/MergeExampleActivity:  onComplete
複製代碼
  • 1九、讓屬性跟着數據bean傳遞下去 Car 類
public class Car {

    private String brand;

    public void setBrand(String brand) {
        this.brand = brand;
    }

    public Observable<String> brandDeferObservable() {
        return Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() {
                return Observable.just(brand);
            }
        });
    }
}
複製代碼
/*
     * Defer used for Deferring Observable code until subscription in RxJava
     * 推遲在RxJava訂閱可觀察代碼直到訂閱
     */
    private void doSomeWork() {

        Car car = new Car();

        Observable<String> brandDeferObservable = car.brandDeferObservable();
        // 即便咱們在建立了可觀察的品牌以後設置了品牌,咱們也會獲得寶馬的品牌。若是咱們不使用延遲器,咱們將沒有做爲品牌。
        car.setBrand("BMW");  // Even if we are setting the brand after creating Observable
        // we will get the brand as BMW.
        // If we had not used defer, we would have got null as the brand.

        brandDeferObservable
                .subscribe(getObserver());
    }

    private Observer<String> getObserver() {
        return new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext : value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                textView.append(" onComplete");
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onComplete");
            }
        };
    }

複製代碼
  • 輸出結果
11-30 14:17:07.380 19690-19690/com.rxjava2.android.samples D/DeferExampleActivity:  onSubscribe : false
11-30 14:17:07.388 19690-19690/com.rxjava2.android.samples D/DeferExampleActivity:  onNext : value : BMW
11-30 14:17:07.392 19690-19690/com.rxjava2.android.samples D/DeferExampleActivity:  onComplete

複製代碼
  • 20、去重,對數據源,進行去重的操做
/*
     * distinct() suppresses duplicate items emitted by the source Observable.
     * 區別()抑制由可觀察到的源發出的重複項。
     */
    private void doSomeWork() {

        getObservable()
                .distinct()
                .subscribe(getObserver());
    }

    private Observable<Integer> getObservable() {
        return Observable.just(1, 2, 1, 1, 2, 3, 4, 6, 4);
    }


    private Observer<Integer> getObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onNext(Integer value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, " onError : " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, " onComplete");
            }
        };
    }
複製代碼
  • 輸出的結果
11-30 14:20:04.382 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:  onSubscribe : false
11-30 14:20:04.388 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:  onNext value : 1
11-30 14:20:04.391 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:  onNext value : 2
11-30 14:20:04.394 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:  onNext value : 3
11-30 14:20:04.397 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:  onNext value : 4
11-30 14:20:04.400 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:  onNext value : 6
     onComplete

複製代碼
  • 2一、對數據源傳遞,可是隻不過是取最後一個數,把前面的都不須要了,同時給了一個默認的值 A1
private void doSomeWork() {
        // the default item ("A1") to emit if the source ObservableSource is empty
        getObservable().last("A1") // the default item ("A1") to emit if the source ObservableSource is empty
                .subscribe(getObserver());
    }

    private Observable<String> getObservable() {
        return Observable.just("A1", "A2", "A3", "A4", "A5", "A6");
    }

    private SingleObserver<String> getObserver() {
        return new SingleObserver<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, " onSubscribe : " + d.isDisposed());
            }

            @Override
            public void onSuccess(String value) {
                textView.append(" onNext : value : " + value);
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onNext value : " + value);
            }


            @Override
            public void onError(Throwable e) {
                textView.append(" onError : " + e.getMessage());
                textView.append(AppConstant.LINE_SEPARATOR);
                Log.d(TAG, " onError : " + e.getMessage());
            }
        };
    }
複製代碼
  • 輸出結果
11-30 14:20:42.788 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:  onSubscribe : false
11-30 14:20:42.800 19690-19690/com.rxjava2.android.samples D/DistinctExampleActivity:  onNext value : A6
複製代碼
相關文章
相關標籤/搜索