到目前爲止筆者分析了Android中最熱門的網絡底層和封裝框架:Android主流三方庫源碼分析(1、深刻理解OKHttp源碼)和Android主流三方庫源碼分析(2、深刻理解Retrofit源碼),Android中使用最普遍的圖片加載框架Glide的加載流程:Android主流三方庫源碼分析(3、深刻理解Glide源碼)以及Android中性能最好的數據庫框架Android主流三方庫源碼分析(4、深刻理解GreenDao源碼)。本篇,我將會對近幾年比較熱門的函數式編程框架RxJava的源碼進行詳細的分析。git
RxJava是基於Java虛擬機上的響應式擴展庫,它經過使用可觀察的序列將異步和基於事件的程序組合起來。 與此同時,它擴展了觀察者模式來支持數據/事件序列,而且添加了操做符,這些操做符容許你聲明性地組合序列,同時抽象出要關注的問題:好比低級線程、同步、線程安全和併發數據結構等。github
從RxJava的官方定義來看,咱們若是要想真正地理解RxJava,就必須對它如下兩個部分進行深刻的分析:數據庫
固然,RxJava操做符的源碼也是很不錯的學習資源,特別是FlatMap、Zip等操做符的源碼,有不少能夠借鑑的地方,可是它們內部的實現比較複雜,限於篇幅,本文只講解RxJava的訂閱流程和線程切換原理。接下來,筆者一一對以上RxJava的兩個關鍵部分來進行詳細地講解。編程
首先給出RxJava消息訂閱的例子:json
Observable.create(newObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String>emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
複製代碼
能夠看到,這裏首先建立了一個被觀察者,而後建立一個觀察者訂閱了這個被觀察者,所以下面分兩個部分對RxJava的訂閱流程進行分析:緩存
首先,上面使用了Observable類的create()方法建立了一個被觀察者,看看裏面作了什麼。安全
// 省略一些檢測性的註解
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼
在Observable的create()裏面其實是建立了一個新的ObservableCreate對象,同時,把咱們定義好的ObservableOnSubscribe對象傳入了ObservableCreate對象中,最後調用了RxJavaPlugins.onAssembly()方法。接下來看看這個ObservableCreate是幹什麼的。微信
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
...
}
複製代碼
這裏僅僅是把ObservableOnSubscribe這個對象保存在ObservableCreate中了。而後看看RxJavaPlugins.onAssembly()這個方法的處理。網絡
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// 應用hook函數的一些處理,通常用到不到
...
return source;
}
複製代碼
最終僅僅是把咱們的ObservableCreate給返回了。數據結構
從以上分析可知,Observable.create()方法僅僅是先將咱們自定義的ObservableOnSubscribe對象從新包裝成了一個ObservableCreate對象。
接着,看看Observable.subscribe()的訂閱過程是如何實現的。
public final void subscribe(Observer<? super T> observer) {
...
// 1
observer = RxJavaPlugins.onSubscribe(this,observer);
...
// 2
subscribeActual(observer);
...
}
複製代碼
在註釋1處,在Observable的subscribe()方法內部首先調用了RxJavaPlugins的onSubscribe()方法。
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
// 應用hook函數的一些處理,通常用到不到
...
return observer;
}
複製代碼
除去hook應用的邏輯,這裏僅僅是將observer返回了。接着來分析下注釋2處的subscribeActual()方法,
protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼
這是一個抽象的方法,很明顯,它對應的具體實現類就是咱們在第一步建立的ObservableCreate類,接下來看到ObservableCreate的subscribeActual()方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 2
observer.onSubscribe(parent);
try {
// 3
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
在註釋1處,首先新建立了一個CreateEmitter對象,同時傳入了咱們自定義的observer對象進去。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
...
}
複製代碼
從上面能夠看出,CreateEmitter經過繼承了Java併發包中的原子引用類AtomicReference保證了事件流切斷狀態Dispose的一致性(這裏不理解的話,看到後面講解Dispose的時候就明白了),並實現了ObservableEmitter接口和Disposable接口,接着咱們分析下注釋2處的observer.onSubscribe(parent),這個onSubscribe回調的含義其實就是告訴觀察者已經成功訂閱了被觀察者。再看到註釋3處的source.subscribe(parent)這行代碼,這裏的source實際上是ObservableOnSubscribe對象,咱們看到ObservableOnSubscribe的subscribe()方法。
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public voidsubscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
});
複製代碼
這裏面使用到了ObservableEmitter的onNext()方法將事件流發送出去,最後調用了onComplete()方法完成了訂閱過程。ObservableEmitter是一個抽象類,實現類就是咱們傳入的CreateEmitter對象,接下來咱們看看CreateEmitter的onNext()方法和onComplete()方法的處理。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
@Override
public void onNext(T t) {
...
if (!isDisposed()) {
//調用觀察者的onNext()
observer.onNext(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
...
}
複製代碼
在CreateEmitter的onNext和onComplete方法中首先都要通過一個isDisposed的判斷,做用就是看當前的事件流是否被切斷(廢棄)掉了,默認是不切斷的,若是想要切斷,能夠調用Disposable的dispose()方法將此狀態設置爲切斷(廢棄)狀態。咱們繼續看看這個isDisposed內部的處理。
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
複製代碼
注意到這裏經過get()方法首先從ObservableEmitter的AtomicReference中拿到了保存的Disposable狀態。而後交給了DisposableHelper進行判斷處理。接下來看看DisposableHelper的處理。
public enum DisposableHelper implements Disposable {
DISPOSED;
public static boolean isDisposed(Disposable d) {
// 1
return d == DISPOSED;
}
public static boolean set(AtomicReference<Disposable> field, Disposable d) {
for (;;) {
Disposable current = field.get();
if (current == DISPOSED) {
if (d != null) {
d.dispose();
}
return false;
}
// 2
if (field.compareAndSet(current, d)) {
if (current != null) {
current.dispose();
}
return true;
}
}
}
...
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
// ...
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
...
}
複製代碼
DisposableHelper是一個枚舉類,內部只有一個值即DISPOSED, 從上面的分析可知它就是用來標記事件流被切斷(廢棄)狀態的。先看到註釋2和註釋3處的代碼field.compareAndSet(current, d)和field.getAndSet(d),這裏使用了原子引用AtomicReference內部包裝的CAS方法處理了標誌Disposable的併發讀寫問題。最後看到註釋3處,將咱們傳入的CreateEmitter這個原子引用類保存的Dispable狀態和DisposableHelper內部的DISPOSED進行比較,若是相等,就證實數據流被切斷了。爲了更進一步理解Disposed的做用,再來看看CreateEmitter中剩餘的關鍵方法。
@Override
public void onNext(T t) {
...
// 1
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
// 2
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
...
// 3
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
// 4
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
// 5
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
// 6
dispose();
}
}
}
複製代碼
在註釋一、三、5處,onNext()和onError()、onComplete()方法首先都會判斷事件流是否被切斷,若是事件流此時被切斷了,那麼onNext()和onComplete()則會退出方法體,不作處理,onError()則會執行到RxJavaPlugins.onError(t)這句代碼,內部會直接拋出異常,致使崩潰。若是事件流沒有被切斷,那麼在onError()和onComplete()內部最終會調用到註釋四、6處的這句dispose()代碼,將事件流進行切斷,由此可知,onError()和onComplete()只能調用一個,若是先執行的是onComplete(),再調用onError()的話就會致使異常崩潰。
首先給出RxJava線程切換的例子:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public voidsubscribe(ObservableEmitter<String>emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext : " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError : " +e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
複製代碼
能夠看到,RxJava的線程切換主要分爲subscribeOn()和observeOn()方法,首先,來分析下subscribeOn()方法。
在Schedulers.io()方法中,咱們須要先傳入一個Scheduler調度類,這裏是傳入了一個調度到io子線程的調度類,咱們看看這個Schedulers.io()方法內部是怎麼構造這個調度器的。
static final Scheduler IO;
...
public static Scheduler io() {
// 1
return RxJavaPlugins.onIoScheduler(IO);
}
static {
...
// 2
IO = RxJavaPlugins.initIoScheduler(new IOTask());
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
// 3
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
// 4
static final Scheduler DEFAULT = new IoScheduler();
}
複製代碼
Schedulers這個類的代碼不少,這裏我只拿出有關Schedulers.io這個方法涉及的邏輯代碼進行講解。首先,在註釋1處,同前面分析的訂閱流程的處理同樣,只是一個處理hook的邏輯,最終返回的仍是傳入的這個IO對象。再看到註釋2處,在Schedulers的靜態代碼塊中將IO對象進行了初始化,其實質就是新建了一個IOTask的靜態內部類,在IOTask的call方法中,也就是註釋3處,能夠了解到使用了靜態內部類的方式把建立的IOScheduler對象給返回出去了。繞了這麼大圈子,Schedulers.io方法其實質就是返回了一個IOScheduler對象。
public final Observable<T> subscribeOn(Scheduler scheduler) {
...
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
在subscribeOn()方法裏面,又將ObservableCreate包裝成了一個ObservableSubscribeOn對象。咱們關注到ObservableSubscribeOn類。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// 1
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
// 2
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
// 3
observer.onSubscribe(parent);
// 4
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
...
}
複製代碼
首先,在註釋1處,將傳進來的source和scheduler保存起來。接着,等到實際訂閱的時候,就會執行到這個subscribeActual方法,在註釋2處,將咱們自定義的Observer包裝成了一個SubscribeOnObserver對象。在註釋3處,通知觀察者訂閱了被觀察者。在註釋4處,內部先建立了一個SubscribeTask對象,來看看它的實現。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
複製代碼
SubscribeTask是ObservableSubscribeOn的內部類,它實質上就是一個任務類,在它的run方法中會執行到source.subscribe(parent)的訂閱方法,這個source其實就是咱們在ObservableSubscribeOn構造方法中傳進來的ObservableCreate對象。接下來看看scheduler.scheduleDirect()內部的處理。
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 1
final Worker w = createWorker();
// 2
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 3
DisposeTask task = new DisposeTask(decoratedRun, w);
// 4
w.schedule(task, delay, unit);
return task;
}
複製代碼
這裏最後會執行到上面這個scheduleDirect()重載方法。首先,在註釋1處,會調用createWorker()方法建立一個工做者對象Worker,它是一個抽象類,這裏的實現類就是IoScheduler,下面,咱們看看IoScheduler類的createWorker()方法。
final AtomicReference<CachedWorkerPool> pool;
...
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
...
@Override
public Worker createWorker() {
// 1
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
...
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
// 2
this.threadWorker = pool.get();
}
}
複製代碼
首先,在註釋1處調用了pool.get()這個方法,pool是一個CachedWorkerPool類型的原子引用對象,它的做用就是用於緩存工做者對象Worker的。而後,將獲得的CachedWorkerPool傳入新建立的EventLoopWorker對象中。重點關注一下注釋2處,這裏將CachedWorkerPool緩存的threadWorker對象保存起來了。
下面,咱們繼續分析3.6處代碼段的註釋2處的代碼,這裏又是一個關於hook的封裝處理,最終仍是返回的當前的Runnable對象。在註釋3處新建了一個切斷任務DisposeTask將decoratedRun和w對象包裝了起來。最後在註釋4處調用了工做者的schedule()方法。下面咱們來分析下它內部的處理。
@Override
public Disposable schedule(@NonNull Runnableaction, long delayTime, @NonNull TimeUnit unit){
...
return threadWorker.scheduleActual(action,delayTime, unit, tasks);
}
複製代碼
內部調用了threadWorker的scheduleActual()方法,其實是調用到了父類NewThreadWorker的scheduleActual()方法,咱們繼續看看NewThreadWorker的scheduleActual()方法中作的事情。
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 1
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
// 2
if (delayTime <= 0) {
// 3
f = executor.submit((Callable<Object>)sr);
} else {
// 4
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
複製代碼
在NewThreadWorker的scheduleActual()方法的內部,在註釋1處首先會新建一個ScheduledRunnable對象,將Runnable對象和parent包裝起來了,這裏parent是一個DisposableContainer對象,它實際的實現類是CompositeDisposable類,它是一個保存全部事件流是否被切斷狀態的容器,其內部的實現是使用了RxJava本身定義的一個簡單的OpenHashSet類進行存儲。最後註釋2處,判斷是否設置了延遲時間,若是設置了,則調用線程池的submit()方法當即進行線程切換,不然,調用schedule()方法進行延時執行線程切換。
從上面的分析,咱們能夠很容易瞭解到被觀察者被訂閱時是從最外面的一層(ObservableSubscribeOn)通知到裏面的一層(ObservableOnSubscribe),當連續執行了到屢次subscribeOn()的時候,其實就是先執行倒數第一次的subscribeOn()方法,直到最後一次執行的subscribeOn()方法,這樣確定會覆蓋前面的線程切換。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
....
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
能夠看到,observeOn()方法內部最終也是返回了一個ObservableObserveOn對象,咱們直接來看看ObservableObserveOn的subscribeActual()方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 1
if (scheduler instanceof TrampolineScheduler) {
// 2
source.subscribe(observer);
} else {
// 3
Scheduler.Worker w = scheduler.createWorker();
// 4
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
首先,在註釋1處,判斷指定的調度器是否是TrampolineScheduler,這是一個不進行線程切換,當即執行當前代碼的調度器。若是是,則會直接調用ObservableSubscribeOn的subscribe()方法,若是不是,則會在註釋3處建立一個工做者對象。而後,在註釋4處建立一個新的ObserveOnObserver將SubscribeOnobserver對象包裝起來,並傳入ObservableSubscribeOn的subscribe()方法進行訂閱。接下來看看ObserveOnObserver類的重點方法。
@Override
public void onNext(T t) {
...
if (sourceMode != QueueDisposable.ASYNC) {
// 1
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
...
schedule();
}
@Override
public void onComplete() {
...
schedule();
}
複製代碼
去除非主線邏輯的代碼,在ObserveOnObserver的onNext()和onError()、onComplete()方法中最後都會調用到schedule()方法。接着看schedule()方法,其中onNext()還會把消息存放到隊列中。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
複製代碼
這裏使用了worker進行調度ObserveOnObserver這個實現了Runnable的任務。worker就是在AndroidSchedulers.mainThread()中建立的,內部其實就是使用Handler進行線程切換的,此處再也不贅述了。接着看ObserveOnObserver的run()方法。
@Override
public void run() {
// 1
if (outputFused) {
drainFused();
} else {
// 2
drainNormal();
}
}
複製代碼
在註釋1處會先判斷outputFused這個標誌位,它表示事件流是否被融化掉,默認是false,因此,最後會執行到drainNormal()方法。接着看看drainNormal()方法內部的處理。
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
// 1
final Observer<? super T> a = downstream;
...
// 2
v = q.poll();
...
// 3
a.onNext(v);
...
}
複製代碼
在註釋1處,這裏的downstream其實是從外面傳進來的SubscribeOnObserver對象。在註釋2處將隊列中的消息取出來,接着在註釋3處調用了SubscribeOnObserver的onNext方法。最終,會從咱們包裝類的最外層一直調用到最裏面的咱們自定義的Observer中的onNext()方法,因此,在observeOn()方法下面的鏈式代碼都會執行到它所指定的線程中,噢,原來如此。
其實筆者使用了RxJava也已經有一年多的時間了,可是一直沒有去深刻去了解過它的內部實現原理,現在細細品嚐,的確是酣暢淋漓。從一開始的OkHttp到現現在的RxJava源碼分析,到此爲止,Android主流三方庫源碼分析系列文章已經發布了五篇了,咱們的征途已通過半,接下來,我將會對Android中的內存泄露框架LeakCanary源碼進行深刻地講解,盡請期待~
一、RxJava V2.2.5 源碼
二、Android 進階之光
歡迎關注個人微信:
bcce5360
微信羣若是不能掃碼加入,麻煩你們想進微信羣的朋友們,加我微信拉你進羣。
2千人QQ羣,Awesome-Android學習交流羣,QQ羣號:959936182, 歡迎你們加入~