你是否是看過了不少分析Rxjava源碼的文章,但依舊沒法在心中勾勒出Rxjava原理的樣貌。是什麼讓咱們閱讀Rxjava源碼變得如此艱難?是Rxjava的代碼封裝,以及各類細節問題的解決。本文我把Rxjava的各類封裝、抽象通通剝去,只專一於基本的事件變換。在理解了事件變換大概是作了件什麼事情時,再去看源碼,考慮一些其它問題就會更加容易。java
項目源碼git
咱們先來看一個最簡單調用github
MainActivity.java
Observable.create(new Observable<String>() {
@Override
public void subscribe(Observer<String> observer) {
observer.onNext("hello");
observer.onNext("world");
observer.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onNext(String s) {
Log.e("yxj",s);
}
@Override
public void onComplete() {
Log.e("yxj","onComplete");
}
});
複製代碼
Observable.java
public abstract class Observable<T> {
public abstract void subscribe(Observer<T> observer);
public static <T> Observable<T> create(Observable<T> observable){
return observable;
}
}
複製代碼
Observer.java
public interface Observer<T> {
void onNext(T t);
void onComplete();
}
複製代碼
Observable調用create方法建立一個本身,重寫subscribe方法說:若是 我有一個處理者Observer,我就把「hello」,「world」交給它處理。編程
Observable調用了subscribe方法,真的找到了Observer。因而兌現承諾,完成整個調用邏輯。bash
這裏是「若是」有處理者,須要subscribe方法被調用時,「若是」才成立。Rxjava就是創建在一系列的「若是」(回調)操做上的。微信
1.建立一個observable
2.調用空map操做符作變換
3.交給observer處理
MainActivity.java
Observable.create(new Observable<String>() {
@Override
public void subscribe(Observer<String> observer) {
observer.onNext("hello");
observer.onNext("world");
observer.onComplete();
}
})
.nullMap()
.subscribe(new Observer<String>() {
@Override
public void onNext(String s) {
Log.e("yxj",s);
}
@Override
public void onComplete() {
Log.e("yxj","onComplete");
}
});
複製代碼
nullMap()等價於 下面這段代碼
即把上個節點的數據不作任何修改的傳遞給下一節點的map操做
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
})
複製代碼
"nullMap"操做符在Rxjava源碼裏並不存在,是我方便你們理解Rxjava運行機制寫出來的。 由於nullMap操做是一個 base變換操做,map,flatMap,subscribeOn,observeOn操做符都是在nullMap上修改而來。因此Rxjava的變換的基礎就是nullMap操做符。app
Observable.java
// 這就是Rxjava的變換核心
public Observable<T> nullMap() {
return new Observable<T>() {
@Override
public void subscribe(final Observer<T> observerC) {
Observer<T> observerB = new Observer<T>() {
@Override
public void onNext(T t) {
observerC.onNext(t);
}
@Override
public void onComplete() {
observerC.onComplete();
}
};
Observable.this.subscribe(observerB);
}
};
}
複製代碼
「nullMap」操做符作了件什麼事情:異步
請注意2中的」若是「。意味着,當節點B中的subscribe方法沒有被調用的時候,2,3步驟都不會執行(他們都是回調),沒有Observer B,節點A也不會調用subscribe方法。 接下來分兩種狀況:ide
概況一下就是:源碼分析
Observable每調用一次操做符,其實就是建立一個新的Observable。新Observable內部經過subscribe方法「逆向的」與上一Observable關聯。在新Observable中的new出來的Observer內的onNext方法中作了和下一個Observer之間的關聯。
接下來讓咱們看看這4個操做符,僅僅是在nullMap中作了小改動而已。 操做符源碼
Observable.java
public <R> Observable<R> map(final Function<T, R> function) {
return new Observable<R>() {
@Override
public void subscribe(final Observer<R> observer1) {
Observable.this.subscribe(new Observer<T>() {
@Override
public void onNext(T t) {
R r = function.apply(t); // 僅僅在這裏加了變換操做
observer1.onNext(r);
}
@Override
public void onComplete() {
observer1.onComplete();
}
});
}
};
}
複製代碼
和「nullMap」相比,僅僅加了一行代碼function.apply() 方法的調用。
Observable.java
public Observable<T> observeOn() {
return new Observable<T>() {
@Override
public void subscribe(final Observer<T> observer) {
Observable.this.subscribe(new Observer<T>() {
@Override
public void onNext(final T t) {
//模擬切換到主線程(一般上個節點是運行在子線程的狀況)
handler.post(new Runnable() {
@Override
public void run() {
observer.onNext(t);
}
});
}
@Override
public void onComplete() {
}
});
}
};
}
複製代碼
與「nullMap」相比,修改了最內部的onNext方法執行所在的線程。Rxjava源碼會更加靈活,observerOn方法參數讓你能夠指定切換到的線程,其實就是傳入了一個線程調度器,用於指定observer.onNext()方法要在哪一個線程執行。原理是同樣的。我這裏就簡寫,直接寫了切換到主線程,這你確定能看明白。
Observable.java
public Observable<T> subscribeOn() {
return new Observable<T>() {
@Override
public void subscribe(final Observer<T> observer) {
new Thread() {
@Override
public void run() {
// 這裏簡寫了,沒有new Observer作中轉,github上有完整代碼
Observable.this.subscribe(observer);
}
}.start();
}
};
}
複製代碼
將上一個節點切換到新的線程,修改了Observable.this.subscribe()運行的線程,Observable.this指的是調用subscribeOn()的Observable,即上一個節點。所以subscribeOn操做符修改了上一個節點的運行所在的線程
public <R> Observable<R> flatMap(final Function<T, Observable<R>> function) {
return new Observable<R>() {
@Override
public void subscribe(final Observer<R> observer) {
Observable.this.subscribe(new Observer<T>() {
@Override
public void onNext(T t) {
try {
Observable<R> observable = function.apply(t);
observable.subscribe(observer);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onComplete() {
}
});
}
};
}
複製代碼
flatmap和map極爲類似,只不過function.apply()的返回值是一個Observable。
Observable是一個節點,既能夠用來封裝異步操做,也能夠用來封裝同步操做(封裝同步操做 == map操做符)。因此這樣就能夠很方便的寫出一個 耗時1操做 —> 耗時2操做 —> 耗時3操做...的操做
響應式編程是一種面向數據流和變化傳播的編程範式。
直接看這句話其實不太容易理解。讓咱們換個說法,實際編程中是什麼會干擾咱們,使咱們沒法專一於數據流和變化傳播呢?答案是:異步,它會讓咱們的代碼造成嵌套,不夠順序化。
由於異步,咱們的業務邏輯會寫成回調嵌套的形式,致使過一段時間看本身代碼看不懂,語義化不強,不是按着順序一個節點一個節點的往下執行的。
Rxjava將全部的業務操做變成一步一步,每一步無論你是同步、異步,通通用一個節點包裹起來,節點與節點之間是同步調用的關係。如此,整個代碼的節點都是按順序執行的。