原文首發於微信公衆號:jzman-blog,歡迎關注交流!java
RxJava 是 ReactiveX 在 Java 上的開源的實現,一個用於經過使用可觀察序列來進行異步編程和基於事件的程序的庫,這是官網的介紹,主要關注點是異步編程和鏈式調用以及事件序列。react
implementation "io.reactivex.rxjava2:rxjava:2.2.3"
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
複製代碼
RxJava 中的幾個重要概念是:觀察者(Observer) 、被觀察者(Observable)和事件序列,事件序列徹底由被觀察者者本身控制,那麼被觀察者若是在須要時通知觀察者呢,這就須要被觀察者與觀察者之間創建訂閱關係。創建訂閱關係後,當被觀察者發生變化,觀察者就能在第一時間接收被觀察者的變化。android
在 RxJava2 中觀察者(Observer) 的事件回調方法有四個:編程
注意:數組
//觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//解除訂閱
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
//發送事件時觀察者回調
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
//發送事件時觀察者回調(事件序列發生異常)
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
//發送事件時觀察者回調(事件序列發送完畢)
Log.i(TAG, "onComplete--->");
}
};
複製代碼
//被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3");
}
});
複製代碼
//創建觀察者與被觀察者之間的訂閱關係
observable.subscribe(observer);
複製代碼
上述代碼的輸出結果參考以下:bash
onSubscribe--->
onNext--->Event1
onNext--->Event2
onComplete--->
複製代碼
顯然,因爲在 發送完 Event2 以後就調用了 onComplete 方法,以後發送的事件 Event3 將不會被觀察者收到。微信
上面代碼還能夠這樣寫,結果是同樣的,具體參考以下:併發
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Event1");
emitter.onNext("Event2");
emitter.onComplete();
emitter.onNext("Event3");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上面代碼中使用了 Observable 的 create 方法來建立 Observable,並以此來進行相關事件的發送,爲幫助理解來看一下官方的關於 create 操做符的示意圖:異步
Observable 中還提供了不少的靜態方法來建立 Observable,下文將會介紹這些經常使用方法。ide
使用 just 能夠建立一個發送指定事件的 Observable,just 發送事件的上限 10,即最多發送 10 個事件,相較 create 在必定程度上簡化了處理流程,just 重載的方法以下:
public static <T> Observable<T> just(T item) public static <T> Observable<T> just(T item1, T item2) public static <T> Observable<T> just(T item1, T item2, T item3) public static <T> Observable<T> just(T item1, T item2, T item3, T item4) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9) public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10) 複製代碼
下面是 just 操做符的簡單使用:
//just操做符的簡單使用
Observable.just("Event1", "Event2", "Event3")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上述代碼的輸出結果以下:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
複製代碼
來看一下官方的關於 just 操做符的示意圖,下面是 just 發送四個事件的示意圖,具體以下:
使用 from 相關的操做符能夠建立發送數組(array)、集合(Iterable) 以及異步任務(future)的 Observable,可將 from 相關的操做符分爲以下幾類:
//數組
public static <T> Observable<T> fromArray(T... items) //集合 public static <T> Observable<T> fromIterable(Iterable<? extends T> source) //異步任務 public static <T> Observable<T> fromFuture(Future<? extends T> future) //異步任務+超時時間 public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit) //異步任務+超時時間+線程調度器 public static <T> Observable<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) //異步任務+線程調度器 public static <T> Observable<T> fromFuture(Future<? extends T> future, Scheduler scheduler) //Reactive Streams中的發佈者,使用方式相似create操做符,事件的發送由發佈者(被觀察者)自行決定 public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher) 複製代碼
下面是 fromArray 的使用方式,具體以下:
//fromArray操做符的簡單使用
String[] events = {"Event1", "Event2", "Event3"};
Observable.fromArray(events).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
看一下 fromArray 的官方示意圖,具體以下:
下面是 fromIterable 的使用方式,具體以下:
//fromIterable操做符的簡單使用
List<String> list = new ArrayList<>();
list.add("Event1");
list.add("Event2");
list.add("Event3");
Observable.fromIterable(list).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
看一下 fromIterable 的官方示意圖,具體以下:
上述代碼的輸出參考以下:
onSubscribe--->
onNext--->Event1
onNext--->Event2
onNext--->Event3
onComplete--->
複製代碼
Callable 位於 java.util.concurrent 包下,和 Runnable 相似,可是帶有返回值,使用 fromCallable 發出的事件是從主線程發出的,若是不訂閱則不會執行 call 裏面的操做,使用 fromCallable 要注意如下幾點:
下面是 fromCallable 的簡單使用,參考以下:
//fromCallable操做符的簡單使用
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
//其餘操做...
return "call";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s+Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上述到執行結果以下:
onSubscribe--->
onNext--->call
onComplete--->
複製代碼
看一下 fromCallable 的官方示意圖,具體以下:
從上面可知 fromFuture 有四個重載方法,參數中能夠指定異步任務、任務超時時間、線程調度器等,先來認識一下 Future 接口,Future 接口位於 java.util.concurrent 包下,主要做用是對 Runnable 和 Callable 的異步任務執行進行任務是否執行的判斷、任務結果的獲取、具體任務的取消等,而 Runnable 和 Callable 伴隨着線程的執行,這就意味着使用 fromFuture 發出的事件是從非 Main 線程發出,若是執行耗時任務要記得使用 subscribeOn 切換訂閱線程,下面以 FutureTask 爲例來講明 fromFuture 的使用方式。
建立一個 Callable 用來執行異步任務,參考以下:
//異步任務
private class MCallable implements Callable<String> {
@Override
public String call() throws Exception {
Log.i(TAG, "任務執行開始--->");
Thread.sleep(5000);
Log.i(TAG, "任務執行結束--->");
return "MCallable";
}
}
複製代碼
而後,建立一個 FutureTask ,參考以下:
//建立FutureTask
MCallable mCallable = new MCallable();
FutureTask<String> mFutureTask = new FutureTask<>(mCallable);
複製代碼
而後,使用 Thread 執行上面建立的 Future,參考以下:
//執行FutureTask
new Thread(mFutureTask).start();
複製代碼
最後,使用 fromFuture 建立與之對應的 Observeable 並訂閱,參考以下:
//fromFuture
Observable.fromFuture(mFutureTask)
.subscribeOn(Schedulers.io()) //切換訂閱線程
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->" + s+Thread.currentThread());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->" + e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上述代碼的只想結果以下:
任務執行開始--->
onSubscribe--->
任務執行結束--->
onNext--->MCallable
onComplete--->
複製代碼
看一下 fromFuture 的官方示意圖,下面的示意圖是 fromFuture 方法攜帶一個參數 Future 的示意圖,具體以下:
上面的異步任務延時 5 秒,若是使用 fromFuture 的重載方法指定超時時間爲 4 秒,參考以下:
//指定超時時間爲4s
Observable.fromFuture(mFutureTask,4, TimeUnit.SECONDS,Schedulers.io())
//...
複製代碼
此時,因爲異步任務不能在 4 秒內完成,Observer 會相應的被觸發 onError 方法,執行結果參考以下:
任務執行開始--->
onSubscribe--->
onError--->java.util.concurrent.TimeoutException
任務執行結束--->
複製代碼
那麼如何取消這個異步任務呢,這也正是 Future 的優勢所在,能夠隨意的取消這個任務,具體參考以下:
//異步任務的取消
public void cancelTask(View view) {
if (mFutureTask.isDone()) {
Log.i(TAG, "任務已經完成--->");
} else {
Log.i(TAG, "任務正在執行--->");
boolean cancel = mFutureTask.cancel(true);
Log.i(TAG, "任務取消是否成功--cancel->" + cancel);
Log.i(TAG, "任務取消是否成功--isCancelled->" + mFutureTask.isCancelled());
}
}
複製代碼
下面是在任務執行過程當中取消任務的執行結果,參考以下:
任務執行開始--->
onSubscribe--->
任務正在執行--->
任務取消是否成功--cancel->true
任務取消是否成功--isCancelled->true
onError--->java.util.concurrent.CancellationException
複製代碼
這樣就取消了正在執行的異步任務,這部份內容更多的是關於 Java Future 相關的知識。
使用 defer 建立 Observable 時,只有在訂閱時去纔會建立 Observable 併發送相關的事件,下面是 defer 操做符的使用,參考以下:
//defer
defer = "old";
Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just(defer);
}
});
defer = "new";
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext--->"+s);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上述代碼的執行結果以下:
onSubscribe--->
onNext--->new
onComplete--->
複製代碼
顯然,最終在訂閱以前 Observable 工廠又建立了最新的 Observable,onNext 中接收的數據也是最新的,爲了理解 defer 操做符,來看一下官方 defer 操做符的示意圖:
使用 empty 操做符能夠建立一個不發生任何數據但正常終止的 Observable,參考以下:
//empty
Observable.empty().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->"+o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上述代碼的輸出結果以下:
onSubscribe--->
onComplete--->
複製代碼
爲了方便理解 empty 操做符的使用,來看一些 empty 操做符的官方示意圖:
使用 never 操做符能夠建立一個不發生任何數據也不終止的 Observable,參考以下:
//never
Observable.never().subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Object o) {
Log.i(TAG, "onNext--->"+o);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上述代碼的輸出結果以下:
onSubscribe--->
複製代碼
爲了方便理解 never 操做符的使用,來看一些 never 操做符的官方示意圖:
timer 操做符能夠建立一個帶延時的發送固定數值 0 的 Observable,還能夠指定線程調度器,timer 重載方法以下:
//延時
public static Observable<Long> timer(long delay, TimeUnit unit) //延時+線程調度器 public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) 複製代碼
下面是 timer 的使用方式:
//timer
Observable.timer(3, TimeUnit.SECONDS, Schedulers.io()).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long s) {
Log.i(TAG, "onNext--->"+s);
Log.i(TAG, "當前線程--->"+Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上述代碼的執行結果以下:
onSubscribe--->
//延時3秒收到數據
onNext--->0
當前線程--->RxCachedThreadScheduler-1
onComplete--->
複製代碼
爲了方便理解 timer 操做符的使用,來看一些 timer 操做符的官方示意圖,下面以 timer 指定延時器和線程調度器的方式爲例,具體以下:
使用 interval 操做符能夠建立一個能夠以固定時間間隔發送整數值的一個 Observable,interval 能夠指定初始延時時間、時間間隔、線程調度器等,interval 重載方法以下:
//初始延時+時間間隔
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) //初始延時+時間間隔+線程調度器 public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) //時間間隔 public static Observable<Long> interval(long period, TimeUnit unit) //時間間隔+線程調度器 public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler) 複製代碼
下面是 interval 的使用方式:
//interval
Observable.interval(3,TimeUnit.SECONDS).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Long aLong) {
Log.i(TAG, "onNext--->"+aLong);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上述代碼執行後就會以每一個 3 秒持續發送值爲整數的事件,執行結果以下:
onSubscribe--->
onNext--->0
onNext--->1
onNext--->2
...
複製代碼
爲了方便理解 interval 操做符的使用,來看一些 interval 操做符的官方示意圖,下面以 interval 指定間隔時間和時間單位的方式爲例,具體以下:
使用 range 操做符能夠建立一個能夠發送指定整數範圍值的一個 Observable,range 相關的方法有兩個,只是數值的範圍表示不一樣,兩個方法聲明以下:
// int
public static Observable<Integer> range(final int start, final int count) // long public static Observable<Long> rangeLong(long start, long count) 複製代碼
下面是 range 的使用方式,具體參考以下:
//range
Observable.range(1,5).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe--->");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext--->"+integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError--->"+e);
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete--->");
}
});
複製代碼
上述代碼的執行結果以下:
onSubscribe--->
onNext--->1
onNext--->2
onNext--->3
onNext--->4
onNext--->5
onComplete--->
複製代碼
爲了方便理解 range 操做符的使用,來看一些 range 操做符的官方示意圖:
這篇文章主要介紹了 RxJava2 相關基礎知識以及 RxJava2 中建立型操做符的理解和使用。能夠選擇關注我的微信公衆號:jzman-blog 獲取最新更新,一塊兒交流學習!