Rxjava2(三)、操做符的使用(未完待續)

Android進階系列之第三方庫知識點整理。java

知識點總結,整理也是學習的過程,若有錯誤,歡迎批評指出。web

第一篇:Rxjava2(一)、基礎概念及使用
第二篇:Rxjava2(二)、五種觀察者模式及背壓數組

終於到操做符了,我以爲rxjava2如此好用,絕對少不了操做符的功勞,下面這張圖你就簡單的掃一眼,別慌,咱們慢慢啃。併發

上一篇講了,rxjava有五種觀察者建立模式,其中ObservableFlowable差很少,只是Flowable支持背壓,而其它三種,都是簡化版的Observable,因此,本篇以Observable方式來說操做符的使用。app

Observable源碼ide

Observable是一個抽象類,繼承ObservableSource函數

ObservableSourcepost

1、建立操做符

這類操做符,建立直接返回Observable學習

1.一、嵌套回調事件

1.1.一、create

create是最經常使用的一個操做符,該操做符的參數中產生的emitter發射器,經過onNext不斷給下游發送數據,也能夠發送onCompleteonError事件給下游。fetch

須要發送給下游的數據,就經過emitter.onNext()給下游發送。

當發送了onComplete或者onError事件後,下游中止接收剩下的onNext事件

示意圖:

方法:

static <T> Observable<T> create(ObservableOnSubscribe<T> source)
複製代碼

demo:

  Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
                emitter.onNext("B");
                // .....
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: s=" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
            }
        });
複製代碼

結果:

1.二、複雜數據遍歷

這類操做符,直接將一個數組集合拆分紅單個ObJect數據依次發送給下游,也能夠直接發送Object數據。

1.2.一、just

轉換一個或多個 Object 數據,依次將這些數據發射到下游。

最多接收十個Object參數。

示意圖:

方法:

A : 最多隻接收十個參數。

Demo:

        Observable.just("A""B""C""D")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: s=" + s);
                    }
                });
複製代碼

結果:

1.2.二、fromArray

直接傳入一個數組數據,操做符將數組裏面的元素按前後順序依次發送給下游,能夠發送十個以上的數據。

示意圖:

方法:

static <T> Observable<T> fromArray(T... items)
複製代碼

Demo:

String[] data = new String[]{"A""B""C""D""E""F""G""H""I""J""K"};
        Observable.fromArray(data)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "fromArray--accept: s=" + s
                        );
                    }
                });
複製代碼

結果:

1.2.三、fromIterable

直接傳入一個集合數據,操做符將集合裏面的元素按前後順序依次發送給下游,能夠發送十個以上的數據。

示意圖:

方法:

static <T> Observable<T> fromIterable(Iterable<? extends T> source)
複製代碼

Demo:

        List<String> mData = new ArrayList<>();
        mData.add("A");
        mData.add("B");
        mData.add("C");
        Observable.fromIterable(mData)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "fromIterable--accept: s=" + s);
                    }
                });
複製代碼

結果:

1.2.四、range

快速建立一個被觀察者對象,連續發送一個指定開始和總數的事件序列

當即發送,無延時

示意圖:

方法:

static Observable<Integer> range(final int start, final int count)
複製代碼

Demo:

        // 從3開始發送,直到發送了十個數據中止。
        Observable.range(310).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "range--accept: integer=" + integer);
            }
        });
複製代碼

結果:

1.三、定時任務

1.3.一、interval

快速建立一個被觀察者,延遲必定時間後再每隔指定的一個時間發送一個事件(從0開始的整數)給下游。

發送數據從0開始,依次+1整數遞增

延遲時間能夠爲0,重載方法不設置默認使用第二個參數數值。

示意圖:

方法:

// initialDelay:發射第一個值須要等待時間
// period:後續每隔多少秒發射一個值
// unit:前兩個參數的時間單位
Observable<Long> interval(long initialDelay, long period, TimeUnit unit)

