RxJava2系列之建立型操做符

原文首發於微信公衆號:jzman-blog,歡迎關注交流!java

RxJava 是 ReactiveX 在 Java 上的開源的實現,一個用於經過使用可觀察序列來進行異步編程和基於事件的程序的庫,這是官網的介紹,主要關注點是異步編程和鏈式調用以及事件序列。react

  1. 引入RxJava
  2. 概念
  3. 基本實現
  4. Just操做符
  5. from操做符
  6. defer操做符
  7. empty操做符
  8. never操做符
  9. timer操做符
  10. interval 操做符
  11. range操做符
  12. 總結

引入RxJava

implementation "io.reactivex.rxjava2:rxjava:2.2.3"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
複製代碼

概念

RxJava 中的幾個重要概念是:觀察者(Observer) 、被觀察者(Observable)和事件序列,事件序列徹底由被觀察者者本身控制,那麼被觀察者若是在須要時通知觀察者呢,這就須要被觀察者與觀察者之間創建訂閱關係。創建訂閱關係後,當被觀察者發生變化,觀察者就能在第一時間接收被觀察者的變化。android

在 RxJava2 中觀察者(Observer) 的事件回調方法有四個:編程

  • onSubscribe:用於解除訂閱關係
  • onNext:發送事件時觀察者回調該方法接收發送過來的事件序列
  • onError:發送事件時觀察者回調該方法表示發送事件序列異常,將再也不容許發送事件
  • onComplete:發送事件時觀察者回調該方法表示事件序列發送完畢,容許發送事件

注意數組

  1. onError 調用後不容許繼續發送事件,onComplete 調用後容許發送事件,不管是否能夠繼續發送事件,二者被調用觀察者都不會接收消息;
  2. onError 和 onComplete 互斥只容許調用其中一個,若是你在 onComplete 以後調用 onError 程序必然會崩潰,可是 onError 以後調用 onComplete 不崩潰是由於 onError 以後不容許發送事件,天然不會出錯;
  3. 四個回調方法中,觀察者和被觀察者一旦創建訂閱關係 onSubscribe 方法就會被回調,onNext、onError、onComplete 方法的回調徹底由被觀察者決定是否觸發,這裏容易產生誤解。

基本實現

  1. 建立觀察者 Observer ,觀察者決定時間發生的時候該如何處理,具體參考以下:
//觀察者
Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        //解除訂閱
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        //發送事件時觀察者回調
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        //發送事件時觀察者回調(事件序列發生異常)
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        //發送事件時觀察者回調(事件序列發送完畢)
        Log.i(TAG, "onComplete--->");
    }
};
複製代碼
  1. 建立被觀察者 Observable,被觀察者決定什麼時出觸發事件以及觸發何種事件,具體參考以下:
//被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
});
複製代碼
  1. 創建觀察者與被觀察之間的訂閱關係,具體參考以下:
//創建觀察者與被觀察者之間的訂閱關係
observable.subscribe(observer);
複製代碼

上述代碼的輸出結果參考以下:bash

onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->
複製代碼

顯然,因爲在 發送完 Event2 以後就調用了 onComplete 方法,以後發送的事件 Event3 將不會被觀察者收到。微信

上面代碼還能夠這樣寫,結果是同樣的,具體參考以下:併發

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("Event1");
        emitter.onNext("Event2");
        emitter.onComplete();
        emitter.onNext("Event3");
    }
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

上面代碼中使用了 Observable 的 create 方法來建立 Observable,並以此來進行相關事件的發送,爲幫助理解來看一下官方的關於 create 操做符的示意圖:異步

create

Observable 中還提供了不少的靜態方法來建立 Observable,下文將會介紹這些經常使用方法。ide

Just 操做符

使用 just 能夠建立一個發送指定事件的 Observable,just 發送事件的上限 10,即最多發送 10 個事件,相較 create 在必定程度上簡化了處理流程,just 重載的方法以下:

public static <T> Observable<T> just(T item) public static <T> Observable<T> just(T item1, T item2) public static <T> Observable<T> just(T item1, T item2, T item3) public static <T> Observable<T> just(T item1, T item2, T item3, T item4) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10) 複製代碼

下面是 just 操做符的簡單使用:

//just操做符的簡單使用
Observable.just("Event1", "Event2", "Event3")
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->");
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });
複製代碼

上述代碼的輸出結果以下:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
複製代碼

來看一下官方的關於 just 操做符的示意圖,下面是 just 發送四個事件的示意圖,具體以下:

create

from 操做符

使用 from 相關的操做符能夠建立發送數組(array)、集合(Iterable) 以及異步任務(future)的 Observable,可將 from 相關的操做符分爲以下幾類:

