轉載請標明出處:
juejin.im/post/58ce8c…
本文出自:【張旭童的稀土掘金】(gold.xitu.io/user/56de21…)javascript
承接上一篇RxJava2 源碼解析(一),
本系列咱們的目的:java
Observable
)是如何將數據發送出去的。Observer
)是如何接收到數據的。本篇計劃講解一下4,5.app
RxJava最強大的莫過於它的線程調度 和 花式操做符。異步
map是一個高頻的操做符,咱們首先拿他開刀。
例子以下,源頭Observable
發送的是String類型的數字,利用map轉換成int型,最終在終點Observer
接受到的也是int類型數據。:ide
final Observable<String> testCreateObservable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete()
}
});複製代碼
Observable<Integer> map = testCreateObservable.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
map.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});複製代碼
咱們看一下map
函數的源碼:函數
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判空略過
ObjectHelper.requireNonNull(mapper, "mapper is null");
//RxJavaPlugins.onAssembly()是hook 上文提到過
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}複製代碼
RxJavaPlugins.onAssembly()
是hook 上文提到過,因此咱們只要看ObservableMap
,它就是返回到咱們手裏的Observable
:post
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
//將function變換函數類保存起來
final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//super()將上游的Observable保存起來 ,用於subscribeActual()中用。
super(source);
this.function = function; } @Override public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function)); }複製代碼
它繼承自AbstractObservableWithUpstream
,該類繼承自Observable
,很簡單,就是將上游的ObservableSource
保存起來,作一次wrapper,因此它也算是裝飾者模式的提現,以下:ui
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
//將上游的`ObservableSource`保存起來
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}複製代碼
關於ObservableSource
,表明了一個標準的無背壓的 源數據接口,能夠被Observer
消費(訂閱),以下:this
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}複製代碼
全部的Observable
都已經實現了它,因此咱們能夠認爲Observable
和ObservableSource
在本文中是相等的:spa
public abstract class Observable<T> implements ObservableSource<T> {複製代碼
因此咱們獲得的ObservableMap
對象也很簡單,就是將上游的Observable
和變換函數類Function
保存起來。Function
的定義超級簡單,就是一個接口,給我一個T,還你一個R.
public interface Function<T, R> {
R apply(T t) throws Exception;
}複製代碼
本例寫的是將String->int.
重頭戲,subscribeActual()
是訂閱真正發生的地方,ObservableMap
以下編寫,就一句話,用MapObserver訂閱上游Observable。:
@Override
public void subscribeActual(Observer<? super U> t) {
//用MapObserver訂閱上游Observable。
source.subscribe(new MapObserver<T, U>(t, function)); }複製代碼
MapObserver
也是裝飾者模式,對終點(下游)Observer
修飾。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//super()將actual保存起來
super(actual);
//保存Function變量
this.mapper = mapper;
}
@Override
public void onNext(T t) {
//done在onError 和 onComplete之後纔會是true,默認這裏是false,因此跳過
if (done) {
return;
}
//默認sourceMode是0,因此跳過
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
//下游Observer接受的值
U v;
//這一步執行變換,將上游傳過來的T,利用Function轉換成下游須要的U。
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//變換後傳遞給下游Observer
actual.onNext(v);
}複製代碼
到此咱們梳理一下流程:
訂閱的過程,是從下游到上游依次訂閱的。
Observer
訂閱了 map
返回的ObservableMap
。map
的Observable
(ObservableMap
)在被訂閱時,會訂閱其內部保存上游Observable
,用於訂閱上游的Observer
是一個裝飾者(MapObserver
),內部保存了下游(本例是終點)Observer
,以便上游發送數據過來時,能傳遞給下游。Observable
被訂閱,根據上節課內容,它開始向Observer發送數據。數據傳遞的過程,固然是從上游push到下游的,
Observable
傳遞數據給下游Observer
(本例就是MapObserver
)MapObserver
接收到數據,對其變換操做後(實際的function在這一步執行),再調用內部保存的下游Observer
的onNext()
發送數據給下游Observer
。簡化問題,代碼以下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
e.onNext("1");
e.onComplete();
}
//只是在Observable和Observer之間增長了一句線程調度代碼
}).subscribeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});複製代碼
只是在Observable
和Observer
之間增長了一句線程調度代碼:.subscribeOn(Schedulers.io())
.
查看subscribeOn()
源碼:
public final Observable<T> subscribeOn(Scheduler scheduler) {
//判空略過
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//拋開Hook,重點仍是ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}複製代碼
等等,怎麼有種似曾相識的感受,你們能夠把文章向上翻,看看map()
的源碼。
和subscribeOn()
的套路一模一樣,那麼咱們根據上面的結論,
先猜想ObservableSubscribeOn
類也是一個包裝類(裝飾者),點進去查看:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
//保存線程調度器
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
//map的源碼中咱們分析過,super()只是簡單的保存ObservableSource
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//1 建立一個包裝Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//2 手動調用 下游(終點)Observer.onSubscribe()方法,因此onSubscribe()方法執行在 訂閱處所在的線程
s.onSubscribe(parent);
//3 setDisposable()是爲了將子線程的操做加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此時已經運行在相應的Scheduler 的線程中
source.subscribe(parent);
}
}));
}複製代碼
和map套路大致一致,ObservableSubscribeOn
自身一樣是個包裝類,同樣繼承AbstractObservableWithUpstream
。
建立了一個SubscribeOnObserver
類,該類按照套路,應該也是實現了Observer
、Disposable
接口的包裝類,讓咱們看一下:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
//真正的下游(終點)觀察者
final Observer<? super T> actual;
//用於保存上游的Disposable,以便在自身dispose時,連同上游一塊兒dispose
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
//onSubscribe()方法由上游調用,傳入Disposable。在本類中賦值給this.s,加入管理。
DisposableHelper.setOnce(this.s, s);
}
//直接調用下游觀察者的對應方法
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
//取消訂閱時,連同上游Disposable一塊兒取消
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//這個方法在subscribeActual()中被手動調用,爲了將Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}複製代碼
這兩個類根據上一節的鋪墊加上註釋,其餘都好理解,稍微很差理解的應該是下面兩句代碼:
//ObservableSubscribeOn類
//3 setDisposable()是爲了將子線程的操做加入Disposable管理中
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
//4 此時已經運行在相應的Scheduler 的線程中
source.subscribe(parent);
}
}));
//SubscribeOnObserver類
//這個方法在subscribeActual()中被手動調用,爲了將Schedulers返回的Worker加入管理
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}複製代碼
其中scheduler.scheduleDirect(new Runnable()..)
方法源碼以下:
/** * Schedules the given task on this scheduler non-delayed execution. * ..... */
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}複製代碼
從註釋和方法名咱們能夠看出,這個傳入的Runnable
會馬上執行。
再繼續往裏面看:
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
//class Worker implements Disposable ,Worker自己是實現了Disposable
final Worker w = createWorker();
//hook略過
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//開始在Worker的線程執行任務,
w.schedule(new Runnable() {
@Override
public void run() {
try {
//調用的是 run()不是 start()方法執行的線程的方法。
decoratedRun.run();
} finally {
//執行完畢會 dispose()
w.dispose();
}
}
}, delay, unit);
//返回Worker對象
return w;
}複製代碼
createWorker()
是一個抽象方法,由具體的Scheduler
類實現,例如IoScheduler
對應的Schedulers.io()
.
public abstract Worker createWorker();複製代碼
初看源碼,爲了瞭解大體流程,不宜過入深刻,先點到爲止。
OK,如今咱們總結一下scheduler.scheduleDirect(new Runnable()..)
的重點:
Runnable
是馬上執行的。Worker
對象就是一個Disposable
對象,Runnable
執行時,是直接手動調用的 run()
,而不是 start()
方法.run()
結束後(包括異常終止),都會自動執行Worker
.dispose()
.而返回的Worker
對象也會被parent.setDisposable(...)
加入管理中,以便在手動dispose()
時能取消線程裏的工做。
咱們總結一下subscribeOn(Schedulers.xxx())
的過程:
ObservableSubscribeOn
包裝類對象subscribeActual()
方法,在其中會馬上將線程切換到對應的Schedulers.xxx()
線程。source.subscribe(parent);
,對上游(終點)Observable
訂閱Observable
開始發送數據,根據RxJava2 源碼解析(一),上游發送數據僅僅是調用下游觀察者對應的onXXX()
方法而已,因此此時操做是在切換後的線程中進行。一點擴展,
你們可能看過一個結論:subscribeOn(Schedulers.xxx())
切換線程N次,老是以第一次爲準,或者說離源Observable最近的那次爲準,而且對其上面的代碼生效(這一點對比的ObserveOn()
)。
爲何?
subscribeActual()
裏開啓了Scheduler的工做,source.subscribe(parent);
,從這一句開始切換了線程,因此在這之上的代碼都是在切換後的線程裏的了。subscribeOn(xxxx)
指定的線程,subscribeOn(xxxx)
的線程裏push數據(onXXX()
)給下游。可寫以下代碼驗證:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.d(TAG, "subscribe() called with: e = [" + e + "]" + Thread.currentThread());
e.onNext("1");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
//依然是io線程
Log.d(TAG, "apply() called with: s = [" + s + "]" + Thread.currentThread());
return s;
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
}
@Override
public void onNext(String value) {
Log.d(TAG, "onNext() called with: value = [" + value + "]");
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError() called with: e = [" + e + "]");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete() called");
}
});複製代碼
在上一節的基礎上,增長一個observeOn(AndroidSchedulers.mainThread())
,就完成了觀察者線程的切換。
.subscribeOn(Schedulers.computation())
//在上一節的基礎上,增長一個ObserveOn
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {複製代碼
繼續看源碼吧,我已經能猜出來了,hook+new XXXObservable();
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
....
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}複製代碼
果真,查看ObservableObserveOn
,:
高能預警,這部分的代碼 有些略多,建議讀者打開源碼邊看邊讀。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
//本例是 AndroidSchedulers.mainThread()
final Scheduler scheduler;
//默認false
final boolean delayError;
//默認128
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// false
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//1 建立出一個 主線程的Worker
Scheduler.Worker w = scheduler.createWorker();
//2 訂閱上游數據源,
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}複製代碼
本例中,就是兩步:
AndroidSchedulers.mainThread()
對應的Worker
ObserveOnObserver
訂閱上游數據源。這樣當數據從上游push下來,會由ObserveOnObserver
對應的onXXX()
處理。static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
//下游的觀察者
final Observer<? super T> actual;
//對應Scheduler裏的Worker
final Scheduler.Worker worker;
//上游被觀察者 push 過來的數據都存在這裏
SimpleQueue<T> queue;
Disposable s;
//若是onError了,保存對應的異常
Throwable error;
//是否完成
volatile boolean done;
//是否取消
volatile boolean cancelled;
// 表明同步發送 異步發送
int sourceMode;
....
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//省略大量無關代碼
//建立一個queue 用於保存上游 onNext() push的數據
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//回調下游觀察者onSubscribe方法
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
//1 執行過error / complete 會是true
if (done) {
return;
}
//2 若是數據源類型不是異步的, 默認不是
if (sourceMode != QueueDisposable.ASYNC) {
//3 將上游push過來的數據 加入 queue裏
queue.offer(t);
}
//4 開始進入對應Workder線程,在線程裏 將queue裏的t 取出 發送給下游Observer
schedule();
}
@Override
public void onError(Throwable t) {
//已經done 會 拋異常 和 上一篇文章裏提到的同樣
if (done) {
RxJavaPlugins.onError(t);
return;
}
//給error存個值
error = t;
done = true;
//開始調度
schedule();
}
@Override
public void onComplete() {
//已經done 會 返回 不會crash 和上一篇文章裏提到的同樣
if (done) {
return;
}
done = true;
//開始調度
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//該方法須要傳入一個線程, 注意看本類實現了Runnable的接口,因此查看對應的run()方法
worker.schedule(this);
}
}
//從這裏開始,這個方法已是在Workder對應的線程裏執行的了
@Override
public void run() {
//默認是false
if (outputFused) {
drainFused();
} else {
//取出queue裏的數據 發送
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
// 1 若是已經 終止 或者queue空,則跳出函數,
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//2 從queue裏取出一個值
v = q.poll();
} catch (Throwable ex) {
//3 異常處理 並跳出函數
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
//4 再次檢查 是否 終止 若是知足條件 跳出函數
if (checkTerminated(d, empty, a)) {
return;
}
//5 上游還沒結束數據發送,可是這邊處理的隊列已是空的,不會push給下游 Observer
if (empty) {
//僅僅是結束此次循環,不發送這個數據而已,並不會跳出函數
break;
}
//6 發送給下游了
a.onNext(v);
}
//7 對不起這裏我也不是很明白,大體猜想是用於 同步原子操做 若有人知道 煩請告知
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
//檢查 是否 已經 結束(error complete), 是否沒數據要發送了(empty 空),
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
//若是已經disposed
if (cancelled) {
queue.clear();
return true;
}
// 若是已經結束
if (d) {
Throwable e = error;
//若是是延遲發送錯誤
if (delayError) {
//若是空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
//中止worker(線程)
worker.dispose();
return true;
}
} else {
//發送錯誤
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
//發送complete
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}複製代碼
核心處都加了註釋,總結起來就是,
ObserveOnObserver
實現了Observer
和Runnable
接口。onNext()
裏,先不切換線程,將數據加入隊列queue
。而後開始切換線程,在另外一線程中,從queue
裏取出數據,push
給下游Observer
onError()
onComplete()
除了和RxJava2 源碼解析(一)提到的同樣特性以外,也是將錯誤/完成信息先保存,切換線程後再發送。observeOn()
影響的是其下游的代碼,且屢次調用仍然生效。 Observer
裏onXXX()
作的,這是一個主動的push行爲(影響下游)。subscribeOn()
切換線程是在subscribeActual()
裏作的,只是主動切換了上游的訂閱線程,從而影響其發射數據時所在的線程。而直到真正發射數據以前,任何改變線程的行爲,都會生效(影響發射數據的線程)。因此subscribeOn()
只生效一次。observeOn()
是一個主動的行爲,而且切換線程後會馬上發送數據,因此會生效屢次.轉載請標明出處:
juejin.im/post/58ce8c…
本文出自:【張旭童的稀土掘金】(gold.xitu.io/user/56de21…)
本文帶你們走讀分析了三個東西:
map操做符原理:
Observable
進行訂閱Observer
.Observable
和其內部訂閱者、是裝飾者模式的體現。線程調度subscribeOn()
:
Observable
進行訂閱,這樣上游發送數據時就是處於被切換後的線程裏了。Observer
.Observable
中。線程調度observeOn()
:
Observer
對上游Observable
進行訂閱Observer
中onXXX()
方法裏,將待發送數據存入隊列,同時請求切換線程處理真正push數據給下游。源碼裏那些實現了Runnable
的類或者匿名內部類,最終並無像往常那樣被丟給Thread
類執行。
而是先切換線程,再直接執行Runnable
的run()
方法。
這也加深了我對面向對象,對抽象、Runnable
的理解,它就是一個簡簡單單的接口,裏面就一個簡簡單單的run()
,
我認爲,之因此有Runnable
,只是抽象出 一個可運行的任務的概念。也許這句話很平淡,書上也會提到,各位大佬早就知道,可是現在我順着RxJava2的源碼這麼走讀了一遍,確真真切切的感覺到了這些設計思想的美妙。