RxJava是一個神奇的框架,用法很簡單,但內部實現有點複雜,代碼邏輯有點繞。我讀源碼時,確實有點似懂非懂的感受。網上關於RxJava源碼分析的文章,源碼貼了一大堆,代碼邏輯繞來繞去的,讓人看得雲裏霧裏的。既然用拆輪子的方式來分析源碼比較難啃,不如換種方式,以造輪子的方式,將源碼中與性能、兼容性、擴展性有關的代碼剔除,留下核心代碼帶你們揭祕 RxJava 的實現原理。java
什麼是RxJavareact
將上面的例子進行代碼抽象,步驟以下:android
上面示例的演示代碼以下:
//1.建立被觀察者
Observable<String> observable =
Observable.create(new Observable.OnSubscribe<String>() {br/>@Override
public void call(Subscriber<? super String> subscriber) {
//4.開始發送事件
//事件有3個類型 分別是onNext() onCompleted() onError()
//onCompleted() onError() 通常都是用來通知觀察者 事件發送完畢了,二者只取其一。
subscriber.onNext("Hello Android !");
subscriber.onNext("Hello Java !");
subscriber.onNext("Hello C !");
subscriber.onCompleted();
}
});編程
//2.建立觀察者 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onCompleted() { Log.i(TAG, "onCompleted "); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getLocalizedMessage()); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); } }; //3.訂閱 observable.subscribe(subscriber);
輸出以下:
com.m520it.rxjava I/IT520: onNext: Hello Android !
com.m520it.rxjava I/IT520: onNext: Hello Java !
com.m520it.rxjava I/IT520: onNext: Hello C !
com.m520it.rxjava I/IT520: onCompleted
代碼運行的原理markdown
• onError - 若是沒法發射須要的值,Single發射一個Throwable對象到這個方法
Single只會調用這兩個方法中的一個,並且只會調用一次,調用了任何一個方法以後,訂閱關係終止。
final Single<String> single = Single.create(new Single.OnSubscribe<String>() {br/>@Override
public void call(SingleSubscriber<? super String> singleSubscriber) {
//先調用onNext() 最後調用onCompleted()
//singleSubscriber.onSuccess("Hello Android !");
//只調用onError();
singleSubscriber.onError(new NullPointerException("mock Exception !"));
}
});架構
Observer<String> observer = new Observer<String>() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted ");
}框架
@Override public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getLocalizedMessage()); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); }
};
single.subscribe(observer);
6.觀察者變種
Observer觀察者對象,上面咱們用Subscriber對象代替。由於該對象自己就是繼承了Observer。
該對象實現了onNext()&onCompleted()&onError()事件,咱們若是對哪一個事件比較關心,只須要實現對應的方法便可,代碼以下:
//建立觀察者
Subscriber<String> subscriber = new Subscriber<String>() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted ");
}編程語言
@Override public void onError(Throwable e) { Log.i(TAG, "onError: "+e.getLocalizedMessage()); } @Override public void onNext(String s) { Log.i(TAG, "onNext: "+s); }
};ide
//訂閱
observable.subscribe(subscriber);
上面的代碼中,若是你只關心onNext()事件,但卻不得不實現onCompleted()&onError()事件.這樣的代碼就顯得很臃腫。鑑於這種需求,RxJava框架在訂閱方面作了特定的調整,代碼以下:
//爲指定的onNext事件建立獨立的接口
Action1<String> onNextAction = new Action1<String>() {br/>@Override
public void call(String s) {
Log.i(TAG, "call: "+s);
}
};函數
//訂閱
observable.subscribe(onNextAction);
不知道你們注意到沒有,subscribe()訂閱的再也不是觀察者,而是特定的onNext接口對象。相似的函數以下,咱們能夠根據須要實現對應的訂閱:
public Subscription subscribe(final Observer observer)
public Subscription subscribe(final Action1 onNext)
public Subscription subscribe(final Action1 onNext, Action1 onError)
public Subscription subscribe(final Action1 onNext, Action1 onError, Action0 onCompleted)
這裏還有一個forEach函數有相似的功能:
public void forEach(final Action1 onNext)
public void forEach(final Action1 onNext, Action1 onError)
public void forEach(final Action1 onNext, Action1 onError, Action0 onComplete)
##7.Subject變種
上面2節中既介紹了被觀察者變種,又介紹了觀察者變種,這裏再介紹一種雌雄同體的對象(既做爲被觀察者使用,也能夠做爲觀察者)。
針對不一樣的場景一共有四種類型的Subject。他們並非在全部的實現中所有都存在。
###AsyncSubject
一個AsyncSubject只在原始Observable完成後,發射來自原始Observable的最後一個值。它會把這最後一個值發射給任何後續的觀察者。
如下貼出代碼:
//建立被觀察者final AsyncSubject<String> subject = AsyncSubject.create();//建立觀察者
Subscriber<String> subscriber = new Subscriber<String>() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}
@Override public void onError(Throwable e) { Log.i(TAG, "onError"); } @Override public void onNext(String s) { Log.i(TAG, "s:" + s); }
};//訂閱事件
subject.subscribe(subscriber);//被觀察者發出事件 若是調用onCompleted(),onNext()則會打印最後一個事件;若是沒有,onNext()則不打印任何事件。
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");
subject.onCompleted();
輸出:
s:Hello Java onCompleted
然而,若是原始的Observable由於發生了錯誤而終止,AsyncSubject將不會發射任何數據,只是簡單的向前傳遞這個錯誤通知。
上面的觀察者被觀察者代碼相同,如今發出一系列信號,並在最後發出異常 代碼以下:
subject.onNext("Hello Android ");
subject.onNext("Hello Java ");//由於發送了異常 因此onNext()沒法被打印
subject.onError(null);
###BehaviorSubject
當觀察者訂閱BehaviorSubject時,他會將訂閱前最後一次發送的事件和訂閱後的全部發送事件都打印出來,若是訂閱前無發送事件,則會默認接收構造器create(T)裏面的對象和訂閱後的全部事件,代碼以下:
BehaviorSubject subject=BehaviorSubject.create("NROMAL");
Subscriber subscriber = new Subscriber() {br/>@Override
public void onCompleted() {
Log.i(TAG, "onCompleted");
}
@Override public void onError(Throwable e) { Log.i(TAG, "onError"); } @Override public void onNext(Object o) { Log.i(TAG, "onNext: " + o); }
};
//subject.onNext("Hello Android !");//subject.onNext("Hello Java !");//subject.onNext("Hello C !");//這裏開始訂閱 若是上面的3個註釋沒去掉,則Hello C的事件和訂閱後面的事件生效//若是上面的三個註釋去掉 則打印構造器NORMAL事件生效後和訂閱後面的事件生效
subject.subscribe(subscriber);
subject.onNext("Hello CPP !");
subject.onNext("Hello IOS !");
PublishSubject
PublishSubject只會把在訂閱發生的時間點以後來自原始Observable的數據發射給觀察者。
須要注意的是,PublishSubject可能會一建立完成就馬上開始發射數據,所以這裏有一個風險:在Subject被建立後到有觀察者訂閱它以前這個時間段內,一個或多個數據可能會丟失。
代碼以下:
PublishSubject subject= PublishSubject.create();
Action1<String> onNextAction1 = new Action1<String>(){
@Override public void call(String s) { Log.i(TAG, "onNextAction1 call: "+s); }
};
Action1<String> onNextAction2 = new Action1<String>(){
@Override public void call(String s) { Log.i(TAG, "onNextAction2 call: "+s); }
};
subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");
輸出以下:
onNextAction1 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
ReplaySubject
ReplaySubject會發射全部來自原始Observable的數據給觀察者,不管它們是什麼時候訂閱的。
代碼以下:
ReplaySubject subject= ReplaySubject.create();
Action1<String> onNextAction1 = new Action1<String>(){
@Override public void call(String s) { Log.i(TAG, "onNextAction1 call: "+s); }
};
Action1<String> onNextAction2 = new Action1<String>(){
@Override public void call(String s) { Log.i(TAG, "onNextAction2 call: "+s); }
};
subject.onNext("Hello Android !");
subject.subscribe(onNextAction1);
subject.onNext("Hello Java !");
subject.subscribe(onNextAction2);
subject.onNext("Hello IOS !");
輸出以下:
onNextAction1 call: Hello Android !
onNextAction1 call: Hello Java !
onNextAction2 call: Hello Android !
onNextAction2 call: Hello Java !
onNextAction1 call: Hello IOS !
onNextAction2 call: Hello IOS !
###Subject總結
AsyncSubject不管什麼時候訂閱 只會接收最後一次onNext()事件,若是最後出現異常,則不會打印任何onNext()BehaviorSubject會從訂閱前最後一次oNext()開始打印直至結束。若是訂閱前無調用onNext(),則調用默認creat(T)傳入的對象。若是異常後才調用,則不打印onNext()PublishSubject只會打印訂閱後的任何事件。ReplaySubject不管訂閱在什麼時候都會調用發送的事件。