一般狀況下,若是咱們想要使用 RxJava 首先會想到的是使用Observable,若是要考慮到Backpressure的狀況,在 RxJava2.x 時代咱們會使用Flowable。除了Observable和Flowable以外,在 RxJava2.x 中還有三種類型的Observables:Single、Completable、Maybe。java
類型 | 描述 |
---|---|
Observable
|
可以發射0或n個數據,並以成功或錯誤事件終止。 |
Flowable
|
可以發射0或n個數據,並以成功或錯誤事件終止。 支持Backpressure,能夠控制數據源發射的速度。 |
Single
|
只發射單個數據或錯誤事件。 |
Completable | 它歷來不發射數據,只處理 onComplete 和 onError 事件。能夠當作是Rx的Runnable。 |
Maybe
|
可以發射0或者1個數據,要麼成功,要麼失敗。有點相似於Optional |
從上面的表格能夠看出,這五種被觀察者類型中只有Flowable能支持Backpressure,若是有須要Backpressure的狀況,仍是必需要使用Flowable。react
從SingleEmitter的源碼能夠看出,Single 只有 onSuccess 和 onError 事件。express
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Cancellable;
/** * Abstraction over an RxJava {@link SingleObserver} that allows associating * a resource with it. * <p> * All methods are safe to call from multiple threads. * <p> * Calling onSuccess or onError multiple times has no effect. * * @param <T> the value type to emit */
public interface SingleEmitter<T> {
/** * Signal a success value. * @param t the value, not null */
void onSuccess(@NonNull T t);
/** * Signal an exception. * @param t the exception, not null */
void onError(@NonNull Throwable t);
/** * Sets a Disposable on this emitter; any previous Disposable * or Cancellation will be unsubscribed/cancelled. * @param s the disposable, null is allowed */
void setDisposable(@Nullable Disposable s);
/** * Sets a Cancellable on this emitter; any previous Disposable * or Cancellation will be unsubscribed/cancelled. * @param c the cancellable resource, null is allowed */
void setCancellable(@Nullable Cancellable c);
/** * Returns true if the downstream cancelled the sequence. * @return true if the downstream cancelled the sequence */
boolean isDisposed();
}複製代碼
其中,onSuccess()用於發射數據(在Observable/Flowable中使用onNext()來發射數據)。並且只能發射一個數據,後面即便再發射數據也不會作任何處理。apache
Single的SingleObserver中只有onSuccess、onError,並無onComplete。這是 Single 跟其餘四種被觀察者最大的區別。api
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(@NonNull SingleEmitter<String> e) throws Exception {
e.onSuccess("test");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
throwable.printStackTrace();
}
});複製代碼
上面的代碼,因爲Observer中有兩個Consumer,還能夠進一步簡化成網絡
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(@NonNull SingleEmitter<String> e) throws Exception {
e.onSuccess("test");
}
}).subscribe(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) throws Exception {
System.out.println(s);
}
});複製代碼
Single 能夠經過toXXX方法轉換成Observable、Flowable、Completable以及Maybe。架構
Completable在建立後,不會發射任何數據。從CompletableEmitter的源碼能夠看到app
/** * Copyright (c) 2016-present, RxJava Contributors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in * compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the License is * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See * the License for the specific language governing permissions and limitations under the License. */
package io.reactivex;
import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Cancellable;
/** * Abstraction over an RxJava {@link CompletableObserver} that allows associating * a resource with it. * <p> * All methods are safe to call from multiple threads. * <p> * Calling onComplete or onError multiple times has no effect. */
public interface CompletableEmitter {
/** * Signal the completion. */
void onComplete();
/** * Signal an exception. * @param t the exception, not null */
void onError(@NonNull Throwable t);
/** * Sets a Disposable on this emitter; any previous Disposable * or Cancellation will be disposed/cancelled. * @param d the disposable, null is allowed */
void setDisposable(@Nullable Disposable d);
/** * Sets a Cancellable on this emitter; any previous Disposable * or Cancellation will be disposed/cancelled. * @param c the cancellable resource, null is allowed */
void setCancellable(@Nullable Cancellable c);
/** * Returns true if the downstream disposed the sequence. * @return true if the downstream disposed the sequence */
boolean isDisposed();
}複製代碼
Completable 只有 onComplete 和 onError 事件,同時 Completable 並無map、flatMap等操做符,它的操做符比起 Observable/Flowable 要少得多。less
咱們能夠經過fromXXX操做符來建立一個Completable。這是一個Completable版本的Hello World。ide
Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
System.out.println("Hello World");
}
}).subscribe();複製代碼
Completable 常常會結合andThen操做符
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(@NonNull CompletableEmitter emitter) throws Exception {
try {
TimeUnit.SECONDS.sleep(1);
emitter.onComplete();
} catch (InterruptedException e) {
emitter.onError(e);
}
}
}).andThen(Observable.range(1, 10))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
System.out.println(integer);
}
});複製代碼
在這裏emitter.onComplete()執行完以後,代表Completable已經徹底執行完畢,接下來是執行andThen裏的操做。
打印結果以下:
1
2
3
4
5
6
7
8
9
10複製代碼
在Completable中,andThen有多個重載的方法,正好對應了五種被觀察者的類型。
Completable andThen(CompletableSource next) <T> Maybe<T> andThen(MaybeSource<T> next) <T> Observable<T> andThen(ObservableSource<T> next) <T> Flowable<T> andThen(Publisher<T> next) <T> Single<T> andThen(SingleSource<T> next)複製代碼
Completable 也能夠經過toXXX方法轉換成Observable、Flowable、Single以及Maybe。
在網絡操做中,若是遇到更新的狀況,也就是Restful架構中的PUT操做,通常要麼返回原先的對象要麼只提示更新成功。下面兩個接口使用了Retrofit,分別是用於獲取短信驗證碼和更新用戶信息,其中更新用戶信息若是用PUT會更符合Restful的API。
/** * 獲取短信驗證碼 * @param param * @return */
@POST("v1/user-auth")
Completable getVerificationCode(@Body VerificationCodeParam param);
/** * 用戶信息更新接口 * @param param * @return */
@POST("v1/user-update")
Completable update(@Body UpdateParam param);複製代碼
在model類中大體會這樣寫。
/** * Created by Tony Shen on 2017/7/24. */
public class VerificationCodeModel extends HttpResponse {
/** * 獲取驗證碼 * @param activity * @param param * @return */
public Completable getVerificationCode(AppCompatActivity activity, VerificationCodeParam param) {
return apiService
.getVerificationCode(param)
.compose(RxJavaUtils.<VerificationCodeModel>completableToMain())
.compose(RxLifecycle.bind(activity).<VerificationCodeModel>toLifecycleTransformer());
}
}複製代碼
特別要注意的是getVerificationCode返回的是Completable而不是Completable
獲取驗證碼成功則給出相應地toast提示,失敗能夠作出相應地處理。
VerificationCodeModel model = new VerificationCodeModel();
model.getVerificationCode(RegisterActivity.this,param)
.subscribe(new Action() {
@Override
public void run() throws Exception {
showShort(RegisterActivity.this,"發送驗證碼成功");
}
},new RxException<Throwable>(){
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
throwable.printStackTrace();
......
}
});複製代碼
Maybe 是 RxJava2.x 以後纔有的新類型,能夠當作是Single和Completable的結合。
Maybe建立以後,MaybeEmitter 和 SingleEmitter 同樣並無onNext()方法,一樣須要經過onSuccess()方法來發射數據。
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
e.onSuccess("testA");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s="+s);
}
});複製代碼
打印出來的結果是
s=testA複製代碼
Maybe也只能發射0或者1個數據,即便發射多個數據,後面發射的數據也不會處理。
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
e.onSuccess("testA");
e.onSuccess("testB");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s="+s);
}
});複製代碼
打印出來的結果仍然是
s=testA複製代碼
跟第一次執行的結果是一致的。
若是MaybeEmitter先調用了onComplete(),即便後面再調用了onSuccess()也不會發射任何數據。
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
e.onComplete();
e.onSuccess("testA");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s="+s);
}
});複製代碼
此次就沒有打印任何數據了。
咱們對上面的代碼再作一下修改,在subscribe()中也加入onComplete(),看看打印出來的結果會是這樣的?由於SingleObserver中是沒有onComplete()方法。
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
e.onComplete();
e.onSuccess("testA");
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s=" + s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("Maybe onComplete");
}
});複製代碼
此次打印的結果是
Maybe onComplete複製代碼
經過查看Maybe相關的源碼
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError, Action onComplete) {
ObjectHelper.requireNonNull(onSuccess, "onSuccess is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
return subscribeWith(new MaybeCallbackObserver<T>(onSuccess, onError, onComplete));
}複製代碼
咱們能夠獲得,Maybe在沒有數據發射時候subscribe會調用MaybeObserver的onComplete()。若是Maybe有數據發射或者調用了onError(),是不會再執行MaybeObserver的onComplete()。
咱們也能夠將 Maybe 轉換成Observable、Flowable、Single,只需相應地調用toObservable()、toFlowable()、toSingle()。
接下來咱們再來看看 Maybe 跟 Retrofit 是怎樣結合使用的?
下面的網絡請求,最初返回的類型是Flowable,可是這個網絡請求並非一個連續事件流,咱們只會發起一次 Post 請求返回數據而且只收到一個事件。所以,能夠考慮將 onComplete() 能夠跟 onNext() 合併。在這裏,嘗試咱們將Flowable改爲Maybe。
@POST("v1/contents")
Maybe<ContentModel> loadContent(@Body ContentParam param);複製代碼
在model類中,咱們大體會這樣寫。
public class ContentModel extends HttpResponse {
public List<ContentItem> data;
/** * 獲取內容 * @param fragment * @param param * @param cacheKey * @return */
public Maybe<ContentModel> getContent(Fragment fragment,ContentParam param,String cacheKey) {
return apiService.loadContent(param)
.compose(RxLifecycle.bind(fragment).<ContentModel>toLifecycleTransformer())
.compose(RxJavaUtils.<ContentModel>maybeToMain())
.compose(RxUtils.<ContentModel>toCacheTransformer(cacheKey,App.getInstance().cache));
}
......
}複製代碼
其中,maybeToMain()方法是用Kotlin編寫的工具方法,這些工具方法由Kotlin來編寫會顯得比較簡單和清晰,特別是lambda表達式更加直觀。
@JvmStatic
fun <T> maybeToMain(): MaybeTransformer<T, T> {
return MaybeTransformer{
upstream ->
upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
}複製代碼
最後是真正地使用model類,若是網絡請求成功則將數據展現到recyclerview上,若是失敗也會作出相應地處理。
model.getContent(this,param,cacheKey)
.subscribe(new Consumer<ContentModel>() {
@Override
public void accept(@io.reactivex.annotations.NonNull ContentModel model) throws Exception {
adapter = new NewsAdapter(mContext, model);
recyclerview.setAdapter(adapter);
spinKitView.setVisibility(View.GONE);
}
}, new RxException<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
throwable.printStackTrace();
spinKitView.setVisibility(View.GONE);
......
}
});複製代碼
RxJava 有五種不一樣類型的被觀察者,合理地使用它們可以寫出更簡潔優雅的代碼。這些被觀察者在必定程度上也可以做一些相互轉換。值得注意的是,只有Flowable是支持Backpressure的,其他四種都不支持。