Author: Dorae
Date: 2018年12月3日17:10:31 轉載請註明出處html
首先問本身幾個問題,若是很是清楚這幾個問題的目的與答案,那麼恭喜你,不用繼續往下看了-_-。java
如圖1-1所示react
參見這裏api
一款爲了簡化異步調用,且功能比較全面的框架。網絡
參見Java8中的sink鏈,在RxJava中一樣實現了鏈式處理。如代碼片斷code1-1所示,咱們對其結構進行分析:框架
code1-1異步
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Dorae");
}
})
.filter(e -> e.contains("o"))
.map(e -> "AfterMap: " + e)
.filter(e -> e.contains("D"))
.subscribe(new Observer<String>() {
@Override
public void onNext(@NonNull String o) {
System.out.println("觀察者 onNext: " + o);
}
@Override
public void onSubscribe(Disposable d) {
System.out.println("觀察者onSubscribe: " + d + "### " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
複製代碼
觀察者onSubscribe: io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver@52d455b8### mainide
觀察者 onNext: AfterMap: Dorae源碼分析
首先了解下RxJava中的幾種基本角色:spa
是否是感受上邊的一堆廢話很是枯燥?先上一張RxJava的核心結構,如圖3-1所示。
如今咱們再來看看code1-1,其最終造成的Observable鏈如圖3-2所示,每次調用map、filter等操做,都會生成一個新對象,而且保持了一個對上游的引用(用於生成Observer鏈)。
Observer鏈如圖3-3所示,整個事件流程由CreateEmitter觸發,最終交由咱們的實現Observer$1處理。
看了上邊幾張圖以後,是否是感受清晰了不少?那麼讓咱們進一步看下Rxjava如何完成了一鍵線程切換。
一般咱們使用RxJava的線程切換功能時,只須要在調用鏈中加上一句subscribeOn()或observeOn(),其中Scheduler如上所述,其實就是一個包裝了ThreadPool的調度器。那麼咱們先來看下相關源碼。
一、subscribeOn
如代碼code4-1所示,爲subscribeOn的核心代碼。很明顯,其中在新線程中只是簡單的直接調用了source,也就是說這裏以後的全部操做均在一個新線程中進行,和單線程並無什麼區別。
code 4-1
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<T>() {
@Override
public void subscribeActual(final Observer<? super T> observer) {
scheduler.createWorker().schedule(new SubscribeTask() {
@Override
public void run() {
source.subscribe(e);
}
});
}
};
}
複製代碼
二、observeOn
如代碼段code4-2所示,爲observeOn的核心邏輯,能夠看出其在訂閱階段(生成Observer鏈的階段)仍是在當前線程執行,只有觸發以後,到了ObserverOn的Observer的節點時纔會真正的切換到新線程。
code 4-2
public final Observable<T> observeOn(Scheduler scheduler) {
return new ObservableOnSubscribe<T>() {
@Override
public void subscribeActual(@NonNull Observer<Object> e) {
source.subscribe(new Observer<T>() {
@Override
public void onNext(T var1) {
scheduler.createWorker().schedule(new Runnable() {
@Override
public void run() {
e.onNext(var1);
}
});
}
});
}
};
}
複製代碼
經過上述code4-一、code4-2的分析,是否是能夠推斷出當屢次subscribeOn時會發生什麼?沒錯,雖然每次subscribeOn都會產生一次線程切換,可是真正起做用的只有最開始的一次subscribeOn,也就至關於只在最初的位置調用了subscribeOn;對於observeOn也是相似,每次都會產生新線程,可是每次都會產生必定的影響,也就是每一個線程都承擔了一部分工做。
經過本文,咱們能夠簡要了解到RxJava的基本原理,可是對於其豐富的api還須要在實踐中進行磨合。可是,RxJava既然做爲一個異步框架,其必然有必定的侷限,好比其切換線程時沒法阻塞當前線程(這種對於Android等須要渲染或者網絡IO的需求來講很是適用),可是對於常見的服務端業務來講,還須要額外引入阻塞當前線程的操做(由於大部分的server代碼仍是單線程模型),假若徹底不用線程切換在服務端強行引入,可能會得不償失。我的更推薦Java8的CompletableFuture。