//數組
public static <T> Observable<T> fromArray(T... items) //集合 public static <T> Observable<T> fromIterable(Iterable<? extends T> source) //異步任務 public static <T> Observable<T> fromFuture(Future<? extends T> future) //異步任務+超時時間 public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit) //異步任務+超時時間+線程調度器 public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) //異步任務+線程調度器 public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler) //Reactive Streams中的發佈者,使用方式相似create操做符,事件的發送由發佈者(被觀察者)自行決定 public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher) 複製代碼
fromArray/fromIterable

下面是 fromArray 的使用方式,具體以下:

//fromArray操做符的簡單使用
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

看一下 fromArray 的官方示意圖,具體以下:

fromArray

下面是 fromIterable 的使用方式,具體以下:

//fromIterable操做符的簡單使用
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

看一下 fromIterable 的官方示意圖,具體以下:

fromIterable

上述代碼的輸出參考以下:

onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
複製代碼
fromCallable

Callable 位於 java.util.concurrent 包下,和 Runnable 相似,可是帶有返回值,使用 fromCallable 發出的事件是從主線程發出的,若是不訂閱則不會執行 call 裏面的操做,使用 fromCallable 要注意如下幾點:

  1. 涉及耗時任務要使用 subscribeOn 切換訂閱線程;
  2. 執行耗時任務是接收 Observable 的發射值要使用 observeOn 切換到 Main 線程接收;
  3. 爲避免內存泄漏等問題,在相應的onDestroy方法中取消訂閱。

下面是 fromCallable 的簡單使用,參考以下:

//fromCallable操做符的簡單使用
Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        //其餘操做...
        return "call";
    }
}).subscribe(new Observer<String>() {

    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->" + s+Thread.currentThread());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->" + e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

上述到執行結果以下:

onSubscribe--->
onNext--->call
onComplete--->
複製代碼

看一下 fromCallable 的官方示意圖,具體以下:

fromCallable

fromFuture

從上面可知 fromFuture 有四個重載方法,參數中能夠指定異步任務、任務超時時間、線程調度器等,先來認識一下 Future 接口,Future 接口位於 java.util.concurrent 包下,主要做用是對 Runnable 和 Callable 的異步任務執行進行任務是否執行的判斷、任務結果的獲取、具體任務的取消等,而 Runnable 和 Callable 伴隨着線程的執行,這就意味着使用 fromFuture 發出的事件是從非 Main 線程發出,若是執行耗時任務要記得使用 subscribeOn 切換訂閱線程,下面以 FutureTask 爲例來講明 fromFuture 的使用方式。

建立一個 Callable 用來執行異步任務,參考以下:

//異步任務
private class MCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Log.i(TAG, "任務執行開始--->");
        Thread.sleep(5000);
        Log.i(TAG, "任務執行結束--->");
        return "MCallable";
    }
}
複製代碼

而後,建立一個 FutureTask ,參考以下:

//建立FutureTask
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);
複製代碼

而後,使用 Thread 執行上面建立的 Future,參考以下:

//執行FutureTask
new Thread(mFutureTask).start();
複製代碼

最後,使用 fromFuture 建立與之對應的 Observeable 並訂閱,參考以下:

//fromFuture
Observable.fromFuture(mFutureTask)
        .subscribeOn(Schedulers.io()) //切換訂閱線程
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe--->");
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext--->" + s+Thread.currentThread());
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError--->" + e);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete--->");
            }
        });
複製代碼

上述代碼的只想結果以下:

任務執行開始--->
onSubscribe--->
任務執行結束--->
onNext--->MCallable
onComplete--->
複製代碼

看一下 fromFuture 的官方示意圖,下面的示意圖是 fromFuture 方法攜帶一個參數 Future 的示意圖,具體以下:

fromFuture

上面的異步任務延時 5 秒,若是使用 fromFuture 的重載方法指定超時時間爲 4 秒,參考以下:

//指定超時時間爲4s
Observable.fromFuture(mFutureTask,4, TimeUnit.SECONDS,Schedulers.io())
//...
複製代碼

此時,因爲異步任務不能在 4 秒內完成,Observer 會相應的被觸發 onError 方法,執行結果參考以下:

任務執行開始--->
onSubscribe---> 
onError--->java.util.concurrent.TimeoutException
任務執行結束--->
複製代碼

那麼如何取消這個異步任務呢,這也正是 Future 的優勢所在,能夠隨意的取消這個任務,具體參考以下:

//異步任務的取消
public void cancelTask(View view) {
    if (mFutureTask.isDone()) {
        Log.i(TAG, "任務已經完成--->");
    } else {
        Log.i(TAG, "任務正在執行--->");
        boolean cancel = mFutureTask.cancel(true);
        Log.i(TAG, "任務取消是否成功--cancel->" + cancel);
        Log.i(TAG, "任務取消是否成功--isCancelled->" + mFutureTask.isCancelled());
    }
}
複製代碼

