以前就寫過一篇關於Rxjava最友好的文章,反響很不錯,因爲那篇文章的定位就是簡單友好,所以儘量的摒棄複雜的概念,只抓住關鍵的東西來說,以保證你們都能看懂。javascript
不過那篇文章寫完以後,我就以爲應該還得有一篇文章給RxJava作一個深刻的講解纔算完美,因而就有了今天的進階篇。由於一個團隊裏可能你們都會用RxJava,可是必需要有一我的很懂這個,否則碰到問題可就麻煩了。java
在前一篇文章中的最後,咱們得出結論:RxJava就是在觀察者模式的骨架下,經過豐富的操做符和便捷的異步操做來完成對於複雜業務的處理。今天咱們仍是就結論中的觀察者模式和操做符來作深刻的拓展。git
在進入正題以前,仍是但願你們先去看看關於Rxjava最友好的文章。github
前一篇文章首先就重點談到了觀察者模式,咱們認爲觀察者模式RxJava的骨架*。在這裏不是要推翻以前的結論,而是但願從深刻它的內部的去了解它的實現。架構
依然使用以前文章中關於開關和檯燈的代碼app
//建立一個被觀察者(開關)
Observable switcher=Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("On");
subscriber.onNext("Off");
subscriber.onNext("On");
subscriber.onNext("On");
subscriber.onCompleted();
}
});
//建立一個觀察者(檯燈)
Subscriber light=new Subscriber<String>() {
@Override
public void onCompleted() {
//被觀察者的onCompleted()事件會走到這裏;
Log.d("DDDDDD","結束觀察...\n");
}
@Override
public void onError(Throwable e) {
//出現錯誤會調用這個方法
}
@Override
public void onNext(String s) {
//處理傳過來的onNext事件
Log.d("DDDDD","handle this---"+s)
}
//訂閱
switcher.subscribe(light);複製代碼
以上就是一個RxJava觀察者架構,
看到這樣的代碼不知道你會不會有一些疑惑:框架
其實,這些問題均可以經過了解OnSubscribe來解決。異步
那咱們先來看看關於OnSubscribe的定義ide
//上一篇文章也提到Acton1這個接口,內部只有一個待實現call()方法
//沒啥特別,人畜無害
public interface Action1<T> extends Action {
void call(T t);
}
//OnSubscribe繼承了這個Action1接口
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// OnSubscribe仍然是個接口
}複製代碼
那麼也就是說,OnSubscribe本質上也是和 Action1同樣的接口,只不過它專門用於Observable內部。函數
而在Observable觀察者的類中,OnSubscribe是它惟一的屬性,同時也是Observable構造函數中惟一必須傳入的參數,也就是說,只要建立了Observable,那麼內部也必定有一個OnSubscribe對象。
固然,Observable是沒有辦法直接new的,咱們只能經過create(),just()等等方法建立,固然,這些方法背後去調用了new Observable(onSubscribe)
public class Observable<T> {
//惟一的屬性
final OnSubscribe<T> onSubscribe;
//構造函數,由於protected,咱們只能使用create函數
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
//create(onSubscribe) 內部調用構造函數。
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
....
....
}複製代碼
當建立了Observable和Subscribe以後,調用subscribe(subscriber)方法時,發生了什麼呢?
//傳入了觀察者對象
public final Subscription subscribe(final Observer<? super T> observer) {
....
//往下調用
return subscribe(new ObserverSubscriber<T>(observer));
}
public final Subscription subscribe(Subscriber<? super T> subscriber) {
//往下調用
return Observable.subscribe(subscriber, this);
}
//調用到這個函數
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// new Subscriber so onStart it
subscriber.onStart();
// add a significant depth to already huge call stacks.
try {
// 在這裏簡單講,對onSubscribe進行封裝,沒必要緊張。
OnSubscribe onSubscribe=RxJavaHooks.onObservableStart(observable, observable.onSubscribe);
//這個纔是重點!!!
//這個調用的具體實現方法就是咱們建立觀察者時
//寫在Observable.create()中的call()呀
//而調用了那個call(),就意味着事件開始發送了
onSubscribe.call(subscriber);
//不信你往回看
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
....
....
}
return Subscriptions.unsubscribed();
}
}複製代碼
代碼看到這裏,咱們就能夠對上面三個問題作統一的回答了:
到這裏,你是否是對於RxJava的觀察者模式瞭解更加清晰了呢?咱們用流程圖複習一下剛纔的過程。
瞭解了上面這些,咱們就能夠更進一步作如下總結:
以上的結論對於下面咱們理解操做符的原理十分有幫助,所以必定要看明白。
觀察者模式介紹到這裏,纔敢說講完了。
上一篇文章講了一些操做符,而且在github上放了不少其餘的操做符使用範例給你們,所以在這裏不會介紹更多操做符的用法,而是講解操做符的實現原理。他是如何攔截事件,而後變換處理以後,最後傳遞到觀察者手中的呢?
相信瞭解相關內容的人可能會想到lift()操做符,它原本是其餘操做符作變換的基礎,不過那已是好幾個版本之前的事情了。可是目前版本中RxJava已經不同了了,直接把lift()的工做下放到每一個操做符中,把lift的弱化了(可是依然保留了lift()操做符)。
所以,咱們在這裏沒必要講解lift,直接拿一個操做符作例子,來了解它的原理便可,由於基本上操做符的實現原理都是同樣的。
以map()爲例,依然拿以前文章裏面的例子:
Observable.just(getFilePath())
//使用map操做來完成類型轉換
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
//顯然自定義的createBitmapFromPath(s)方法,是一個極其耗時的操做
return createBitmapFromPath(s);
}
})
.subscribe(
//建立觀察者,做爲事件傳遞的終點處理事件
new Subscriber<Bitmap>() {
@Override
public void onCompleted() {
Log.d("DDDDDD","結束觀察...\n");
}
@Override
public void onError(Throwable e) {
//出現錯誤會調用這個方法
}
@Override
public void onNext(Bitmap s) {
//處理事件
showBitmap(s)
}
);複製代碼
看看map背後到底作了什麼
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
//建立了全新代理的的Observable,構造函數傳入的參數是OnSubscribe
//OnSubscribeMap顯然是OnSubscribe的一個實現類,
//也就是說,OnSubscribeMap須要實現call()方法
//構造函數傳入了真實的Observable對象
//和一個開發者本身實現的Func1的實例
return create(new OnSubscribeMap<T, R>(this, func));
}複製代碼
看OnSubscribeMap的具體實現:
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
//用於保存真實的Observable對象
final Observable<T> source;
//還有咱們傳入的那個Func1的實例
final Func1<? super T, ? extends R> transformer;
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
//實現了call方法,咱們知道call方法傳入的Subscriber
//就是訂閱以後,外部傳入真實的的觀察者
@Override
public void call(final Subscriber<? super R> o) {
//把外部傳入的真實觀察者傳入到MapSubscriber,構造一個代理的觀察者
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
//讓外部的Observable去訂閱這個代理的觀察者
source.unsafeSubscribe(parent);
}
//Subscriber的子類,用於構造一個代理的觀察者
static final class MapSubscriber<T, R> extends Subscriber<T> {
//這個Subscriber保存了真實的觀察者
final Subscriber<? super R> actual;
//咱們本身在外部本身定義的Func1
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
//外部的Observable發送的onNext()等事件
//都會首先傳遞到代理觀察者這裏
@Override
public void onNext(T t) {
R result;
try {
//mapper其實就是開發者本身建立的Func1,
//call()開始變換數據
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
//調用真實的觀察者的onNext()
//從而在變換數據以後,把數據送到真實的觀察者手中
actual.onNext(result);
}
//onError()方法也是同樣
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
}複製代碼
map操做符的原理基本上就講完了,其餘的操做符和map在原理上是一致的。
假如你想建立自定義的操做符(這實際上是不建議的),你應該按照上面的步驟
我知道你會有點暈,不要緊,我後面會寫一個自定義操做符放在個人github上,能夠關注下。
下面,咱們先經過一個流程圖鞏固一下前面學習的成果。
下次你使用操做符時,內心應該清楚,每使用一個操做符,都會建立一個代理觀察者和一個代理被觀察者,以及他們之間是如何相互調用的。相信有了這一層的理解,從此使用RxJava應該會更加駕輕就熟。
不過最後,我想給你們留一個思考題:使用一個操做符時,流程圖是這樣的,那麼使用多個操做符呢?
暫無
到這裏,關於RxJava的講解就基本能夠告一段落了,
我相信,兩篇文章讀下來,對於RxJava的理解應該已經到了比較高的一個層次了,個人目標也就達到了。
接下來....
由於RxJava是一個事件的異步處理框架,理論上,他能夠封裝任何其餘的庫,那麼.....