操做符之組合 / 合併操做符

     一、做用

  • 組合 多個被觀察者(Observable) & 合併須要發送的事件

     二、類型

     三、詳解

          3.1   concat() / concatArray()數據結構

       做用:組合多個被觀察者一塊兒發送數據,合併後 按發送順序串行執行app

                     區別:組合被觀察者的數量,即concat()組合被觀察者數量≤4個,而concatArray()則可>4個ide

 

    public static void concat() {
        Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:函數

08-06 16:28:21.348 22182 22182 D Operation: onSubscribe
08-06 16:28:21.348 22182 22182 D Operation: onNext: value = 1
08-06 16:28:21.348 22182 22182 D Operation: onNext: value = 2
08-06 16:28:21.348 22182 22182 D Operation: onNext: value = 3
08-06 16:28:21.349 22182 22182 D Operation: onNext: value = 4
08-06 16:28:21.349 22182 22182 D Operation: onNext: value = 5
08-06 16:28:21.349 22182 22182 D Operation: onComplete

 

          3.2   merge() / mergeArray()----- 見rxdocs.pdf第139頁spa

       做用:組合多個被觀察者一塊兒發送數據,合併後 按時間線並行執行3d

       區別:組合被觀察者的數量,即merge()組合被觀察者數量≤4個,而mergeArray()則可>4個code

                 區別上述concat()操做符:一樣是組合多個被觀察者一塊兒發送數據,但concat()操做符合並後是按發送順序串行執行server

 

 

   public static void merge() {
        Observable.merge(Observable.intervalRange(0, 3, 0, 2, TimeUnit.SECONDS), Observable.intervalRange(6, 3, 1, 2, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:blog

08-06 16:42:42.503 23013 23013 D Operation: onSubscribe
08-06 16:42:42.505 23013 23037 D Operation: onNext: value = 0
08-06 16:42:43.505 23013 23038 D Operation: onNext: value = 6
08-06 16:42:44.506 23013 23037 D Operation: onNext: value = 1
08-06 16:42:45.506 23013 23038 D Operation: onNext: value = 7
08-06 16:42:46.505 23013 23037 D Operation: onNext: value = 2
08-06 16:42:47.506 23013 23038 D Operation: onNext: value = 8
08-06 16:42:47.509 23013 23038 D Operation: onComplete

 

          3.3   concatDelayError() / mergeDelayError()----- 見rxdocs.pdf第140頁事件

       做用:若其中一個被觀察者發生onError事件,推遲到其餘觀察者發送完事件後才觸發。

 

          3.4   zip()----- 見rxdocs.pdf第147頁

       做用:將多個Observable發射的數據按順序組合起來,每一個數據只能組合一次,並且都是有序的。最終組合的數據的數量由發射數據最少的Observable來決定。

 

    public static void zip() {
        Observable.zip(Observable.just(1, 2, 3, 4), Observable.just("A", "B", "C"), new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          輸出:

08-06 17:19:13.248 24303 24303 D Operation: onSubscribe
08-06 17:19:13.249 24303 24303 D Operation: onNext: value = 1A
08-06 17:19:13.249 24303 24303 D Operation: onNext: value = 2B
08-06 17:19:13.249 24303 24303 D Operation: onNext: value = 3C
08-06 17:19:13.249 24303 24303 D Operation: onComplete

 

          3.5   combineLatest()----- 見rxdocs.pdf第133頁

       做用:當兩個Observables中的任何一個發射了數據時,使用一個函數結合每一個Observable發射的最 近數據項,而且基於這個函數的結果發射數據

                     與zip()的區別:zip() = 按個數合併,即1對1合併;combineLatest() = 按時間合併,即在同一個時間點上合併

 

    public static void combineLatest() {
        Observable.combineLatest(Observable.intervalRange(0, 3, 0, 2, TimeUnit.SECONDS), Observable.intervalRange(6, 3, 1, 2, TimeUnit.SECONDS), new BiFunction<Long, Long, String>() {
            @Override
            public String apply(Long aLong, Long aLong2) throws Exception {
                return "" + aLong + "+" + aLong2;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          輸出:

08-06 17:50:20.240  5792  5792 D Operation: onSubscribe
08-06 17:50:21.248  5792  5811 D Operation: onNext: value = 0+6
08-06 17:50:22.242  5792  5810 D Operation: onNext: value = 1+6
08-06 17:50:23.242  5792  5811 D Operation: onNext: value = 1+7
08-06 17:50:24.241  5792  5810 D Operation: onNext: value = 2+7
08-06 17:50:25.242  5792  5811 D Operation: onNext: value = 2+8
08-06 17:50:25.242  5792  5811 D Operation: onComplete

 

          3.6   combineLatestDelayError()

       做用:做用相似於concatDelayError() / mergeDelayError() ,即錯誤處理,此處不做過多描述

 

          3.7   reduce()----- 見rxdocs.pdf第210頁

       做用:把被觀察者須要發送的事件聚合成1個事件 & 發送

 

    public static void reduce() {
        Observable.just(1, 2, 3, 4, 5)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new MaybeObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onSuccess(Integer value) {
                Log.d(TAG, "onSuccess: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          輸出:

08-06 18:02:50.062  7606  7606 D Operation: onSubscribe
08-06 18:02:50.062  7606  7606 D Operation: onSuccess: value = 15

 

          3.8   collect()

       做用:將被觀察者Observable發送的數據事件收集到一個數據結構裏

 

 

   public static void collect() {
        Observable.just(1, 2, 3)
                .collect(new Callable<List<Integer>>() {
                    @Override
                    public List<Integer> call() throws Exception {
                        return new ArrayList<>();
                    }
                }, new BiConsumer<List<Integer>, Integer>() {
                    @Override
                    public void accept(List<Integer> list, Integer integer) throws Exception {
                        list.add(integer);
                    }
                }).subscribe(new SingleObserver<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onSuccess(List<Integer> list) {
                for (Integer i : list) {
                    Log.d(TAG, "onSuccess: value = " + i);
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }
        });
    }

          輸出:

08-06 19:09:56.003 12640 12640 D Operation: onSubscribe
08-06 19:09:56.003 12640 12640 D Operation: onSuccess: value = 1
08-06 19:09:56.003 12640 12640 D Operation: onSuccess: value = 2
08-06 19:09:56.003 12640 12640 D Operation: onSuccess: value = 3

 

          3.9   startWith() / startWithArray()----- 見rxdocs.pdf第144頁

       做用:在一個被觀察者發送事件前,追加發送一些數據 / 一個新的被觀察者

 

    public static void startWith() {
        Observable.just(1, 2, 3)
                .startWith(Observable.just(7, 8, 9))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:

08-06 19:18:16.663 13408 13408 D Operation: onSubscribe
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 7
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 8
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 9
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 1
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 2
08-06 19:18:16.663 13408 13408 D Operation: onNext: value = 3
08-06 19:18:16.664 13408 13408 D Operation: onComplete

 

          3.10   count()----- 見rxdocs.pdf第207頁

       做用:統計被觀察者發送事件的數量

    public static void count() {
        Observable.just(1, 2, 3)
                .count()
                .subscribe(new SingleObserver<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onSuccess(Long aLong) {
                        Log.d(TAG, "onSuccess: value = " + aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }
                });
    }

          輸出:

08-06 19:25:07.151 14405 14405 D Operation: onSubscribe
08-06 19:25:07.151 14405 14405 D Operation: onSuccess: value = 3

 

     四、總結

相關文章
相關標籤/搜索