下面是在任務執行過程當中取消任務的執行結果,參考以下:

任務執行開始--->
onSubscribe--->
任務正在執行--->
任務取消是否成功--cancel->true
任務取消是否成功--isCancelled->true
onError--->java.util.concurrent.CancellationException
複製代碼

這樣就取消了正在執行的異步任務,這部份內容更多的是關於 Java Future 相關的知識。

defer 操做符

使用 defer 建立 Observable 時,只有在訂閱時去纔會建立 Observable 併發送相關的事件,下面是 defer 操做符的使用,參考以下:

//defer
defer = "old";
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
    @Override
    public ObservableSource<String> call() throws Exception {
        return Observable.just(defer);
    }
});

defer = "new";
observable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext--->"+s);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

上述代碼的執行結果以下:

onSubscribe--->
onNext--->new
onComplete--->
複製代碼

顯然,最終在訂閱以前 Observable 工廠又建立了最新的 Observable,onNext 中接收的數據也是最新的,爲了理解 defer 操做符,來看一下官方 defer 操做符的示意圖:

defer

empty 操做符

使用 empty 操做符能夠建立一個不發生任何數據但正常終止的 Observable,參考以下:

//empty
Observable.empty().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->"+o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

上述代碼的輸出結果以下:

onSubscribe--->
onComplete--->
複製代碼

爲了方便理解 empty 操做符的使用,來看一些 empty 操做符的官方示意圖:

empty

never 操做符

使用 never 操做符能夠建立一個不發生任何數據也不終止的 Observable,參考以下:

//never
Observable.never().subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Object o) {
        Log.i(TAG, "onNext--->"+o);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

上述代碼的輸出結果以下:

onSubscribe--->
複製代碼

爲了方便理解 never 操做符的使用,來看一些 never 操做符的官方示意圖:

never

timer 操做符

timer 操做符能夠建立一個帶延時的發送固定數值 0 的 Observable,還能夠指定線程調度器,timer 重載方法以下:

//延時
public static Observable<Long> timer(long delay, TimeUnit unit) //延時+線程調度器 public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 複製代碼

下面是 timer 的使用方式:

//timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long s) {
        Log.i(TAG, "onNext--->"+s);
        Log.i(TAG, "當前線程--->"+Thread.currentThread().getName());
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

上述代碼的執行結果以下:

onSubscribe--->
//延時3秒收到數據
onNext--->0
當前線程--->RxCachedThreadScheduler-1
onComplete--->
複製代碼

爲了方便理解 timer 操做符的使用,來看一些 timer 操做符的官方示意圖,下面以 timer 指定延時器和線程調度器的方式爲例,具體以下:

timer

interval 操做符

使用 interval 操做符能夠建立一個能夠以固定時間間隔發送整數值的一個 Observable,interval 能夠指定初始延時時間、時間間隔、線程調度器等,interval 重載方法以下:

//初始延時+時間間隔
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) //初始延時+時間間隔+線程調度器 public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) //時間間隔 public static Observable<Long> interval(long period, TimeUnit unit) //時間間隔+線程調度器 public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler) 複製代碼

下面是 interval 的使用方式:

//interval
Observable.interval(3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Long aLong) {
        Log.i(TAG, "onNext--->"+aLong);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

上述代碼執行後就會以每一個 3 秒持續發送值爲整數的事件,執行結果以下:

onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...
複製代碼

爲了方便理解 interval 操做符的使用,來看一些 interval 操做符的官方示意圖,下面以 interval 指定間隔時間和時間單位的方式爲例,具體以下:

interval

range 操做符

使用 range 操做符能夠建立一個能夠發送指定整數範圍值的一個 Observable,range 相關的方法有兩個,只是數值的範圍表示不一樣,兩個方法聲明以下:

// int
public static Observable<Integer> range(final int start, final int count) // long public static Observable<Long> rangeLong(long start, long count) 複製代碼

下面是 range 的使用方式,具體參考以下:

//range
Observable.range(1,5).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe--->");
    }

    @Override
    public void onNext(Integer integer) {
        Log.i(TAG, "onNext--->"+integer);
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError--->"+e);
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete--->");
    }
});
複製代碼

上述代碼的執行結果以下:

onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->
複製代碼

爲了方便理解 range 操做符的使用,來看一些 range 操做符的官方示意圖:

range

總結

這篇文章主要介紹了 RxJava2 相關基礎知識以及 RxJava2 中建立型操做符的理解和使用。能夠選擇關注我的微信公衆號:jzman-blog 獲取最新更新,一塊兒交流學習!

相關文章
相關標籤/搜索