RxJava
核心功能是一個用來完成異步操做的庫,相對於其它異步操做的方法,RxJava
的API
使用更加的簡潔。而且RxJava
中還提供了不少功能強大的操做符,幫助咱們解決不少本來複雜繁瑣的代碼邏輯,提升了代碼質量。RxJava
的實現是基於觀察者模式,觀察者模式中如下有三個比較重要的概念:git
被觀察者是事件的發起者,被觀察者與觀察者創建訂閱關係後,被觀察者發送事件,觀察者才能接收到事件。github
RxJava
的基礎使用也很簡單,分爲三個步驟,分別是建立被觀察者,建立觀察者和創建訂閱關係,具體代碼以下。bash
// 1. 建立被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
emitter.onNext("string1");
emitter.onNext("string2");
emitter.onNext("string3");
emitter.onComplete();
}
});
// 2. 建立觀察者
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s);
}
@Override
public void onError(Throwable e) {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");
}
@Override
public void onComplete() {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
}
};
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable:"+observable.getClass().getName());
// 3. 創建訂閱關係
observable.subscribe(observer);
複製代碼
運行日誌: app
本文中全部源碼基於RxJava2
的2.2.11
版本。首先來看看這個基本的訂閱流程源碼是怎麼實現的。異步
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼
使用RxJava
能夠經過Observable
的create
方法建立一個被觀察者對象。create
方法從參數中傳入一個ObservableOnSubscribe
類型的source
,而後方法中先校驗了source
是否爲空,接着將傳入的source
封裝成一個ObservableCreate
對象,而後調用了RxJavaPlugins.onAssembly
方法返回建立的好的Observable
。接着進入onAssembly
方法查看。async
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
複製代碼
onAssembly
方法中首先是一個Hook
實現,這裏能夠理解爲一個代理。能夠看到這裏先判斷onObservableAssembly
是否爲空,爲空就直接返回傳入的source
,不然再調用apply
方法。這裏能夠繼續跟蹤一下onObservableAssembly
。ide
@SuppressWarnings("rawtypes")
@Nullable
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;
/**
* Sets the specific hook function.
* @param onObservableAssembly the hook function to set, null allowed
*/
@SuppressWarnings("rawtypes")
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
複製代碼
它是RxJavaPlugins
中的成員變量,默認爲空,而且提供了一個set
方法來設置它。由於默認爲空,因此默認返回的就是傳入的source
。這裏的代理默認是不會對Observable
作什麼操做,若是須要有特殊的需求能夠調用set
方法實現本身的代理。而默認返回的source
類型爲ObservableCreate
對象也實現了Observable
接口。函數
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
......
}
複製代碼
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
複製代碼
觀察者Observer
是一個接口,其中提供了一些方法,使用時建立接口的實現,並根據需求在方法中作本身的實現。oop
創建訂閱關係調用了Observable
的subscribe
方法。ui
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
......
} catch (Throwable e) {
......
}
}
複製代碼
方法中仍是先判斷了傳入參數observer
是否爲空,接着仍是一個Hook
實現,這裏就不細究了,得到Hook
返回的observer
後再次判斷是否爲空,以後調用了subscribeActual
方法。
protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼
Observable
的subscribeActual
方法是個抽象方法,以前看過這裏的Observable
實際實現是個ObservableCreate
對象,因此再進入ObservableCreate
類查看對應方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
ObservableCreate
中的subscribeActual
方法中先建立了一個CreateEmitter
發射器對象,並將observer
對象傳入。接着調用了observer
的onSubscribe
方法,此時觀察者的onSubscribe
方法執行。最後調用了source
的subscribe
方法。
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
emitter.onNext("string1");
emitter.onNext("string2");
emitter.onNext("string3");
emitter.onComplete();
}
});
複製代碼
這個source
就是在create
方法中傳入的ObservableOnSubscribe
。它的subscribe
方法中經過調用ObservableEmitter
的方法發送事件,這裏的ObservableEmitter
就是以前建立的CreateEmitter
對象,因此再來進一步看看它其中的方法。
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
複製代碼
CreateEmitter
的構造函數接收了觀察者對象,而後在調用onNext
方法時先作了空判斷,再對isDisposed
進行取消訂閱的判斷,以後調用了observer
的onNext
方法,也就是觀察者的onNext
方法。一樣的onComplete
中最終也是調用了observer
的onComplete
方法。至此RxJava
中的基本訂閱流程的源碼就梳理完了。
RxJava
中有個很重要的功能,就是能方便的切換線程,來看下它的使用,仍是以前基礎使用中的例子進行修改。
Observable<String> observable0 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
emitter.onNext("string1");
emitter.onNext("string2");
emitter.onNext("string3");
emitter.onComplete();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+s);
}
@Override
public void onError(Throwable e) {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");
}
@Override
public void onComplete() {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
}
};
Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName());
Observable<String> observable2 = observable1.observeOn(AndroidSchedulers.mainThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName());
observable2.subscribe(observer);
複製代碼
被觀察者和觀察者的建立和以前同樣,在創建訂閱關係時調用subscribeOn
和observeOn
方法進行線程的切換。這裏每一個方法返回的都是Observable
類型,因此能夠採用鏈式調用,這也是RxJava
的一個特色,可是這裏沒有采用這種寫法,而是將其拆分開來寫而且日誌打印出每一個Observable
的具體類型,這是爲了方便以後源碼理解。 運行結果日誌:
Observable<String> observable1 = observable0.subscribeOn(Schedulers.newThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable1:"+observable1.getClass().getName());
observable1.subscribe(observer);
複製代碼
運行結果:
subscribeOn
方法運行查看結果,發現不只被觀察者發射事件運行在了子線程,觀察者接收事件也運行在子線程,那麼進入
subscribeOn
方法查看它的實現。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
能夠看到subscribeOn
方法和subscribe
方法有些相似。首先是判斷傳入的scheduler
是否爲空,而後一樣調用RxJavaPlugins.onAssembly
方法,此次構建了一個ObservableSubscribeOn
對象返回。而subscribeOn
方法以後仍是調用了subscribe
方法,根據以前的分析,subscribe
方法最終會調用到subscribeActual
方法,不過此時的subscribeActual
方法再也不是ObservableCreate
中的而是ObservableSubscribeOn
中的subscribeActual
方法。
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
ObservableSubscribeOn
的subscribeActual
方法中流程和以前的也很相似,此次是先建立了一個SubscribeOnObserver
對象,將觀察者對象傳入,接着一樣先調用了observer.onSubscribe
方法,而後將傳入的SubscribeOnObserver
封裝入了一個SubscribeTask
對象中,接着調用了scheduler.scheduleDirect
方法再將返回結果獲得的Disposable
設置到SubscribeOnObserver
中。下面一個方法一個方法看。首先是建立SubscribeTask
。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
複製代碼
SubscribeTask
是ObservableSubscribeOn
的內部類,其實現很簡單就是實現了一個Runnable
接口,構造方法中傳入了SubscribeOnObserver
對象,在其run
方法中調用了ObservableSubscribeOn
中的成員變量source
的subscribe
方法。這個source
是在建立ObservableSubscribeOn
時傳入的,根據前面的代碼能夠找到是在subscribeOn
方法中建立的對象而且這個source
對應傳入的是當前這個Observable
對象即經過Observable.create
得到的被觀察者對象,其實現以前看過是一個ObservableCreate
因此這裏就和以前同樣又會走到了其父類Observable
的subscribe
方法中,繼而調用ObservableCreate
的subscribeActual
方法,以後最終會調用到觀察者的對應onNext
等方法,不過此時的觀察者不直接是在使用時建立傳入的Observer
,而是以前看到的SubscribeOnObserver
類型,不過其中的onNext
等方法仍是調用了在使用時建立傳入的Observer
的對應方法。
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
......
}
複製代碼
下面接着看到scheduleDirect
這個方法,在建立好SubscribeTask
以後調用了scheduleDirect
方法。這裏的scheduler
就是subscribeOn
中傳入的,對應開始例子中的Schedulers.newThread
。
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
// 靜態成員變量NEW_THREAD
static final Scheduler NEW_THREAD;
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
複製代碼
進入Schedulers.newThread
一步步跟蹤,看到newThread
方法返回靜態成員變量中的NEW_THREAD
,而NEW_THREAD
又是經過NewThreadTask
建立。
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final Scheduler DEFAULT = new NewThreadScheduler();
複製代碼
繼續跟蹤查看發現NewThreadTask
實際是實現了Callable
接口,其call
方法中返回了靜態內部類中的NewThreadHolder.DEFAULT
。這個DEFAULT
的實現類型爲NewThreadScheduler
。至此終於找到了咱們傳入的Scheduler
的真正實現類。因而繼續看其scheduleDirect
方法。
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
複製代碼
scheduleDirect
方法是在其父類中實現的,看到其中進而調用了同名重載方法,方法中首先是調用createWorker
方法建立一個Worker
。這個方法的實現就是在NewThreadScheduler
中了。
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
複製代碼
createWorker
方法中只作了一件事就是建立返回了一個NewThreadWorker
。
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
......
}
複製代碼
NewThreadWorker
中看到建立了一個線程池,再回到scheduleDirect
方法,建立完Worker
後將傳入的Runnable
即SubscribeTask
進行一個裝飾獲得新的Runnable
對象。接着將Worker
和新的Runnable
封裝到一個DisposeTask
對象中。
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final Worker w;
@Nullable
Thread runner;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
......
}
複製代碼
DisposeTask
一樣實現了Runnable
接口,在run
方法中調用了從構造傳入的decoratedRun
的run
方法執行任務。回到最後一步,調用Worker
的schedule
方法,這裏就對應的NewThreadWorker
的schedule
方法。
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
複製代碼
schedule
方法中又進一步調用了其scheduleActual
方法。
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
複製代碼
scheduleActual
方法裏看到又將decoratedRun
和DisposableContainer
封裝成ScheduledRunnable
最後將這個ScheduledRunnable
交給構造函數中建立的線程池去運行,最終就會執行到前面看過的SubscribeTask
中的run
方法完成訂閱邏輯,調用觀察者的onNext
等方法。到這裏就看出最終的source.subscribe
是會經過線程池切換到子線程中去執行了。
經過查看subscribeOn
方法源碼能夠發現,方法裏其實是在前一個建立的ObservableCreate
外面包了一層,把它包成一個ObservableSubscribeOn
對象,一樣的原先的Observer
也被包了一層包成一個SubscribeOnObserver
對象,而線程切換的工做是由Scheduler
完成的。
接着再來看看切換回主線程的方法observeOn
,仍是先修改使用代碼,查看運行日誌。
Observable<String> observable2 = observable0.observeOn(AndroidSchedulers.mainThread());
Log.d(getClass().getName(), Thread.currentThread().getName() + " observable2:"+observable2.getClass().getName());
observable2.subscribe(observer);
複製代碼
運行日誌:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
這裏看到observeOn
方法裏調用了重載方法,方法中仍是同一個套路,不過這裏建立的又是另外一個對象ObservableObserveOn
了。根據前面的經驗這裏就又是將前一個Observable
傳遞到ObservableObserveOn
中的成員變量source
上,這裏看到就是構造函數中的第一個參數。接着仍是會調用subscribe
與觀察者創建訂閱關係進而會執行到ObservableObserveOn
對象的subscribeActual
方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
subscribeActual
方法中判斷了scheduler
的類型,這裏的scheduler
就是由AndroidSchedulers.mainThread()
傳入的,因而先來看一下這個方法。
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
複製代碼
從mainThread
開始看,發現代碼調用邏輯和以前的Schedulers.newThread
方法相似,最終會返回一個HandlerScheduler
而這個Scheduler
中的Handler
則是主線程的Handler
,看到這裏就能猜測到了,後面觀察者的對應方法必定是由這個Handler
來切換到主線程執行的。回到subscribeActual
方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
這裏判斷完類型會走else
中的方法首先仍是會調用HandlerScheduler
的createWorker
方法建立一個Worker
。
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
複製代碼
這裏是個HandlerWorker
其中具體方法後面再看。接着上面建立完Worker
後一樣仍是同樣調用source.subscribe
建立了一個ObserveOnObserver
對象傳入。這裏的source
就仍是以前的ObservableCreate
,因此這裏仍是會調用ObservableCreate
中的subscribeActual
方法。
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
ObservableCreate
中的subscribeActual
方法中的邏輯以前看過,不過此時傳入的observer
仍然再也不是在使用時建立的觀察者對象了,而是傳過來的ObserveOnObserver
對象,此時建立的CreateEmitter
中的observer
也就是這個ObserveOnObserver
對象。和以前邏輯同樣,接着就會調用observer
的onNext
等方法,此時調用的便是ObserveOnObserver
中的onNext
等方法。因此進入ObserveOnObserver
查看。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
複製代碼
查看ObserveOnObserver
中的代碼會發現onNext
方法中先將傳入的參數放入了一個隊列,而後不管是onNext
仍是onComplete
方法最後都調用了schedule
方法,進而再進入查看,發現schedule
方法中又調用了worker.schedule
方法。這裏的worker
就是以前建立的HandlerWorker
,這時再來看它的schedule
方法。
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
複製代碼
單個參數schedule
方法是在其父類中的,而這個方法中又調用另外一個三個參數的schedule
方法,這個方法父類中是抽象方法因此實現就在子類HandlerWorker
裏了。
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
// 判斷是否取消訂閱
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
// 建立ScheduledRunnable
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// 建立消息,並將主線程Handler和ScheduledRunnable
Message message = Message.obtain(handler, scheduled);
message.obj = this;
// 判斷設置異步消息
if (async) {
message.setAsynchronous(true);
}
// 發送消息執行callback
handler.sendMessageDelayed(message, unit.toMillis(delay));
// 檢查是否取消訂閱
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
複製代碼
在子類的這個方法裏在作了取消訂閱的判斷後將方法傳入的Runnable
和Handler
又封裝到一個ScheduledRunnable
對象中。接着建立了一個Message
並將ScheduledRunnable
放入Message
,最後調用handler.sendMessageDelayed
方法經過這個主線程的Handler
執行這個ScheduledRunnable
。
最後來追溯下ScheduledRunnable
到底執行了什麼,不過猜也知道最後必定調用到觀察者中的對應方法。
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed; // Tracked solely for isDisposed().
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
......
}
複製代碼
ScheduledRunnable
中的run
方法很簡單就是調用了構造中傳入的Runnable
的run
方法。而根據以前看過得建立ScheduledRunnable
時傳入的Runnable
又是從scheduleDirect
方法中傳入的,而scheduleDirect
方法中的Runnable
又是從worker.schedule(this)
方法時傳入的,根據上下文代碼發現這個this
指代的是ObserveOnObserver
對象,因而進一步進入它的run
方法查看。
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
......
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
......
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
......
}
複製代碼
能夠看到run
方法中判斷了outputFused
的真假,而後分別調用了drainFused
和drainNormal
方法。這裏的outputFused
是與RxJava2
中的背壓處理相關暫時先無論,根據方法名也能知道正常調用會執行drainNormal
方法,因而直接來看drainNormal
方法。
void drainNormal() {
int missed = 1;
// 存放onNext傳入的事件對象隊列
final SimpleQueue<T> q = queue;
// 傳入的觀察者對象
final Observer<? super T> a = downstream;
// 循環check事件是否完成或者發生錯誤
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
// 從隊列中取出發送事件傳入的對象
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
// 再次判斷是否完成或者發生錯誤
if (checkTerminated(d, empty, a)) {
return;
}
// 判斷隊列中取出的發送事件傳入的對象v是否爲空
if (empty) {
break;
}
// 執行觀察者對象的onNext方法
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
複製代碼
drainNormal
方法中先經過checkTerminated
方法校驗發送事件是否完成或者發生異常,接着從隊列中取出事件對象,再次判斷是否完成或者發生錯誤和取出的對象是否爲空,沒有問題的話就會執行觀察者的onNext
方法。而發送完成和出現異常的方法則是在checkTerminated
方法處理。
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
disposed = true;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
複製代碼
在checkTerminated
方法里根據delayError
判斷是否設置了超時的錯誤,接着再根據得到的錯誤e
是否爲空再決定調用的是觀察者的onError()
方法仍是onComplete
方法。至此observeOn
切換線程的流程也梳理結束。
RxJava
中有不少功能強大的操做符,經過使用這些操做符,能夠很容易的解決代碼編寫時遇到的一些複雜繁瑣的問題。這裏就用map
操做符來做爲一個例子,來看看操做符是怎樣工做的。首先仍是來了解map
操做符的使用方法和做用。
final Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.d(getClass().getName(), Thread.currentThread().getName() + " ObservableOnSubscribe subscribe");
emitter.onNext("5");
emitter.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onSubscribe");
}
@Override
public void onNext(Integer i) {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onNext "+i);
}
@Override
public void onError(Throwable e) {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onError");
}
@Override
public void onComplete() {
Log.d(getClass().getName(), Thread.currentThread().getName() + " onComplete");
}
};
Observable<Integer> mapObservable = observable.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
Log.d(getClass().getName(), Thread.currentThread().getName() + " mapObservable:"+mapObservable.getClass().getName());
mapObservable.subscribe(observer);
複製代碼
運行日誌:
map
操做符做用是能夠將被觀察者發送事件的數據類型轉換成其餘的數據類型。它的使用方法很簡單,例如上面這個例子就將一開始發送的String
類型轉換成觀察者接收到的Integer
類型。下面開始看map
方法的源碼。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼
看到map
方法中依舊仍是一樣的套路,經過RxJavaPlugins.onAssembly
方法返回一個被觀察者對象,只不過此次構建傳入的類型又是另外一個ObservableMap
類型的對象。訂閱的流程前面已經看過了,這裏和以前的同樣最終會走到ObservableMap
的subscribeActual
方法,因此直接來看這個方法。
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
複製代碼
ObservableMap
的subscribeActual
方法裏看到很熟悉仍是會調用source.subscribe
方法,只是這裏傳入的Observer
對象是一個MapObserver
對象。接下來的邏輯又和以前同樣,根據以前的經驗source.subscribe
方法最終會調用Observer
的onNext
方法,因此接下來直接來看MapObserver
的onNext
方法。
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
複製代碼
MapObserver
的onNext
方法裏的邏輯很簡單,在作了一些的判斷後調用mapper.apply(t)
方法得到類型轉換後的事件傳遞對象,最後就會調用觀察者的downstream.onNext
方法,這裏的downstream
就是訂閱方法傳入的觀察者對象。跟蹤mapper
能夠找到,它是從MapObserver
構造時傳入的一個Function
類型,便是在使用map
操做符時傳入的那個Function
對象,又由於在使用時實現了Function
的apply
方法完成了數據的類型轉換邏輯,因此這裏調用mapper.apply(t)
方法就能夠得到到轉換後的數據。
以上就是關於RxJava
源碼工做流程的相關總結,總而言之,觀察者模式仍是其核心設計思想。除此以外,經過源碼閱讀還發現,不管在線程切換方面仍是其它功能的操做符的實現,根本上來講都是在其原有的被觀察者或觀察者基礎上包裝成一個新的對象,功能邏輯由新對象中的方法來實現完成。