當咱們提到 RxJava 時會想到什麼,異步、事件流、響應式編程、觀察者模式、鏈式編程等等。對於 Android 開發者來講,在 kotlin 出現以前,RxJava 的編程方式的確給咱們帶來了很爽的編程體驗,可是在不斷的使用過程,感受 RxJava 其實也沒那麼爽,反而有時候會以爲不是那麼的方便,或者說它並無咱們想象的那麼「強大」。最近也看了幾篇很好的文章,也使我進一步加深麼對 RxJava 的理解,這裏也推薦給你們:
RxJava 沉思錄(一):你認爲 RxJava 真的好用嗎?
我爲何再也不推薦RxJava
More of RxJava
這裏我只是簡單談一談本身對 RxJava 的理解和本身的一些見解,歡迎你們批評指正!!java
看到不少文章都提到 RxJava 處理異步多麼強大,各類線程切換的操做符很好地解決了 Android 上的主線程和子線程的數據傳遞。RxJava 也解決了「回調地獄(Callback Hell)」,異步處理再也不須要回調一層套一層的搞,而是用鏈式調用的方式完成不一樣線程的回調。
首先咱們來看下「異步」爲何這麼讓人頭疼。不少像 Android 同樣的 UI 框架都限制改變 UI 的代碼必須運行在主線程,並且主線程不容許被阻塞,或者乾脆就是單線程模型,這樣也能夠很好的解決 UI 層的同步和性能問題。可是渲染 UI 的數據每每須要進行網絡、文件或數據庫等 IO 操做,或者須要在拿到數據後進行較長時間的後臺運算,這些操做只能放在子線程來作,主線程固然不會等着子線程完成這些,這樣就須要子線程完成操做後「異步」地通知主線程,而這個「異步通知」在目前來看只能經過回調的形式實現。
react
而若是業務比較複雜,涉及屢次異步操做,好比:(請求數據 A) -> (更新 UI) -> (請求數據 B) -> (更新 UI) -> (請求數據 C) -> (更新 UI),那處理起來就會比較複雜。git
假設咱們有如下操做:github
Thread {
val a = getDataA(1)
runOnUiThread {
val resultA = doSomethingOnMainA(a)
Thread {
val b = getDataB(resultA)
runOnUiThread {
val resultB = doSomethingOnMainB(b)
log("doCallback: Result = $resultB")
}
}.start()
}
}.start()
複製代碼
這種方式即不容易理解,也不容易維護,若是再新增一些其餘業務不免會形成「回調地獄」。固然,實際項目中估計也不會有人這麼用,Android 提供的 AsyncTask
也能必定程度上解決這種問題。
而 RxJava 用鏈式調用的方式把上述操做串起來,而後上一級的操做完成後會自動調用下一個操做,上面的代碼也能夠寫成這樣。數據庫
Observable.just(1)
.observeOn(Schedulers.io())
.map { getDataA(it) }
.observeOn(AndroidSchedulers.mainThread())
.map { doSomethingOnMainA(it) }
.observeOn(Schedulers.io())
.map { getDataB(it) }
.observeOn(AndroidSchedulers.mainThread())
.map { doSomethingOnMainB(it) }
.subscribe { log("doRxJava: Result = $it") }
複製代碼
RxJava 的鏈式寫法比單純的回調嵌套要更加清晰,更加容易理解,也更好維護和更改代碼。固然這種連接調用也不是 RxJava 獨有的,Promise
也可使用鏈式調用實現異步,我對 Promise
沒有深刻了解,這裏就很少闡述了。
其實這種鏈式調用寫異步的方式實現起來很簡單,Demo 和一個不到一百行的實現能夠參考下:RunList,用起來是這樣的。編程
RunList.runOnBackground({ integer -> getDataA(integer) }, 1)
.runOnUiThread { string -> doSomethingOnMainA(string) }
.runOnBackground { integer -> getDataB(integer) }
.runOnUiThread { string -> log("doRunList: Result = ${doSomethingOnMainB(string)}") }
.start()
複製代碼
但無論怎麼寫,這種顯式地用回調的方式都讓人感受很不舒服,讓咱們迴歸本質,這其實就是一個「串行」操做,依次調用不一樣的函數而已,只不過要考慮線程切換的問題,讓咱們用同步的代碼寫一下看看(暫不考慮線程切換問題)。設計模式
val a = getDataA(1)
val resultA = doSomethingOnMainA(a)
val b = getDataB(resultA)
val resultB = doSomethingOnMainB(b)
複製代碼
這樣固然是最簡單明瞭的書寫方式,而 kotlin 的協程就是一種用寫同步代碼的方式來寫異步的工具,上面的邏輯用協程實現以下。緩存
GlobalScope.launch(Dispatchers.Main) {
val a = withContext(Dispatchers.IO) { getDataA(1) }
val resultA = doSomethingOnMainA(a)
val b = withContext(Dispatchers.IO) { getDataB(resultA) }
val resultB = doSomethingOnMainB(b)
log("doCoroutine: Result = $resultB")
}
複製代碼
以前也說了,異步操做的回調是少不了的,可是這裏爲何沒有回調呢。其實協程是在編譯期,用狀態機的方式幫咱們作了回調。上面的代碼用狀態機寫大體以下(爲了方便理解,可能與實際反編譯的代碼不一樣)。安全
// SuspendLambda 整塊代碼都是回調,經過狀態機的方式,每次都執行 invoke
class SuspendLambda {
// 表示執行到哪段代碼
int label = 0
public void invoke(Object result) {
switch (label) {
case 0 :
label++
// 執行 getDataA
result = getDataA(result)
// 若是這段代碼須要掛起,就直接返回,等待掛起結束會繼續回調 invoke
// 若是不掛起,就繼續執行下面的,沒有 break
if (result == SUSPEND) { return }
case 1:
label++
// 這裏的參數就是以前上一段 getDataA 的返回值
result = doSomethingOnMainA(result)
if (result == SUSPEND) { return }
case 2 :
label++
result = getDataB(result)
if (result == SUSPEND) { return }
case 3:
label++
result = doSomethingOnMainB(result)
if (result == SUSPEND) { return }
}
}
}
複製代碼
協程經過狀態機的方式,將代碼分段,而後執行時須要掛起就返回,等待掛起結束再次執行 invoke
就行,對於不一樣線程切換的問題,只須要把這段代碼拋到不一樣的 Dispatchers
的線程上就行,能夠認爲整串代碼就是個大的 Runnable
,而後被分段成小的 Runnable
在不一樣的地方回調。可是書寫的時候咱們能夠按照串行執行的思想,不須要考慮回調的問題,清晰易懂。 kotlin 協程的解析能夠參考這篇文章 瞭解Kotlin協程實現原理這篇就夠了網絡
RxJava 處理異步的確比嵌套回調好用,可是異步並不能算是 RxJava 的強項,他能作的別的方式也能作,甚至作得更好,並且隨着鏈的邊長,各類操做符會生成不少臨時對象,這樣仍是挺消耗資源的。
RxJava 的強項不在異步,事件流處理纔是 RxJava 厲害的地方,它以響應式的編程方式完成對各類事件流的複雜處理。RxJava 涉及的事件流處理過程簡單來講能夠分爲:流的創建、訂閱、事件處理和出錯處理,下面以 Observable
爲例簡單分析下。
一條 RxJava 事件流(或者說鏈)的創建其實就是咱們在鏈式調用地使用各類操做符,不斷往代碼後面續的時候,這時候每一個操做符都會生成一個對應的 Observable
來承接上下游,簡圖以下。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼
上面是 map
操做符的代碼,他生成了一個新的 ObservableMap
,保存了上游的 Observable
在這裏是 this
,和一個 mapper
的轉換方法。因此咱們平時用的時候,若是把 RxJava 鏈直接放在某個函數體中的話,每次調用該函數都會從新生成這條鏈,會新生成一堆對象的實例,所以若是某條鏈頻繁使用的話,能夠緩存在成員變量中。
當咱們對某條 RxJava 鏈調用 subscribe
方法時,就會觸發整條鏈的訂閱
過程,從而從下游 subscribe
的地方開始,一級一級往上游傳遞訂閱
的事件,簡圖以下。
Observable
的 subscribe
方法時會作一些其餘處理,而後調用 subscribeActual
方法,仍是以 map
操做符爲例,來看下 ObservableMap
在訂閱過程當中都作了什麼。
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
複製代碼
它調用了上游 source
的 subscribe
訂閱本身的一個靜態內部類 MapObserver
,同時也會把下游的 Observer
保存到 MapObserver
中,而後 source
也會在本身的 subscribe
方法中調 subscribeActual
方法,從將訂閱
一級一級往上游傳遞。和「流的創建」不一樣的是,無論 RxJava 鏈是不是緩存的,每次調用 subscribe
都會從新生成一堆新的對象實例,這也是 RxJava 被人詬病的地方之一,因此儘可能不要在頻繁調用的函數中使用 RxJava。
事件處理能夠說是 RxJava 的精髓所在,說簡單點就是在上游 onNext
發送事件時,事件一級一級被處理,並往下游傳遞的過程。
public void onNext(T t) {
...
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
...
}
複製代碼
這裏簡單看下 MapObserver
的 onNext
,它先利用 mapper
將上游的 t
轉換成 v
,而後再調用下游 downstream
的 onNext(v)
將事件往下游傳遞。固然 map
屬於比較簡單的操做符,flatMap
切換事件流、zip
合併流、buffer
作緩存、sample
採樣、filter
過濾等等好用的操做符組合在一塊兒展示了 RxJava 強大的事件流操做能力。
RxJava 的各類回調中執行的代碼都是很安全的,io.reactivex.functions
包中的各類 Function
接口基本都顯示聲明會 throws Exception
,RxJava 每一個操做符都會增長 try...catch...
從而保證即便代碼執行拋出異常也不會崩潰,只會回調到下游的 onError
中,而 onError
異常在多數狀況下也會被 catch
住,這個取決於具體的操做符和 Observer
。
即便出錯了,RxJava 仍是會有「起死回生」的機會,它的 onErrorResumeNext
能夠在上游 onError
時切換到另一個流上, onErrorReturn
則會替換成一個新的事件繼續往下游傳遞 onNext
。 不過「起死回生」的做用範圍只能是上游,若是 onErrorResumeNext
下游出錯那就沒辦法挽救了。
須要注意的是,RxJava 基本每一個操做符出錯是會斷掉事件流的,仍是以 map
爲例來看下 MapObserver
,「事件處理」中能夠看到 onNext
出錯後會調用 fail
函數,而該函數中會調用 upstream.dispose()
把流斷掉。這對於一次性的網絡請求等操做徹底能夠理解,反正上游也就發一次事件,發完就結束了,出錯了斷掉事件流也無所謂。可是對於相似 PublishSubject
等事件源,他們多是一致存在着的,若是由於某一次出錯就把整條流斷掉會致使沒法再收到後續的事件,這點須要特別注意!
protected final void fail(Throwable t) {
Exceptions.throwIfFatal(t);
upstream.dispose();
onError(t);
}
複製代碼
在 RxJava 的世界裏,一切其餘事件均可以轉化成 Observable
或者更準確的說是可觀察者(若是考慮上 Single
、Maybe
、Flowable
等)。
RxJava 用 just
、merge
、range
、timer
、zip
等等靜態方法和 Observable
的一些子類實現了外部世界到 RxJava 世界的適配工做,還一個典型的例子就是 Retrofit
原生返回的 Call
到 Observable
的適配,具體代碼在 retrofit2.adapter.rxjava2.RxJava2CallAdapter
中,核心函數 adapt()
以下。
public Object adapt(Call<R> call) {
// 這裏是作適配的代碼,根據同步仍是異步把 Call 適配成 Observable
Observable<Response<R>> responseObservable = isAsync
? new CallEnqueueObservable<>(call)
: new CallExecuteObservable<>(call);
Observable<?> observable;
// 這裏根據 isResult 和 isBody 作了另外一層包裝
if (isResult) {
observable = new ResultObservable<>(responseObservable);
} else if (isBody) {
observable = new BodyObservable<>(responseObservable);
} else {
observable = responseObservable;
}
// 設置調度器
if (scheduler != null) {
observable = observable.subscribeOn(scheduler);
}
// Flowable、Single、Maybe 和 Completable 的包裝/適配
if (isFlowable) {
return observable.toFlowable(BackpressureStrategy.LATEST);
}
if (isSingle) {
return observable.singleOrError();
}
if (isMaybe) {
return observable.singleElement();
}
if (isCompletable) {
return observable.ignoreElements();
}
return observable;
}
複製代碼
上游不斷地生產事件,而後下游一級一級地消費上游的事件。這在 Observable
上體現的不是很明顯,可能用帶流控的 Flowable
能更加說明這一點,上游不斷地生產數據,往緩存池裏放,下游則從緩存池中消費。
RxJava 主要是用裝飾者模式實現了整條鏈的創建和事件傳遞。
舉個簡單的例子,map
操做符在創建鏈的時候返回的是 ObservableMap
,和上游的源同樣是 Observable
的一個子類。它在被 subscribe
的時候會去調用 subscribeActual
方法,在這裏調用 source.subscribe(new MapObserver<T, U>(t, function))
,調用到源的 subscribe
方法,從而實現訂閱從下游往上游傳遞,同時也實現了本身的操做邏輯,把上游訂閱給本身的一個靜態內部類 MapObserver
。MapObserver
是對下游觀察者進行了一次裝飾,和其餘觀察者同樣都實現了 Observer
,在收到上游的事件傳遞 onNext(T t)
時,先調用 Function<? super T, ? extends U> mapper
實現本身的邏輯,再分發給下游 downstream.onNext(v)
。
用 RxJava 的基本都知道觀察者模式,但從上面的分析也能夠看出,無論是異步操做,仍是事件流的創建和處理,和觀察者模式的關係並非很密切,這可能也是咱們對 RxJava 的誤解之一。
「觀察者設計模式定義了對象間的一種一對多的組合關係,以便一個對象的狀態發生變化時,全部依賴於它的對象都獲得通知並自動刷新」。
咱們平時用的 Observable
和觀察者模式關係不大,簡單的看代碼就能發現,Observable
只存了一個上下游,也就是說往下游發事件只能有一個下游的接收者,這哪兒是什麼觀察者模式,只是套用了這一名字而已。並且咱們用的基本都是些 Cold Observable
,簡單點說就是「只有當有訂閱者訂閱的時候,Cold Observable 纔開始執行發射數據流的代碼,而且每一個訂閱者訂閱的時候都獨立的執行一遍數據流代碼」,因此這就更說明不是觀察者模式了,而 Hot Observable
則是「無論有沒有訂閱者訂閱,他們建立後就開發發射數據流。一個比較好的示例就是 鼠標事件。無論系統有沒有訂閱者監聽鼠標事件,鼠標事件一直在發生,當有訂閱者訂閱後,從訂閱後的事件開始發送給這個訂閱者,以前的事件這個訂閱者是接受不到的;若是訂閱者取消訂閱了,鼠標事件依然繼續發射。」(Cold/Hot Observable 的區別這裏再也不過多贅述)。
而在 RxJava 裏真正稱得上是觀察者模式的就是 Subject
,大概看下簡介「Represents an {@link Observer} and an {@link Observable} at the same time, allowing multicasting events from a single source to multiple child {@code Observer}s.」
咱們平時用的多的 PublishSubject
實現了 Subject
,內部存儲了 final AtomicReference<PublishDisposable<T>[]> subscribers
來保存全部的觀察者,而後手動 onNext
時遍歷 subscribers
的 onNext
將事件發送出去。
簡單總結一下,RxJava 的事件流處理能力很強大,但 RxJava 並不僅是爲了作異步的(固然 RxJava 作異步也不弱),只不過在 Android 領域,咱們多數狀況下只是把 RxJava 看成異步請求數據的工具,並無發揮它的真實實力,像按鈕去抖動,監聽 EditText 延時請求數據等纔是 RxJava 的正確使用姿式。目前來看,RxJava 「濫用」的狀況仍是很廣泛的,不少地方都是簡單作一下異步而已,隨着 Kotlin 的崛起和協程的發展,這種狀況必定會慢慢改善。