轉載請標明出處:
juejin.im/post/58c5f8…
本文出自:【張旭童的稀土掘金】(juejin.im/user/56de21…)javascript
最近事情太多了,如今公司內部的變更,本身崗位的變化,以及最近決定找工做。因此博客耽誤了,準備面試中,打算看一看RxJava2的源碼,遂有了這篇文章。java
不會對RxJava2的源碼逐字逐句的閱讀,只尋找關鍵處,咱們平時接觸獲得的那些代碼。
背壓實際中接觸較少,故只分析了Observable
.
分析的源碼版本爲:2.0.1面試
咱們的目的:app
Observable
)是如何將數據發送出去的。Observer
)是如何接收到數據的。本文先達到目的1 ,2 ,3。
我我的認爲主要仍是適配器模式的體現,咱們接觸的就只有Observable
和Observer
,其實內部有大量的中間對象在適配:將它們兩聯繫起來,加入一些額外功能,例如考慮dispose和hook等。ide
這是一段不涉及操做符和線程切換的簡單例子:post
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});複製代碼
拿 create來講, ui
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//.....
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}複製代碼
返回值是Observable
,參數是ObservableOnSubscribe
,定義以下:this
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}複製代碼
ObservableOnSubscribe
是一個接口,裏面就一個方法,也是咱們實現的那個方法:
該方法的參數是 ObservableEmitter
,我認爲它是關聯起 Disposable
概念的一層:spa
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(Disposable d);
void setCancellable(Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
}複製代碼
ObservableEmitter
也是一個接口。裏面方法不少,它也繼承了 Emitter<T>
接口。線程
public interface Emitter<T> {
void onNext(T value);
void onError(Throwable error);
void onComplete();
}複製代碼
Emitter<T>
定義了 咱們在ObservableOnSubscribe
中實現subscribe()
方法裏最經常使用的三個方法。
好,咱們回到原點,create()
方法裏就一句話return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
,其中提到RxJavaPlugins.onAssembly()
:
/** * Calls the associated hook function. * @param <T> the value type * @param source the hook's input value * @return the value returned by the hook */
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}複製代碼
能夠看到這是一個關於hook的方法,關於hook咱們暫且不表,不影響主流程,咱們默認使用中都沒有hook,因此這裏就是直接返回source
,即傳入的對象,也就是new ObservableCreate<T>(source)
.
ObservableCreate
我認爲算是一種適配器的體現,create()
須要返回的是Observable
,而我如今有的是(方法傳入的是)ObservableOnSubscribe
對象,ObservableCreate
將ObservableOnSubscribe
適配成Observable
。
其中subscribeActual()
方法表示的是被訂閱時真正被執行的方法,放後面解析:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}複製代碼
OK,至此,建立流程結束,咱們獲得了Observable<T>
對象,其實就是ObservableCreate<T>
.
subscribe()
:
public final void subscribe(Observer<? super T> observer) {
...
try {
//1 hook相關,略過
observer = RxJavaPlugins.onSubscribe(this, observer);
...
//2 真正的訂閱處
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
//3 錯誤處理,
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
//4 hook錯誤相關,略過
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}複製代碼
關於hook的代碼:
能夠看到若是沒有hook,即相應的對象是null,則是傳入什麼返回什麼的。
/** * Calls the associated hook function. * @param <T> the value type * @param source the hook's input value * @param observer the observer * @return the value returned by the hook */
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
//1 默認onObservableSubscribe(可理解爲一個flatmap的操做)是null
BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
//2 因此這句跳過,不會對其進行apply
if (f != null) {
return apply(f, source, observer);
}
//3 返回參數2
return observer;
}複製代碼
我也是驗證了一下 三個Hook相關的變量,確實是null:
Consumer<Throwable> errorHandler = RxJavaPlugins.getErrorHandler();
BiFunction<Observable, Observer, Observer> onObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
Function<Observable, Observable> onObservableAssembly = RxJavaPlugins.getOnObservableAssembly();
Log.e(TAG, "errorHandler = [" + errorHandler + "]");
Log.e(TAG, "onObservableSubscribe = [" + onObservableSubscribe + "]");
Log.e(TAG, "onObservableAssembly = [" + onObservableAssembly + "]");複製代碼
因此訂閱時的重點就是:
//2 真正的訂閱處
subscribeActual(observer);複製代碼
咱們將第一節提到的ObservableCreate
裏的subscribeActual()
方法拿出來看看:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//1 建立CreateEmitter,也是一個適配器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2 onSubscribe()參數是Disposable ,因此CreateEmitter能夠將Observer->Disposable 。還有一點要注意的是`onSubscribe()`是在咱們執行`subscribe()`這句代碼的那個線程回調的,並不受線程調度影響。
observer.onSubscribe(parent);
try {
//3 將ObservableOnSubscribe(源頭)與CreateEmitter(Observer,終點)聯繫起來
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
//4 錯誤回調
parent.onError(ex);
}
}複製代碼
Observer
是一個接口,裏面就四個方法,咱們在開頭的例子中已經所有實現(打印Log)。
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T value);
void onError(Throwable e);
void onComplete();
}複製代碼
重點在這一句:
//3 將ObservableOnSubscribe(源頭)與CreateEmitter(Observer,終點)聯繫起來
source.subscribe(parent);複製代碼
source
即ObservableOnSubscribe
對象,在本文中是:
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
}複製代碼
則會調用parent.onNext()
和parent.onComplete()
,parent
是CreateEmitter
對象,以下:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
...
//若是沒有被dispose,會調用Observer的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
...
//1 若是沒有被dispose,會調用Observer的onError()方法
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
//2 必定會自動dispose()
dispose();
}
} else {
//3 若是已經被dispose了,會拋出異常。因此onError、onComplete彼此互斥,只能被調用一次
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
//1 若是沒有被dispose,會調用Observer的onComplete()方法
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
//2 必定會自動dispose()
dispose();
}
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}複製代碼
總結重點:
Observable
和Observer
的關係沒有被dispose
,纔會回調Observer
的onXXXX()
方法Observer
的onComplete()
和onError()
互斥只能執行一次,由於CreateEmitter
在回調他們兩中任意一個後,都會自動dispose()
。根據第一點,驗證此結論。Observable
和Observer
關聯時(訂閱時),Observable
纔會開始發送數據。ObservableCreate
將ObservableOnSubscribe
(真正的源)->Observable
.ObservableOnSubscribe
(真正的源)須要的是發射器ObservableEmitter
.CreateEmitter
將Observer
->ObservableEmitter
,同時它也是Disposable
.error
後complete
,complete
不顯示。 反之會crash,感興趣的能夠寫以下代碼驗證。e.onNext("1");
//先error後complete,complete不顯示。 反之 會crash
//e.onError(new IOException("sb error"));
e.onComplete();
e.onError(new IOException("sb error"));複製代碼
DisposableHelper
本來到這裏,最簡單的一個流程咱們算是搞清了。
還值得一提的是,DisposableHelper.dispose(this);
DisposableHelper
頗有趣,它是一個枚舉,這是利用枚舉實現了一個單例disposed state
,即是否disposed,若是Disposable
類型的變量的引用等於DISPOSED
,則起點和終點已經斷開聯繫。
其中大多數方法 都是靜態方法,因此isDisposed()
方法的實現就很簡單,直接比較引用便可.
其餘的幾個方法,和AtomicReference
類攪基在了一塊兒。
這是一個實現引用原子操做的類,對象引用的原子更新
,經常使用方法以下:
//返回當前的引用。
V get()
//若是當前值與給定的expect引用相等,(注意是引用相等而不是equals()相等),更新爲指定的update值。
boolean compareAndSet(V expect, V update)
//原子地設爲給定值並返回舊值。
V getAndSet(V newValue)複製代碼
OK,鋪墊完了咱們看看源碼吧:
public enum DisposableHelper implements Disposable {
/** * The singleton instance representing a terminal, disposed state, don't leak it. */
DISPOSED
;
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
//1 經過斷點查看,默認狀況下,field的值是"null",並不是引用是null哦!大坑大坑大坑
//可是current是null引用
Disposable current = field.get();
Disposable d = DISPOSED;
//2 null不等於DISPOSED
if (current != d) {
//3 field是DISPOSED了,current仍是null
current = field.getAndSet(d);
if (current != d) {
//4 默認狀況下 走不到這裏,這裏是在設置了setCancellable()後會走到。
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}複製代碼
subscribeActual()
方法中,源頭和終點關聯起來。source.subscribe(parent);
這句代碼執行時,纔開始從發送ObservableOnSubscribe
中利用ObservableEmitter
發送數據給Observer
。即數據是從源頭push給終點的。CreateEmitter
中,只有Observable
和Observer
的關係沒有被dispose
,纔會回調Observer
的onXXXX()
方法Observer
的onComplete()
和onError()
互斥只能執行一次,由於CreateEmitter
在回調他們兩中任意一個後,都會自動dispose()
。根據上一點,驗證此結論。error
後complete
,complete
不顯示。 反之會crashonSubscribe()
是在咱們執行subscribe()
這句代碼的那個線程回調的,並不受線程調度影響。轉載請標明出處:
juejin.im/post/58c5f8…
本文出自:【張旭童的稀土掘金】(juejin.im/user/56de21…)