操做符之建立操做符

     一、做用

  • 建立 被觀察者( Observable) 對象 & 發送事件。

     二、類型

 

     三、詳解

          3.1  create()數組

    public static void create() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

         輸出:ide

08-06 10:27:59.073  8859  8859 D Operation: onSubscribe
08-06 10:27:59.073  8859  8859 D Operation: onNext: value = 1
08-06 10:27:59.073  8859  8859 D Operation: onNext: value = 2
08-06 10:27:59.073  8859  8859 D Operation: onNext: value = 3
08-06 10:27:59.073  8859  8859 D Operation: onComplete

 

          3.2   just()----- 見rxdocs.pdf第49頁測試

     做用:按順序原樣發射數據spa

                   注:最多發射10個參數3d

 

    public static void just() {
        Observable.just(1, 6, 8)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:code

08-06 10:29:12.812  9095  9095 D Operation: onSubscribe
08-06 10:29:12.812  9095  9095 D Operation: onNext: value = 1
08-06 10:29:12.812  9095  9095 D Operation: onNext: value = 6
08-06 10:29:12.812  9095  9095 D Operation: onNext: value = 8
08-06 10:29:12.812  9095  9095 D Operation: onComplete

 

          3.3  fromArray()----- 見rxdocs.pdf第42頁server

       做用:直接發送 傳入的數組數據對象

 

    public static void fromArray() {
        String[] array = new String[]{"I", "am", "RxJava"};
        Observable.fromArray(array)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:blog

08-06 11:09:21.574 11135 11135 D Operation: onSubscribe
08-06 11:09:21.574 11135 11135 D Operation: onNext: value = I
08-06 11:09:21.574 11135 11135 D Operation: onNext: value = am
08-06 11:09:21.574 11135 11135 D Operation: onNext: value = RxJava
08-06 11:09:21.574 11135 11135 D Operation: onComplete

 

          3.4   fromIterable()  ----- 見rxdocs.pdf第42頁事件

       做用:直接發送 傳入的集合List數據

    public static void fromIterable() {
        List<String> array = new ArrayList<>();
        array.add("I");
        array.add("am");
        array.add("RxJava");
        Observable.fromIterable(array)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:

08-06 11:18:52.909 11428 11428 D Operation: onSubscribe
08-06 11:18:52.909 11428 11428 D Operation: onNext: value = I
08-06 11:18:52.909 11428 11428 D Operation: onNext: value = am
08-06 11:18:52.910 11428 11428 D Operation: onNext: value = RxJava
08-06 11:18:52.910 11428 11428 D Operation: onComplete

 

          3.5   empty,error,never  ----- 見rxdocs.pdf第41頁

// 下列方法通常用於測試使用

<-- empty()  -->
// 該方法建立的被觀察者對象發送事件的特色:僅發送Complete事件,直接通知完成
Observable observable1=Observable.empty(); 
// 即觀察者接收後會直接調用onCompleted()

<-- error()  -->
// 該方法建立的被觀察者對象發送事件的特色:僅發送Error事件,直接通知異常
// 可自定義異常
Observable observable2=Observable.error(new RuntimeException())
// 即觀察者接收後會直接調用onError()

<-- never()  -->
// 該方法建立的被觀察者對象發送事件的特色:不發送任何事件
Observable observable3=Observable.never();
// 即觀察者接收後什麼都不調用

 

          3.6   defer()  ----- 見rxdocs.pdf第38頁

     做用:直到有觀察者(Observer )訂閱時,才動態建立被觀察者對象(Observable) & 發送事件

 

 

    public static void defer() {
        Observable.defer(new Callable<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> call() throws Exception {
                return Observable.just(3, 6, 9);
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Object value) {
                Log.d(TAG, "onNext: value = " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError: " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

          輸出:

08-06 11:38:32.468 12057 12057 D Operation: onSubscribe
08-06 11:38:32.468 12057 12057 D Operation: onNext: value = 3
08-06 11:38:32.468 12057 12057 D Operation: onNext: value = 6
08-06 11:38:32.468 12057 12057 D Operation: onNext: value = 9
08-06 11:38:32.468 12057 12057 D Operation: onComplete

 

          3.7   timer()  ----- 見rxdocs.pdf第61頁

      做用:延遲指定時間後,發送1個數值0(Long類型),   運行在computation Scheduler.

 

    public static void timer() {
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:

08-06 11:44:25.364 12314 12314 D Operation: onSubscribe
08-06 11:44:27.366 12314 12340 D Operation: onNext: value = 0
08-06 11:44:27.368 12314 12340 D Operation: onComplete

 

         3.8   interval()  ----- 見rxdocs.pdf第47頁

      做用:每隔指定時間 就發送 事件

    public static void interval() {
        Observable.interval(0, 2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:

08-06 11:51:14.280 12641 12641 D Operation: onSubscribe
08-06 11:51:14.281 12641 12665 D Operation: onNext: value = 0
08-06 11:51:16.282 12641 12665 D Operation: onNext: value = 1
08-06 11:51:18.282 12641 12665 D Operation: onNext: value = 2
08-06 11:51:20.286 12641 12665 D Operation: onNext: value = 3

 

        3.9   intervalRange()

    做用:每隔指定時間 就發送 事件,可指定發送的數據的數量。 做用相似於interval(),但可指定發送的數據的數量。

    public static void intervalRange() {
        Observable.intervalRange(6, 5, 0, 2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:

08-06 14:14:14.095 15543 15543 D Operation: onSubscribe
08-06 14:14:14.097 15543 15567 D Operation: onNext: value = 6
08-06 14:14:16.098 15543 15567 D Operation: onNext: value = 7
08-06 14:14:18.097 15543 15567 D Operation: onNext: value = 8
08-06 14:14:20.098 15543 15567 D Operation: onNext: value = 9
08-06 14:14:22.097 15543 15567 D Operation: onNext: value = 10
08-06 14:14:22.098 15543 15567 D Operation: onComplete

 

        3.10   range()  ----- 見rxdocs.pdf第51頁

       做用:連續發送 1個事件序列,可指定範圍。做用相似於intervalRange(),但區別在於:無延遲發送事件

 

    public static void range() {
        Observable.range(6, 5)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "onNext: value = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

          輸出:

08-06 14:27:52.744 17569 17569 D Operation: onSubscribe
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 6
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 7
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 8
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 9
08-06 14:27:52.745 17569 17569 D Operation: onNext: value = 10
08-06 14:27:52.745 17569 17569 D Operation: onComplete

 

        3.11   rangeLong()

       做用:相似於range(),區別在於該方法支持數據類型Long

 

     四、總結

相關文章
相關標籤/搜索