瞭解 RxJava 的應該都知道是一個基於事務驅動的庫,響應式編程的典範。提到事務驅動和響應就不得不說說,設計模式中觀察者模式,已經瞭解的朋友,能夠直接跳過觀察者模式的介紹,直接到 RxJava 源碼中對於觀察者的應用。html
該部分結合自扔物線的 《給 Android 開發者的 RxJava 詳解》, 強烈推薦剛接觸 RxJava 的朋友閱讀。java
觀察者模式面向的需求是:A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,須要在 B 變化的一瞬間作出反應。舉個例子,新聞裏喜聞樂見的警察抓小偷,警察須要在小偷伸手做案的時候實施抓捕。在這個例子裏,警察是觀察者,小偷是被觀察者,警察須要時刻盯着小偷的一舉一動,才能保證不會漏過任何瞬間。程序的觀察者模式和這種真正的『觀察』略有不一樣,觀察者不須要時刻盯着被觀察者(例如 A 不須要每過 2ms 就檢查一次 B 的狀態),而是採用註冊( Register )或者稱爲訂閱( Subscribe )的方式,告訴被觀察者:我須要你的某某狀態,你要在它變化的時候通知我。 Android 開發中一個比較典型的例子是點擊監聽器 OnClickListener
。對設置 OnClickListener
來講, View
是被觀察者, OnClickListener
是觀察者,兩者經過 setOnClickListener()
方法達成訂閱關係。訂閱以後用戶點擊按鈕的瞬間,Android Framework 就會將點擊事件發送給已經註冊的 OnClickListener 。採起這樣被動的觀察方式,既省去了反覆檢索狀態的資源消耗,也可以獲得最高的反饋速度。固然,這也得益於咱們能夠隨意定製本身程序中的觀察者和被觀察者,而警察叔叔明顯沒法要求小偷『你在做案的時候務必通知我』。編程
OnClickListener 的模式大體以下圖:設計模式
如圖所示,經過 setOnClickListener()
方法,Button
持有 OnClickListener
的引用(這一過程沒有在圖上畫出);當用戶點擊時,Button
自動調用 OnClickListener
的 onClick()
方法。另外,若是把這張圖中的概念抽象出來(Button
-> 被觀察者、OnClickListener
-> 觀察者、setOnClickListener()
-> 訂閱,onClick()
-> 事件),就由專用的觀察者模式(例如只用於監聽控件點擊)轉變成了通用的觀察者模式。以下圖:app
而 RxJava 做爲一個工具庫,使用的就是通用形式的觀察者模式。ide
RxJava 有四個基本概念:Observable
(可觀察者,即被觀察者)、 Observer
(觀察者)、 subscribe
(訂閱)、事件。Observable 和 Observer 經過 subscribe() 方法實現訂閱關係,從而 Observable 能夠在須要的時候發出事件來通知 Observer。函數
與傳統觀察者模式不一樣, RxJava 的事件回調方法除了普通事件 onNext()
(至關於 onClick() / onEvent())以外,還定義了兩個特殊的事件:onCompleted()
和 onError()
。工具
onCompleted()
: 事件隊列完結。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的 onNext()
發出時,須要觸發 onCompleted()
方法做爲標誌。onError()
: 事件隊列異常。在事件處理過程當中出異常時,onError()
會被觸發,同時隊列自動終止,不容許再有事件發出。onCompleted()
和 onError()
有且只有一個,而且是事件序列中的最後一個。須要注意的是,onCompleted()
和 onError()
兩者也是互斥的,即在隊列中調用了其中一個,就不該該再調用另外一個。而且只要onCompleted()
和 onError()
中有一個調用了,都會停止 onNext()
的調用。RxJava 的觀察者模式大體以下圖:
post
基於以上觀點, RxJava 的基本實現主要有三點:性能
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行爲。 RxJava 中的 Observer 接口的實現方式:
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};複製代碼
除了 Observer 接口以外,RxJava 還內置了一個實現了 Observer 的抽象類:Subscriber
。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是徹底同樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};複製代碼
不只基本使用方式同樣,實質上,在 RxJava 的 subscribe 過程當中,Observer 也老是會先被轉換成一個 Subscriber 再使用。
// Observable.java 源碼
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) { // 若是是 Subscriber 的子類,直接轉化爲 Subscriber
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}複製代碼
// ObserverSubscriber.java
public final class ObserverSubscriber<T> extends Subscriber<T> {
...
}複製代碼
經過源碼能夠看到,傳入的
Observer
最終仍是會轉化爲Subscriber
來使用。
因此若是你只想使用基本功能,選擇 Observer 和 Subscriber 是徹底同樣的。它們的區別對於使用者來講主要有兩點:
onStart()
: 這是 Subscriber 增長的方法。它會在 subscribe 剛開始,而事件還未發送以前被調用,能夠用於作一些準備工做,例如數據的清零或重置。這是一個可選方法,默認狀況下它的實現爲空。須要注意的是,若是對準備工做的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執行), onStart() 就不適用了,由於它老是在 subscribe 所發生的線程被調用,而不能指定線程。要在指定的線程來作準備工做,可使用 doOnSubscribe() 方法。// Subscriber.java
public void onStart() {
// do nothing by default
}複製代碼
unsubscribe()
: 這是 Subscriber 所實現的另外一個接口 Subscription 的方法,用於取消訂閱。在這個方法被調用後,Subscriber 將再也不接收事件。通常在這個方法調用前,可使用 isUnsubscribed() 先判斷一下狀態。 unsubscribe()
這個方法很重要,由於在 subscribe() 以後, Observable 會持有 Subscriber 的引用,這個引用若是不能及時被釋放,將有內存泄露的風險。因此最好保持一個原則:要在再也不使用的時候儘快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe()
來解除引用關係,以免內存泄露的發生。// Subscriber.java
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
@Override
public final boolean isUnsubscribed() {
return subscriptions.isUnsubscribed();
}複製代碼
Observable 即被觀察者,它決定何時觸發事件以及觸發怎樣的事件。例如 create()
方法
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});複製代碼
能夠看到,這裏傳入了一個 OnSubscribe
對象做爲參數。OnSubscribe
會被存儲在返回的 Observable
對象中,它的做用至關於一個計劃表,當 Observable
被訂閱的時候,OnSubscribe
的 call() 方法會自動被調用,事件序列就會依照設定依次觸發(對於上面的代碼,就是觀察者Subscriber
將會被調用三次 onNext() 和一次 onCompleted()。這樣,由被觀察者調用了觀察者的回調方法,就實現了由被觀察者向觀察者的事件傳遞,即觀察者模式。
create()
方法是 RxJava 最基本的創造事件序列的方法。基於這個方法, RxJava 還提供了一些方法用來快捷建立事件隊列,例如 just()
, from()
建立了 Observable 和 Observer 以後,再用 subscribe() 方法將它們聯結起來,整條鏈子就能夠工做了。代碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);複製代碼
有人可能會注意到,
subscribe()
這個方法有點怪:它看起來是『observalbe
訂閱了observer / subscriber
』而不是『observer / subscriber
訂閱了observalbe
』,這看起來就像『雜誌訂閱了讀者』同樣顛倒了對象關係。這讓人讀起來有點彆扭,不過若是把 API 設計成observer.subscribe(observable) / subscriber.subscribe(observable)
,雖然更加符合思惟邏輯,但對流式 API 的設計就形成影響了,比較起來明顯是得不償失的。
整個過程當中對象間的關係以下圖:
// 例子
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("value: " + s);
}
});複製代碼
log 信息
value: Hello
value: Hi
value: Aloha
onCompleted複製代碼
看到上面代碼,可能會有人跟我同樣不明白, create()
中的 OnSubscribe
中 call()
的 Subscriber
是怎麼樣最終就變成了 subscribe()
中的 Subscriber
。
下面來一下 Observable.subscribe(Subscriber)
的內部實現是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除後的核心代碼。
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
// 能夠用於作一些準備工做,例如數據的清零或重置, 默認狀況下它的實現爲空
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
// 強制轉化爲 SafeSubscriber 是爲了保證 onCompleted 或 onError 調用的時候會停止 onNext 的調用
subscriber = new SafeSubscriber<T>(subscriber);
}
...
// // onObservableStart() 默認返回的就是 observable.onSubscribe
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
// onObservableReturn() 默認也是返回 subscriber
return RxJavaHooks.onObservableReturn(subscriber);
...
}複製代碼
經過源碼能夠看到,subscriber()
實際就作了 4 件事情
Subscriber.onStart()
。這個方法在前面已經介紹過,是一個可選的準備方法。Subscriber
轉化爲 SafeSubscriber
, 爲了保證 onCompleted 或 onError 調用的時候會停止 onNext 的調用。// 注意:這不是 SafeSubscriber 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除後的核心代碼。
public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;
boolean done; // 經過改標誌來保證 onCompleted 或 onError 調用的時候會停止 onNext 的調用
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
@Override
public void onCompleted() {
if (!done) {
done = true;
...
actual.onCompleted();
...
unsubscribe(); // 取消訂閱,結束事務
}
}
@Override
public void onError(Throwable e) {
...
if (!done) {
done = true;
_onError(e);
}
}
@Override
public void onNext(T t) {
if (!done) { // done 爲 true 時,停止傳遞
actual.onNext(t);
}
}
@SuppressWarnings("deprecation")
protected void _onError(Throwable e) {
...
actual.onError(e);
...
unsubscribe();
...
}
}複製代碼
經過代碼能夠看出來,經過 SafeSubscriber
中的布爾變量 done
來作標記保證上文提到的 onCompleted()
和 onError()
兩者的互斥性,即在隊列中調用了其中一個,就不該該再調用另外一個。而且只要 onCompleted()
和 onError()
中有一個調用了,都會停止 onNext()
的調用。
Subscriber
做爲 Subscription 返回。這是爲了方便 unsubscribe()
.以上就是 RxJava 最基本的一個經過觀察者模式,來響應事件的原理。下面來看看 RxJava 中一些基本操做符的實現原理又是怎樣的。
爲了能更好的理解源碼,須要對 RxJava 有基本的使用基礎,對 RxJava 不太熟悉的朋友請先一步到《給 Android 開發者的 RxJava 詳解》
Observable.interval(1, TimeUnit.SECONDS)
.map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return aLong * 5;
}
})
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
System.out.println("value: " + aLong);
}
});複製代碼
log 信息
value: 0
value: 5
value: 10
...複製代碼
上面的列子會每秒生成一個從 0 依次遞增的整數,而後經過 map()
變換操做符後,變成了 5 的倍數的一個整數列。
第一次看到該例子時,就喜歡上了 RxJava,這種鏈式函數的交互模式真的很簡潔,終於能夠從回調地獄裏逃出來了。喜歡的同時難免也會想 RxJava 是如何實現的。這種鏈式的函數流能夠算是建造者模式的一種變形,只不過省去了中間 Builder
而直接返回當前對象來實現。 更讓我興奮的是內部這些操做符的實現原理。
上文也已經說過了在 RxJava 中, Observable 並非在建立的時候就當即開始發送事件,而是在它被訂閱的時候,即當 subscribe() 方法執行的時候。 因此對於上面一段的代碼咱們要從 subscribe()
往前屢,首先看一下 map()
這個函數的內部實現。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
// 新建了一個 Observable 並使用新的 OnSubscribeMap 來封裝傳入的數據
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}複製代碼
不用說你們也猜到了 OnSubscribeMap
是 OnSubscribe
的子類
// 注意:這不是 OnSubscribeMap 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除後的核心代碼。
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source; // 列子中通過 Observable.interval() 函數生成的 Observable
this.transformer = transformer;
}
// 傳入的 o 就是例子中 `subscribe()` 出入的 Subscribe
// 具體結合 Observable.subscribe() 源碼來理解
@Override
public void call(final Subscriber<? super R> o) {
// 對傳入的 Subscriber 進行再次封裝成 MapSubscriber
// 具體 Observable.map() 的邏輯是在 MapSubscriber 中
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent); // 加入到 SubscriptionList 中,爲以後取消訂閱
// Observable.interval() 返回的 Observable 進行訂閱(關鍵點)
source.unsafeSubscribe(parent);
}
...
}複製代碼
能夠看到 call()
方法的邏輯很簡單,只是將例子中 Observable.subscribe()
傳入的 Subscriber
進行封裝後,再將上流傳入的 Observable
進行訂閱
// 注意:這不是 MapSubscriber 的源碼
// 而是將源碼中與性能、兼容性、擴展性有關的代碼剔除後的核心代碼。
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual; // Observable.subscribe() 傳入的 Subscriber
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
...
result = mapper.call(t); // 數據進行了變換
...
actual.onNext(result); // 往下流傳
}
...
}複製代碼
經過以上就完成了
map()
對數據的變換,這裏最終的就是理解OnSubscribeMap
的call()
中 source.unsafeSubscribe(parent);source
指的是例子中Observable.interval()
生成的對象。
再來看一下 RxJava 中對 Observable.interval()
的實現
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
return unsafeCreate(new OnSubscribeTimerPeriodically(initialDelay, period, unit, scheduler));
}複製代碼
能夠看出 interval()
和 map()
同樣都是經過生成新的 Observable
並向 Observable
中傳入與之對應的 OnSubscribe
的子類來完成具體操做。
// 注意:這不是 OnSubscribeTimerPeriodically 的源碼
// 而是將源碼中與性能、兼容性、擴展性有關的代碼剔除後的核心代碼。
public final class OnSubscribeTimerPeriodically implements OnSubscribe<Long> {
final long initialDelay;
final long period;
final TimeUnit unit;
final Scheduler scheduler;
public OnSubscribeTimerPeriodically(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
this.scheduler = scheduler;
}
// 傳入的 Subscriber 爲上文提到的 OnSubscribeMap.call() 方法中 source.unsafeSubscribe(parent);
@Override
public void call(final Subscriber<? super Long> child) {
final Worker worker = scheduler.createWorker();
child.add(worker);
worker.schedulePeriodically(new Action0() {
long counter;
@Override
public void call() {
...
child.onNext(counter++);
...
}
}, initialDelay, period, unit);
}
}複製代碼
以上就是 RxJava 總體的邏輯結構,能夠看到 RxJava 將觀察者模式發揮的淋漓盡致。總體邏輯的處理有點像遞歸函數的原理。而 map()
則像一種代理機制,經過事件攔截和處理實現事件序列的變換。
總結: 精簡掉細節的話,也能夠這麼說:在 Observable 執行了各類操做符( map, interval 等)以後 方法以後,會返回一個新的 Observable,這個新的 Observable 會像一個代理同樣,負責接收原始的 Observable 發出的事件,並在處理後發送給 Subscriber。