// 兩參方法
public static Observable<Long> interval(long period, TimeUnit unit) 
{
       // 第一個參數和第二個參數一致,即延遲period後再每隔period秒發送一個事件。
    // 默認使用 Schedulers.computation()
    return interval(period, period, unit, Schedulers.computation());
 }
複製代碼

示意圖:

方法:

// initialDelay:發射第一個值須要等待時間
// period:後續每隔多少秒發射一個值
// unit:前兩個參數的時間單位
// scheduler:等待發生併發出項目的調度程序
static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
複製代碼

demo:

// 延遲2秒後發送一個事件,後續每隔五秒發送一個事件
Observable.interval(25, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.d(TAG, "onNext: aLong=" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: error" + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
複製代碼
1.3.二、intervalRange

快速建立1個被觀察者對象,每隔指定時間發送1個事件,能夠指定事件發送開始的值和總的值。

示意圖:

方法:

// start:範圍起始值
// count:要發出的值的總數,若是爲零,則操做員在初始延遲後發出onComplete。
// initialDelay:發出第一個值(開始)以前的初始延遲
// period:後續值之間的時間段
// unit:前面時間參數單位
static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
複製代碼

demo:

// 第一個延遲三秒後發送int值2,後續每隔1秒累加發送給下游,一共發送10個數據。
Observable.intervalRange(21031, TimeUnit.SECONDS)
        .subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                Log.d(TAG, "onNext: aLong=" + aLong);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: error" + e);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });
複製代碼

1.四、延遲任務

1.4.一、defer

建立一個Observable對象,被觀察者邏輯真正執行的時機是在其被訂閱的時候。

當下遊訂閱後,上游纔開始處理邏輯。

示意圖:

方法:

// 
static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
複製代碼

demo:

String[] mStrings = new String[]{"A""B""C""D"};

Observable observable = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        // 上游發送mStrings數組
        return Observable.fromArray(mStrings);
    }
});
// 在訂閱以前,將數組數據改變
mStrings = new String[]{"defer,訂閱時候才建立"};
// 訂閱
observable.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d(TAG, "accept: s=" + s);
    }
});
複製代碼
1.4.二、timer

建立一個被觀察者對象,上游延時指定的時間後發送一個事件到下游。

發送的數值爲Long型的0

示意圖:

方法:

// delay:發射單個數據以前的延時
// unit:前者時間單位
// scheduler:指定的調度程序 (默認爲Schedulers.computation())
static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 
複製代碼

demo:

public void timer() {
        // 延遲5秒後發送Long型值0到下游,可指定Schedulers,默認Schedulers.computation()
        Observable.timer(5, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG, "accept: aLong=" + aLong);
                    }
                });
    }
複製代碼

結果:

2、變換操做符

2.一、變換

2.1.一、map

對上游發送的每個事件都進行指定的函數處理,從而變換成另外一個事件再發送給下游。

常使用場景:用做數據類型轉換

示意圖:

方法:

// R:輸出類型
// mapper:應用於ObservableSource發出的每一個項目的函數
final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
複製代碼

demo:

    public void map() {
        // 經過just發送整型數值一、二、3
        Observable.just(123).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                // 經過Map操做符對上游的數據進行函數處理,再轉換成指定的事件發送給下游
                return integer + "變換";
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: s=" + s);
            }
        });
    }
複製代碼
2.1.二、flatMap

將一個發送事件的上游Observable變換爲多個發送事件的Observables,而後將它們發射的事件單獨作處理後再合併放進一個單獨的Observable裏發送給下游。

示意圖:

能夠看到上游發送了三個事件(注意顏色),中間對每一個事假數據進行處理後(每個圓變成兩個矩形),再合併成包含六個矩形事件的Observable對象發送給下游,注意矩形顏色,他是無規律,無序的,並非嚴格按照上游發送的順序來發送給下游。

方法:

final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
複製代碼

demo:

