歡迎關注微信公衆號「隨手記技術團隊」,查看更多隨手記團隊的技術文章。
本文做者:周浩源
原文連接:mp.weixin.qq.com/s/OJCEyH1gJ…html
大概從2015年開始,RxJava1.0開始快速流行起來,短短兩年時間,RxJava在Android開發中已經算是無人不知無人不曉了,加之它與Retrofit等流行框架的完美結合,已經成爲Android項目開發的必備利器。隨手記做爲一個大型項目,引入三方框架一直比較慎重,但也從今年初開始,正式引入了RxJava2.0,並配合Retrofit對項目的網絡框架和繁瑣的異步邏輯進行重構。RxJava雖然好用,但伴隨而來的是不可避免的學習成本,爲了讓你們快速的瞭解RxJava的前因後果以及快速上手使用,特意總結該篇文章。本文將詳細講解如何快速理解RxJava的操做符,並從源碼角度來分析RxJava操做符的原理。java
簡單來說RxJava是一個簡化異步調用的庫,但其實它更是一種優雅的編程方式和編程思想,當你熟悉RxJava的使用方式以後,會很容易愛上它。
我總結它的優勢主要有兩個方面:react
關於第一點你們應該都會認同,關於第二點可能有人會有疑惑,由於不少人以爲RxJava大量不明因此的操做符會讓代碼的可讀性變得更差,其實產生這種印象偏偏就是由於沒有掌握RxJava操做符的使用和原理所致使的。
好比隨手記項目中綁定用戶QQ帳號的業務邏輯,這段邏輯的代碼涉及三個異步接口,兩個是QQ登陸SDK的,一個是隨手記後臺的,在使用RxJava重構前,這段代碼使用了3個AsyncTask,也就是三個嵌套的回調,代碼複雜,可讀性很是差。而改造以後,它變成了下面這樣子git
若是你對這裏面的幾個RxJava操做符比較熟悉的話,你會迅速瞭解我這段代碼作了什麼事情,並且不用再去梳理一堆嵌套回調了,這就是RxJava帶來的可讀性。
因此,學習RxJava,理解和掌握操做符是不可避免的第一步。github
從RxJava1.0到RxJava2.0,基本思想沒有變化,但RxJava2.0按照Reactive-Streams規範對整個架構進行了從新設計,並變動了Maven倉庫依賴地址和包名。因此如今RxJava的github網站中,RxJava1.0和RxJava2.0是兩個獨立的分支,不相互兼容 ,也不能同時使用,並且RxJava1.0再過一段時間也將再也不維護。因此,目前還使用RxJava1.0的,建議儘早切換到RxJava2.0,而若是沒有接觸過RxJava1.0,直接使用和學習RxJava2.0就能夠了。若是想了解RxJava1.0和RxJava2.0的詳細區別,請參考官方文檔。
爲行文方便,今後處開始,本文使用Rx來表示RxJava2.x。編程
剛接觸Rx的人面對一堆各式各樣的操做符會以爲不知如何去學習記憶,其實你只須要從總體上了解Rx操做符的類別和掌握一些使用頻率較高的操做符就足夠了,至於其餘的操做符,你只須要知道它的使用場景和掌握如何快速理解一個操做符的方法,就能夠在須要的時候快速拿來用了。
下圖是我根據官方文檔總結的Rx操做符的分類及每一個類別下的表明性操做符
數組
提到Rx操做符,相信不少人都會對描述Rx操做符的花花綠綠的寶石圖有很大印象。
bash
io.reactivex.Flowable
: 事件源(0..N個元素), 支持 Reactive-Streams and 背壓
io.reactivex.Observable
:事件源(0..N個元素), 不支持背壓io.reactivex.Single
: 僅發射一個元素或產生error的事件源,io.reactivex.Completable
: 不發射任何元素,只產生completion或error的事件源io.reactivex.Maybe
: 不發射任何元素,或只發射一個元素,或產生error的事件源Subject
: 既是事件源,也是事件接受者事件源
了,基本上全部的操做符都是針對事件源來進行一些轉換、組合等操做,而咱們最經常使用的事件源就是Observable
了。本文中咱們就以Observable
事件源爲例來說解Rx的操做符,Observable
發射的事件咱們統一稱之爲item。首先咱們須要詳細瞭解一下寶石圖中各個圖像元素的含義:微信
—>
:Observable
的時間線,從左至右流動★
:星星、圓、方塊等表示Observable
發射的item|
:時間線最後的小豎線表示Observable
的事件流已經成功發射完畢了X
:時間線最後的X符合表示因爲某種緣由Observable
非正常終止發射,產生了error上面幾種元素組合在一塊兒表明一個完整的Observable
,也能夠稱爲源Observable
網絡
-->
方向朝下的虛線箭頭表示以及中間的長方框表示正在對上面的源Observable
進行某種轉換。長方框裏的文字展現了轉換的性質。下面的Observable
是對上面的源Observable
轉換後的結果。
掌握了寶石圖的含義,咱們就能夠根據某個操做符的寶石圖快速理解這個操做符了。舉幾個例子:
1. map
Observable
前後發射了一、二、3的三個item,而通過
map
操做符一轉換,就變成了一個發射了十、20、30三個item的新的
Observable
。描述操做符的長方框中也清楚的說明了該
map
操做符進行了何種具體的轉換操做(圖中的10*x只是一個例子,這個具體的轉換函數是能夠自定義的)。
map
操做符的含義和用法,簡單來說,它就是經過一個函數將一個
Observable
發射的item逐個進行某種轉換。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer * 10;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer result) throws Exception {
Log.i(TAG, "accept : " + result +"\n" );
}
});複製代碼
輸出結果:
2. zip
zip
的寶石圖,能夠知道zip操做符的做用是把多個源
Observable
發射的item經過特定函數組合在一塊兒,而後發射組合後的item。從圖中還能夠看到一個重要的信息是,最終發射的item是對上面的兩個源
Observable
發射的item按照發射順序逐個組合的結果,並且最終發射的
1A
等item的發射時間是由組合它的
1
和
A
等item中發射時間較晚的那個item決定的,也正是如此,
zip
操做符常常能夠用在須要同時組合處理多個網絡請求的結果的業務場景中。
Observable.zip(Observable.just(1, 2, 3),
Observable.just("A", "B", "C"),
new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
Log.i(TAG, "zip : accept : " + s + "\n");
}
});複製代碼
輸出結果:
3. concat
concat
操做符的做用就是將兩個源
Observable
發射的item鏈接在一塊兒發射出來。這裏的鏈接指的是總體鏈接,被
concat
操做後產生的
Observable
會先發射第一個源
Observable
的全部item,而後緊接着再發射第二個源
Observable
的全部的item。
Observable.concat(Observable.just(1, 2, 3), Observable.just(4, 5, 6))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.i(TAG, "concat : " + integer + "\n");
}
});複製代碼
輸出結果:
大部分操做符都配有這樣的寶石圖,經過官方文檔或者直接在Rx源碼中查看JavaDoc就能夠找到,再也不過多舉例。你也能夠在rxmarbles這樣的網站上查看更多能夠動態交互的寶石圖。
要了解操做符的原理,確定要從源碼入手嘍。因此咱們先來簡單擼一遍Rx的最基本的Create操做符的源碼。
Rx的源碼目錄結構是比較清晰的,咱們先從Observable.create
方法來分析
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("s");
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 建立的Observer中多了一個回調方法onSubscribe,傳遞參數爲Disposable ,Disposable至關於RxJava1.x中的Subscription,用於解除訂閱。
}
@Override
public void onNext(@NonNull String s) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});複製代碼
create
方法以下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}複製代碼
代碼很簡單,第一行判空不用管,第二行調用RxJavaPlugins
的方法是爲了實現Rx的hook功能,咱們暫時也無需關注,在通常狀況下,第二行代碼會直接返回它的入參即ObservableCreate
對象,ObservableCreate
是Observable
的子類,實現了Observable
的一些抽象方法好比subscribeActual
。事實上Rx的每一個操做符都對應Observable
的一個子類。
這裏create
方法接受的是一個ObservableOnSubscribe
的接口實現類:
/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}複製代碼
經過註釋能夠知道這個接口的做用是經過一個subscribe
方法接受一個ObservableEmitter
類型的實例,俗稱發射器。Observable.create
方法執行時,咱們傳入的就是一個ObservableOnSubscribe
類型的匿名內部類,並實現了它的subscribe
方法,而後它又被傳入create
方法的返回對象ObservableCreate
,最終成爲ObservableCreate
的成員source
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...複製代碼
接着咱們來看Observable
的subscribe
方法,它的入參是一個Observer
(即觀察者,也就是事件接收者)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}複製代碼
最終它會調用它的子類ObservableCreate
的subscribeActual
方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}複製代碼
在subscribeActual
裏首先建立了用於發射事件的CreateEmitter
對象parent
,CreateEmitter
實現了接口Emitter
和Disposable
,並持有observer
。
這段代碼的關鍵語句是source.subscribe(parent)
,這行代碼執行後,就會觸發事件源進行發射事件,即e.onNext("s")
會被調用。細心的同窗也會注意到這行代碼以前,parent
先被傳入了observer
的onSubscribe()
方法,而在上面咱們說過,observer
的onSubscribe()
方法接受一個Disposable
類型的參數,能夠用於解除訂閱,之因此可以解除訂閱,正是由於在觸發事件發射以前調用了observer
的onSubscribe()
,給了咱們調用CreateEmitter
的解除訂閱的方法dispose()
的機會。
繼續來看CreateEmitter
的onNext()
方法,它最終是經過調用observer
的onNext()
方法將事件發射出去的
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 在真正發射以前,會先判斷該CreateEmitter是否已經解除訂閱
if (!isDisposed()) {
observer.onNext(t);
}
}
...
}複製代碼
至此,Rx事件源的建立和訂閱的流程就走通了。
下面咱們從map
操做符來入手看一下Rx操做符的原理,map
方法以下
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}複製代碼
map
方法接受一個Function類型的參數mapper
,返回了一個ObservableMap
對象,它也是繼承自Observable
,而mapper
被傳給了ObservableMap
的成員function
,同時當前的源Observable
被傳給ObservableMap
的成員source
,進入ObservableMap
類
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}複製代碼
能夠看到這裏用到了裝飾者模式,ObservableMap
持有來自它上游的事件源source
,MapObserver
持有來自它下游的事件接收者和咱們實現的轉換方法function
,在subscribeActual()
方法中完成ObservableMap
對source
的訂閱,觸發MapObserver
的onNext()
方法,繼而未來自source
的原始數據通過函數mapper
轉換後再發射給下游的事件接收者,從而實現map這一功能。
如今咱們終於可以來總結一下包含多個操做符時的訂閱流程了,如下面這段代碼爲例
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("holen");
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return s.length();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});複製代碼
執行代碼時,自上而下每一步操做符都會建立一個新的Observable
(均爲Observable
的子類,對應不一樣的操做符),當執行create
時,建立並返回了ObservableCreate
,當執行map
時,建立並返回了ObservableMap
,而且每個新的Observable
都持有它上游的源Observable
(即source
)及當前涉及到的操做函數function
。當最後一步執行訂閱方法subscribe
時會觸發ObservableMap
的subscribeActual()
方法,並將最下游的Observer
包裝成MapObserver
,同時該方法又會繼續調用它所持有ObservableCreate
的訂閱方法(即執行source.subscribe
),由此也會觸發ObservableCreate
的subscribeActual()
方法,此時咱們的發射器CreateEmitter
纔會調用它的onNext()
方法發射事件,再依次調用MapObserver
的操做函數mapper
和onNext()
方法,最終將事件傳遞給了最下游的Observer
的onNext()
方法。
我簡單的將這段邏輯用下面這幅圖來表示
lift
和compose
lift
和compose
在Rx中是兩個比較特殊的操做符。lift
讓咱們能夠對Observer
進行封裝,在RxJava1.0中大部分變換都基於lift
這個神奇的操做符。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
ObjectHelper.requireNonNull(lifter, "onLift is null");
return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
}複製代碼
lift
操做符接受一個ObservableOperator
對象
/**
* Interface to map/wrap a downstream observer to an upstream observer.
*
* @param <Downstream> the value type of the downstream
* @param <Upstream> the value type of the upstream
*/
public interface ObservableOperator<Downstream, Upstream> {
/**
* Applies a function to the child Observer and returns a new parent Observer.
* @param observer the child Observer instance
* @return the parent Observer instance
* @throws Exception on failure
*/
@NonNull
Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Exception;
}複製代碼
看註釋能夠知道,這是一個將下游訂閱者包裝成一個上游訂閱者的接口。相似Map操做符中的MapObserver。
而compose
操做符讓咱們能夠對Observable
進行封裝
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}複製代碼
wrap
方法以下,僅僅是走了RxJavaPlugins
的流程
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> wrap(ObservableSource<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Observable) {
return RxJavaPlugins.onAssembly((Observable<T>)source);
}
return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}複製代碼
compose
方法接受一個ObservableTransformer
對象
/**
* Interface to compose Observables.
*
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface ObservableTransformer<Upstream, Downstream> {
/**
* Applies a function to the upstream Observable and returns an ObservableSource with
* optionally different element type.
* @param upstream the upstream Observable instance
* @return the transformed ObservableSource instance
*/
@NonNull
ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}複製代碼
ObservableSource
即爲咱們的基類Observable
繼承的惟一接口。看註釋能夠知道,ObservableTransformer
是一個組合多個Observable
的接口,它經過一個apply()
方法接收上游的Observable
,進行一些操做後,返回新的Observable
。
這裏組合多個Observable
的意思其實就是組合多個操做符,好比咱們常常會須要在使用Rx進行網絡異步請求時進行線程變化,這個操做通常都是差很少的,每次都寫會比較煩,這時咱們就可使用compose
把經常使用的線程變換的幾個操做符組合起來
private final ObservableTransformer schedulersObservable = new ObservableTransformer() {
@Override
public ObservableSource apply(Observable upstream) {
return upstream.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
protected void testCompose() {
getNetObservable()
.compose(schedulersObservable)
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append(s);
}
});
}複製代碼
關於compose
的典型應用,你們有興趣還能夠去看一下開源項目RxLifecycle,它就是巧妙地利用compose
操做符來解決了使用Rx可能會出現的內存泄露問題。
說了這麼多,其實咱們最關心的仍是Rx操做符的應用場景。其實只要存在異步的地方,均可以優雅地使用Rx操做符。好比不少流行的Rx周邊開源項目
而針對本身想要實現的功能情景,如何去選擇特定的操做符,官網的文檔中也列出了一些指導——Rx操做符決策樹。
固然除了這些,咱們在開發項目時,還會有各類具體的業務場景須要選擇合適的操做符,這裏我總結了一些常常遇到的場景以及適合它們的操做符
只要咱們理解了Rx操做符的原理,熟練掌握了一些使用頻率較高的操做符,就可以在以上場景中輕鬆地使用,再也不讓本身的代碼被複雜的業務邏輯搞得混亂。
以上就是本文的所有內容,關於Rx還有不少東西值得深刻地學習研究,後續有機會再跟你們分享更多Rx的使用心得。