樓主最近在找實習工做,因爲簡歷上說了解RxJava,因此在面試的時候應該會問到RxJava的知識,因而樓主結合RxJava的源碼,對RxJava的工做原理進行初步的瞭解。也只敢說是初步瞭解,由於本身也是第一次看RxJava的源碼,理解的程度確定不是很深。仍是那樣,若是有錯誤之處,但願各位指正!
本文參考:html
1.除非特殊說明,源碼來自:2.2.0版本面試
樓主打算將RxJava的源碼分析寫成一個系列文章,因此這個是這個系列的第一篇文章,在概述裏面仍是對RxJava是什麼簡單的介紹一下,本系列文章不會對RxJava的基本用法進行展開,若是有老哥對RxJava的基本使用掌握的不是很好的話,推薦這個系列的文章:給初學者的RxJava2.0教程(一)。
簡單的說一下RxJava,RxJava是基於觀察者模式的一個框架,在RxJava中有兩個角色,一個Observable,一般被稱爲被觀察者,一個是Observer,一般被稱爲觀察者。整體的架構是,由Observable來處理任務或者發送事件,而後在Observer裏面來接受到Observable發送過來的信息。
RxJava有不少的優點,好比線程調度,在Android裏面,耗時操做必須放在子線程中,可是同時還須要主線程來更細UI,因此線程調度就顯得尤其重要。固然RxJava還有不少重要的操做符,使得咱們的開發變得很是的方便。本系列文章不會對每一個操做符的基本使用展開,而是對一些比較經常使用的操做源碼分析,所說的經常使用,也是指樓主用到的!!畢竟是菜雞,確定有不少的東西都不太懂。安全
想要對RxJava的基本原理有一個更好的瞭解,必須對它的基本有一個大概的瞭解。咱們先經過一個簡單的案例,來對RxJava的基本元素進行提取。服務器
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
在這個簡單的案例當中,咱們能夠提取的元素有:Observable
, ObservableOnSubscribe
, ObservableEmitter
,Observer
。
元素仍是挺少的,咱們如今對每一個元素的類結構來進行簡單的分析一下。架構
public abstract class Observable<T> implements ObservableSource<T> { }
咱們發現Observable自己是一個抽象類,而且實現了ObservableSource接口,在來看看ObservableSource接口裏面有什麼。app
public interface ObservableSource<T> { void subscribe(@NonNull Observer<? super T> observer); }
ObservableSource接口裏面只有一個subscribe
方法,也就是說,RxJava將註冊觀察者這部分的功能提取成一個接口,從而能夠看出來,面向接口編程是多麼的重要😂😂。。。
再分別來看看咱們上面案例中使用的兩個方法--create
和subscribe
。框架
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { // 先省略代碼部分,待會詳細的分析。 }
啊,嚇我一跳,我覺得create方法的參數又是一個接口類型,還好是ObservableOnSubscribe
類型,也是上面提取出來的元素其中之一,關於這個類,待會會詳細的分析。ide
public final void subscribe(Observer<? super T> observer) { //... }
這個方法就更加的簡單了,就是傳遞了一個Observer接口的對象。不過須要注意的是這個方法有不少的重載,其中以Consumer類型的操做最爲多,不過這個也沒什麼,最後仍是Consumer轉換成爲了Observer,這個就涉及到Observer接口的一個實現類--LambdaObserver
。不要懼怕,待會都會一一的講解的。源碼分析
說了被觀察者,咱們先來看看觀察者--Observer
。
public interface Observer<T> { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); }
哎呀呀,更加的簡單了, Observer只是簡單的接口,不過咱們須要注意的是這個接口定義的4個方法,這裏不講解四個方法的做用,畢竟咱們這裏將Observable的基本原理🙄🙄。
public interface ObservableOnSubscribe<T> { void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception; }
一如既往的接口,subscribe
方法裏面就是具體作事情的地方,這個相信大佬們應該都知道,我這裏就班門弄斧的提醒一下😂😂。
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); ObservableEmitter<T> serialize(); boolean tryOnError(@NonNull Throwable t); }
ObservableEmitter
也是一個接口,同時繼承了Emitter
接口,咱們來看看Emitter
接口的定義
public interface Emitter<T> { void onNext(@NonNull T value); void onError(@NonNull Throwable error); void onComplete(); }
做爲一個發射信息器,Emitter
裏面定義了不少關於發送消息給Observer
的方法,Emitter
的onNext
對應着Observer
的onNext
方法,其餘的方法也是相似的。
咱們對相關部分的基本元素有了一個基本的瞭解,如今咱們來對整個流程的工做原理進行分析。首先咱們create
方法入手
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
create方法沒有咱們想象中的那麼難,就只有兩行代碼,還有一行用來check的😂😂。對於ObservableCreate
類這裏先不進行分析,咱們來看看 RxJavaPlugins
的onAssembly
方法。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
這裏提醒一下,onAssembly
方法的參數類型是Observable
類型,也就是說ObservableCreate
自己就是一個Observable
。好了,扯了題外話,來看看onAssembly
方法具體是幹嗎的。
整個方法的執行過程比較簡單,若是onObservableAssembly
爲null,直接就返回了source
,也就是說返回了ObservableCreate
自己。而咱們在整個Observable的源碼中發現,onObservableAssembly
初始值自己爲null。
public static void reset() { //······ setOnObservableAssembly(null); //······ }
爲何須要這樣子繞圈子的作呢?這裏就是作了鉤子,以便於之後的擴展。
因此Observable
的create
方法就是返回了一個ObservableCreate
對象,不過須要注意的是ObservableCreate
包裹了一個ObservableOnSubscribe
對象,也就是咱們在create方法裏面new的那個ObservableOnSubscribe
對象。
咱們先來不急着去理解ObservableCreate
是什麼,仍是來看看subscribe
方法爲咱們作了什麼。
當咱們經過Observable的create方法來獲取一個Observable對象時,一般還會調用Observable的subscribe方法來註冊一個觀察者。如今咱們來看看subscribe方法的實現。
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
整個過程也不是想象中的那麼神祕,除去check相關的方法不看,歸根結底就是兩行代碼,先是經過RxJavaPlugins
的onSubscribe
方法來獲取Observer
對象,具體操做這裏就不說了,確定跟RxJavaPlugins
的onAssembly
方法差很少,最後返回的是observer自己,最後調用了subscribeActual
方法。這個subscribeActual
方法是幹嗎的?
protected abstract void subscribeActual(Observer<? super T> observer);
臥了個槽?抽象方法!那我怎麼知道調用的是哪一個類的subscribeActual
方法?不急哈,記得咱們以前在create
方法返回的Observable
對象是哪一個類的對象嗎?想起來了吧,是ObservableCreate
先來看看ObservableCreate
類結構。
public final class ObservableCreate<T> extends Observable<T> { }
咱們發現,ObservableCreate
繼承了Observable
,其實在分析create方法時,我也說過喲。
在ObservableCreate
類中,只有一個ObservableOnSubscribe
類型的成員變量,這個成員變量就是咱們在create
方法裏面new的ObservableOnSubscribe
對象
咱們再來看看ObservableCreate
對subscribeActual
方法的實現
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
在subscribeActual
方法裏面,先是對Observer
對象進行一次包裝,將它包裝在CreateEmitter
類中。而後咱們會發現兩個比較眼熟的方法onSubscribe
方法和subscribe
方法。其中onSubscribe
方法在Observer
裏面看到過,而這裏剛好是經過Observer
對象來調用的,沒錯,這個的observer
就是在subscribe
方法裏面new的對象。但是咱們記得onSubscribe
方法的參數類型是Disposable
,而這裏是一個CreateEmitter
。咱們來看看CreateEmitter的類結構:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { //······ }
沒錯,CreateEmitter
實現了Disposable
接口,因此CreateEmitter
自己能夠充當Disposable
的角色。
調用了Observer
的onSubscribe
方法以後,而後就會調用ObservableOnSubscribe
的subscribe
方法。
到這裏,咱們應該完全的明白了整個Observable
的工做流程。咱們經過create方法建立一個ObservableCreate
方法,而後調用了subscribe
方法來註冊了一個觀察者,在subscribe
方法裏面又調用了subscribeActual
方法,在subscribeActual
方法裏面先是調用了Observer
的onSubscribe
方法,而後調用了
ObservableOnSubscribe
的subscribe
方法,在ObservableOnSubscribe
的subscribe
方法當中,具體的作的事有兩件:1.作咱們本身的事情,好比從服務器上獲取數據之類;2.將發送信息到Observer
去。
理解了整個流程的工做原理,咱們如今來看看CreateEmitter
是怎麼信息發給Observer
的。
咱們知道,咱們在ObservableOnSubscribe
的subscribe
方法裏面使用ObservableEmitter
來發射信息到Observer
。如今咱們來看看整個CreateEmitter
的工做原理,不過,咱們仍是先來看看這個類的結構,雖然上面已經看了,可是擔憂大佬們忘了:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { //······ }
在上面已經說了CreateEmitter
實現了Disposable
接口,能夠做爲Disposable
對象來操做,在接下來,咱們將重點介紹Disposable
是怎麼控制Observer
對信息的接收,同時還會介紹CreateEmitter
做爲ObservableEmitter
接口的那部分功能。
以前在分析基本元素時,已經說了ObservableEmitter
這個接口,它實現了Emitter
接口。在Emitter
接口裏面有三個方法用來發送信息給Observer
,分別是:onNext
,onError
,onComplete
。而CreateEmitter
類則是具體的實現了這三個方法,咱們來看看。
public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } }
代碼是很是的簡單,直接調用了Observer
的onNext
方法,也沒用什麼高逼格的東西😂😂。其他兩個方法也是如此。只不過是,在調用onNext
方法時作了一個isDisposed
的判斷。
因此感受Disposable
纔是這個類的核心。咱們來看看isDisposed
方法:
@Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); }
在isDisposed
方法裏面調用了DisposableHelper
的isDisposed
方法。不過這裏須要注意的是這裏傳遞過去的是get方法的返回值,這個返回值什麼意思?
回到CreateEmitter
的類結構,發現它繼承了AtomicReference
類,因此get方法返回的是一個Disposable
對象。
同時,咱們發現CreateEmitter
的dispose
方法也是經過DisposableHelper
類進行進行操做的,看看要理解Disposable
的功能,必須瞭解DisposableHelper
是怎麼操做的。
從感官上來講,一個發射信息器是否dispose
,直接設置一個boolean
類型的flag就OK了,爲何搞得這麼複雜,又是AtomicReference
,又是DisposableHelper
。這一切,咱們從DisposableHelper
來尋找答案。
首先咱們仍是來看看DisposableHelper
的結構:
public enum DisposableHelper implements Disposable { DISPOSED ; }
DisposableHelper
自己是一個enum類型,同時實現了Disposable
接口。這裏使用enum主要是爲了作一個DISPOSED
的單例。而後在經過isDisposed
方法來判斷是否dispose
,能夠直接與DISPOSED
比較。
public static boolean isDisposed(Disposable d) { return d == DISPOSED; }
既然判斷是否dispose
是直接與DISPOSED
比較,那麼若是dispose
的話,應該是將AtomicReference
裏面的值設置爲DISPOSED
吧?咱們來看一下dispose
方法:
public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; }
果真,跟咱們猜想同樣的,AtomicReference
裏面的值設置爲DISPOSED
。只是,這裏爲了線程安全,作了不少的判斷操做。
從這裏咱們能夠獲得,爲何須要設置DisposableHelper
來控制dispose
的狀態,那是由於線程安全,若是直接設置一個flag,在有些狀況下,可能存在線程不安全的風險。同時爲了代碼的優雅,若是這部分的邏輯寫在CreateEmitter
裏面,會不會顯得冗雜呢?
寫到這裏,我感受也差很少了。這裏對着部分的知識作一個總結。
1.在整個流程中,基本有Observable
,ObservableOnSubscribe
,ObservableEmitter
,Observer
,若是想要對整個過程有一個大概的理解,必須對這幾個元素有基本的認識。
2.Observer
的onNext
之類方法的觸發時機,其實是Observable
的subscribe
方法,由於subscribe
方法調用了Observable
的subscribeActual
方法,而在subscribeActual
方法裏面作了兩部分的操做:1.直接調用了Observer
的onSubscribe
方法;2.使用ObservableEmitter
將Observer
包裹起來,因此咱們在ObservableOnSubscribe
的subscribe
方法用ObservableEmitter
來發射信息,至關於調用了Observer
的相關方法。
3.在ObservableEmitter
的onNext
之類方法裏面,存在一種相似AOP的代碼,由於在調用Observer
的相關方法,作了一些其餘的操做。
做者:瓊珶和予 連接:https://www.jianshu.com/p/f17821d2cf78 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。