public void flatMap() {
    // 被觀察者經過just發送整型數值一、二、3
   Observable.just(123).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            // 對收到的數值再進行函數處理。
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("變換後的數據" + integer);
            }
            // 將函數處理後的數據,在包裝成一個Observable對象發送給下游。
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: s=" + s);
        }
    });
}
複製代碼
2.1.三、concatMap

flatMap同樣的功能,只是flatMap不能保證轉換後發送給下游事件的時序,concatMap轉換後能嚴格按照上游發送的順序再發送給下游。

示意圖:

flatMap同樣,重點注意顏色,轉換後顏色和上游發送的順序一致,有序發送

方法:

final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
複製代碼

demo:

public void concatMap() {
    // 被觀察者經過just發送整型數值一、二、3
    Observable.just(123).concatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            // 對收到的數值再進行函數處理。
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("變換後的數據" + integer);
            }
            // 將函數處理後的數據,在包裝成一個Observable對象發送給下游。
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "accept: s=" + s);
        }
    });
}
複製代碼

3、合併操做符

3.一、concat

組合多個被觀察者一塊兒發送數據,合併後 按發送順序串行執行

組合的被觀察者數量要求小於等於4個,從提供的方法參數裏面能夠得知。

示意圖:

方法:

public static <T> Observable<T> concat(
            ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
            ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)

複製代碼

demo:

    public void concat() {
        // 用just操做符建立三個Observable對象
        Observable<String> observable1 = Observable.just("1""2");
        Observable<String> observable2 = Observable.just("A""B""C");
        Observable<String> observable3 = Observable.just("hello""rxjava");
        // 使用concat操做符合並三個Observable對象,並將合併後的數據順序(串行)發送給下游
        Observable.concat(observable1
                , observable2, observable3)
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: s=" + s);
                    }
                });
    }
複製代碼
3.二、concatArray

concat同樣,組合多個被觀察者一塊兒發送數據,合併後 按發送順序串行執行

concatArray對組合的被觀察者對象沒有個數限制,能夠大於4個。

示意圖:

上游發送的是一個組合的觀察者數組,沒有數量限制(注意顏色)

轉換後串行發送(顏色和上游發送順序對應)

方法:

static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources) 
複製代碼

demo:

public void concatArray() {
    Observable<String> observable1 = Observable.just("1""2");
    Observable<String> observable2 = Observable.just("A""B""C");
    Observable<String> observable3 = Observable.just("D""E");
    Observable<String> observable4 = Observable.just("F");
    Observable<String> observable5 = Observable.just("G");
    Observable.concatArray(observable1, observable2, observable3, observable4, observable5)
            .subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, "accept: s=" + s);
                }
            });
}
複製代碼
3.三、concatDelayError、concatArrayDelayError

使用concat操做符時,若是遇到其中一個被觀察者發出onError事件則會立刻終止其餘被觀察者的事件,若是但願onError事件推遲到其餘被觀察者都結束後才觸發,可使用對應的concatDelayError。

方法:

public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
        ObjectHelper.requireNonNull(sources, "sources is null");
        return concatDelayError(fromIterable(sources));
    } 

public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
        return concatDelayError(sources, bufferSize(), true);
    }

public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch, boolean tillTheEnd)
複製代碼

demo:

    public void concatArrayDelayError() {

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
                emitter.onNext("B");
                emitter.onNext("C");
                emitter.onError(new NullPointerException(""));
                emitter.onNext("D");
            }
        });


 Observable.concatArrayDelayError(observable, Observable.just("E""F"))
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe: ");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "onNext: s="+s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: e" + e.getMessage(), e);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }
複製代碼

結果:

能夠看到,第一個observable發送到c後,手動拋出一個錯誤,可是並滅有影響到Observable.just("E", "F")的執行,咱們依舊打印出了 E,F兩個參數後纔去執行咱們手動拋出的NullPointerException錯誤

。。。。

4、總結

操做符這部份內容比較多,先整理這部分,後面會對其餘操做符再作整理。

相關文章
相關標籤/搜索