咱們在使用RxJava的時候最經常使用的功能就是寫一個被觀察者、一個觀察者。在被觀察者中發射數據,在觀察者中接收數據,最後用subscribe將二者給訂閱起來實現最基礎的功能。例以下面這種:git
//被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("aa");
e.onNext("bb");
e.onNext("cc");
e.onNext("dd");
e.onComplete();
}
});
//觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
//TODO 初始化數據
d.dispose();
}
@Override
public void onNext(String value) {
//TODO 接收被觀察者發送的數據
Log.i("result:",value);
}
@Override
public void onError(Throwable e) {
//TODO 錯誤
}
@Override
public void onComplete() {
//TODO 完成以後回調
}
};
//訂閱
observable.subscribe(observer);
複製代碼
那麼在這種狀況下,被觀察者是如何發送數據給觀察者?觀察者又是如何接收數據?二者又是如何被subscribe訂閱起來的呢?下面咱們經過源碼的分析來查看這一切的操做流程。github
首先,在建立被觀察者的時候,通常來講都是經過Observable.creat()來建立。那麼咱們進入creat()方法去看看裏面作了什麼操做。bash
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//非空判斷
ObjectHelper.requireNonNull(source, "source is null");
//返回一個Observable
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼
咱們跳過非空判斷邏輯,直接查看return。這裏會經過RxJavaPlugins.onAssembly()返回一個Observable對象。那麼咱們跳進onAssembly去查看裏面是如何進行Observable的建立的。ide
public static <T> Observable<T> onAssembly(Observable<T> source) {
...
//返回傳入的參數對象
return source;
}
複製代碼
這裏其實沒作什麼很特別的操做,僅僅只是返回了參數對象。也就是說咱們如今應該返回去重點研究的是這個參數對象source。而這個source根據前面的源碼查看,能夠看到實際上是new ObservableCreate()。咱們跳進這裏查看源碼分析
//ObservableCreat繼承了Observable,爲被觀察者Observable的子類
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//構造方法,傳入參數ObservableOnSubscribe,也就是咱們在裏面進行onNext、onComplete與onError的方法。
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//此方法是在被訂閱subscribe時候才調用,具體後面再說。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}
複製代碼
代碼進行了一些刪減。學習
經過上面代碼邏輯註釋能夠看到:ui
1.ObservableCreat爲Observable的子類,由於他擁有Observable的所有特性。this
2.在ObservableCreat構造方法中傳入了ObservableOnSubscribe,這個具體的做用咱們下面講。spa
3.有一個subscribeActual方法,這個方法實際上是後面用做訂閱的方法subscribe來實現的具體方式。線程
如今咱們再來看看傳入ObservableCreat中參數ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(ObservableEmitter<T> e) throws Exception;
}
複製代碼
發現ObservableOnSubscribe實際上是一個接口,而這個接口裏有一個方法,專門用來實現咱們的向觀察者發送消息的方法。到此,被觀察者在進行creat()的源碼分析完畢,咱們來總結一下。
總結:
1.在進行creat的時候,內部返回了一個Observable,而這個Observable其實是一個繼承了ObserVable的ObservableCreat類
2.ObserVable的ObservableCreat構造方法中傳入了一個接口ObservableOnSubscribe,咱們通常進行數據的發送都是經過這個接口中的subscribe方法裏的ObservableEmitter來進行onNext、onComplete與onError。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//獲取observer
observer = RxJavaPlugins.onSubscribe(this, observer);
//非空判斷
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//訂閱方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
複製代碼
拋開上面的邏輯判斷不談,咱們直接看訂閱方法subscribeActual()。不知道你們還記不記得,在咱們講Observable.creat的時候,在ObservableCreat這個類裏面有兩個用到的方法,一個是構造方法,傳入接口ObservableOnSubscribe,另一個是subscribeActual。而如今Observable.subscribe實際上就是在執行這個方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//實例化CreatEmitter對象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//執行觀察者中的onSubscribe方法
//onSubscribe()回調所在的線程是ObservableCreate執行subscribe()所在的線程
//和subscribeOn()與observeOn()無關!
observer.onSubscribe(parent);
try {
//真正的訂閱方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
咱們在來看下這方法中實例化CreatEmitter對象的這個類。
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//當沒有被取消訂閱的時候就執行onNext()方法用於發送數據
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
//出現錯誤時調用這個方法,用於拋出異常,而且在拋出以後的finally中調用dispose用於取消訂閱
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
//判斷若是沒有執行disposed方法就調用onComplete而且dispose。這也就是爲何onComplete與onError爲何只會執行其中一個
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
複製代碼
裏面的邏輯能夠說是一點都不復雜,就是咱們平時常常使用的onNext、onComplete與onError方法。
onNext():先判斷髮送的消息是否爲null,若是爲空則調用onError方法來拋出異常。若不爲空而且並未取消訂閱,則發送數據。
onError():出現錯誤的時候執行這個方法。當拋去異常以後經過finally強行執行dispose()方法,來強制結束掉訂閱。
onComplete():判斷若是沒有執行disposed方法就調用onComplete而且dispose。這也就是爲何onComplete與onError爲何只會執行其中一個。
分析一下各個類的職責:
Observable :我的理解是裝飾器模式下的基類,實際上全部操做都是Observable的子類進行的實現
ObservableOnSubscribe: 接口,定義了數據源的發射行爲
ObservableCreate: 裝飾器模式的具體體現,內部存儲了數據源的發射事件,和subscribe訂閱事件
ObservableEmitter: 數據源發射器,內部存儲了observer
Observer: 接收到數據源後的回調(好比打印數據等)
1.Observable.create(),實例化ObservableCreate和ObservableOnSubscribe,並存儲數據源發射行爲,準備發射(我已經準備好數據源,等待被訂閱)
2.Observable.subscribe(),實例化ObservableEmitter(發射器ObservableEmitter準備好!數據發射後,數據處理方式Observer已準備好!)
3.執行Observer.onSubscribe()回調,ObservableEmitter做爲Disposable參數傳入
4.執行ObservableOnSubscribe.subscribe()方法(ObservableEmitter發射數據,ObservableEmitter內部的Observer處理數據)
具體其餘的一些操做符的用法,請參考個人github:RxJavaDemo
有興趣能夠關注個人小專欄,學習更多知識:小專欄