RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.java
以上是RxJava在Github上的介紹,大概意思是,針對於JVM(Java虛擬機)的響應式擴展實現,一個在Java VM上使用可觀察的序列來組合實現異步的、基於事件編程的庫。git
RxJava如今你們用的都應該已經很溜了,用法這裏就再也不多說了。咱們都知道RxJava是對觀察者模式的擴展,下面就從觀察者模式的實現機制出發,瞭解一下RxJava2的實現邏輯。只有真正瞭解了RxJava 的實現原理,咱們才能在遇到問題的時候,更快速更準確的定位的到問題。github
這次源碼分析基於 RxJava Release 2.1.7編程
這裏簡單回顧一下觀察者模式的組成及使用方式,經過以前觀察者模式一文中的分析,咱們知道觀察者模式中有四個重要的角色:安全
當咱們建立好了具體主題和觀察者類,就可使用觀察者模式了,下面是一個最簡單的測試demo。bash
public class TestObservePattern { public static void main(String[] args) { // 建立主題(被觀察者) ConcreteSubject concreteSubject = new ConcreteSubject(); // 建立觀察者 ObserverOne observerOne=new ObserverOne(); // 爲主題添加觀察者 concreteSubject.addObserver(observerOne); //主題通知全部的觀察者 concreteSubject.notifyAllObserver("wake up,wake up"); } } 複製代碼
以上就是觀察者模式的使用方式,很簡單是吧。如今就讓咱們帶着如下幾個問題,看看RxJava是如何使用觀察者模式的。markdown
用RxJava這麼久了,你能夠思考一下以下幾個問題:app
若是對以上幾個問題,你有明確的答案,恭喜你,如下內容你就不用再看了,O(∩_∩)O哈哈~。異步
不少開發者對RxJava的學習多是從上游和下游的角度開始,這裏能夠認爲這樣的敘述更偏重RxJava 事件序列的特徵。本文從被觀察者(主題)和觀察者的角度出發,能夠說是更偏向於RxJava 觀察者模式的特徵。這裏的主題就是上游,觀察者就是下游。不管從哪一個角度出發去理解,源碼就那麼一份,無所謂對錯,只是每一個人的認知角度不一樣而已,選擇一種本身更容易瞭解的方式便可。async
好了,若是你看到了這裏,說明你對以上幾個問題,還有些許疑問,那麼咱們就從這幾個問題出發,瞭解一下RxJava的源碼實現。
咱們就帶着上述幾個問題,依次來看看RxJava究竟是怎麼一回事兒。爲了方便敘述和記憶,咱們首先看一段RxJava2 最最基礎的使用方式。
private void basicRxjava2() { Observable mObservable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { e.onNext("1"); e.onNext("2"); e.onNext("3"); e.onNext("4"); e.onComplete(); } }); Observer mObserver = new Observer() { @Override public void onSubscribe(Disposable d) { Log.e(TAG, "onSubscribe: d=" + d); sb.append("\nonSubcribe: d=" + d); } @Override public void onNext(Object s) { Log.e(TAG, "onNext: " + s); sb.append("\nonNext: " + s); } @Override public void onError(Throwable e) { Log.e(TAG, "onError: " + e); sb.append("\nonError: " + e.toString()); logContent.setText(sb.toString()); } @Override public void onComplete() { Log.e(TAG, "onComplete"); sb.append("\nonComplete: "); logContent.setText(sb.toString()); } }; mObservable.subscribe(mObserver); } 複製代碼
上面這段代碼,應該很容易理解了,輸出結果你們閉着眼睛也能想出來吧。咱們就以這段代碼爲基礎,結合上面提到的問題依次展開對RxJava的分析。
首先看看RxJava 中四個重要的角色是如何定義的。
首先能夠看看這個Observable類。
public abstract class Observable<T> implements ObservableSource<T> { …… } 複製代碼
他實現了ObservableSource接口,接着看ObservableSource
public interface ObservableSource<T> { /** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */ void subscribe(@NonNull Observer<? super T> observer); } 複製代碼
這裏很明顯了,ObservableSource 就是抽象主題(被觀察者)的角色。按照以前觀察者模式中約定的職責,subscribe 方法就是用來實現訂閱觀察者(Observer)角色的功能。從這裏咱們也能夠看出,抽象觀察者的角色就是Observer了。
這裏,你也許會有疑問,這麼簡單?抽象主題(上游)不是須要發送事件嗎?onNext(),onComplete()以及onError()跑哪兒去了?彆着急,咱們後面慢慢看。
回過頭來繼續看Observable,他實現了ObservableSource接口,而且實現了其subscribe方法,可是它並無真正的去完成主題和觀察者之間的訂閱關係,而是把這個功能,轉接給了另外一個抽象方法subscribeActual(具體細節後面分析)。
所以,Observable依舊是一個抽象類,咱們知道抽象類是不能被實例化的,所以從理論上來講,他好像不能做爲具體主題的角色。其實否則,Observable內部提供了create,defer,fromXXX,repeat,just等一系列建立型操做符, 用來建立各類Observable。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 複製代碼
在RxJava內有不少他的子類。
誠然,你能夠認爲,這些子類其實才是真正的具體主題。可是,換一個角度,從代理模式的角度出發,咱們能夠把Observable當作是一個代理類,客戶端你只管調用create 方法,想要什麼樣的 Observable告訴我一聲就能夠,不一樣Observable之間的差別你不用管,包在我身上,保證給你返回你想要的Observable實例。
同時,Observable另外一個巨大的貢獻,就是定義了不少的操做符,咱們平時經常使用的map,flatMap,distinct等,也是在這裏定義。而且這些方法都是final類型的,所以他的全部子類都會繼承同時也沒法改變這些操做符的實現。
所以,Observable 就是具體主題。
在抽象主題裏已經提過了,Observer就是抽象觀察者的角色。
public interface Observer<T> { void onSubscribe(@NonNull Disposable d); void onNext(@NonNull T t); void onError(@NonNull Throwable e); void onComplete(); } 複製代碼
很是符合觀察者模式中抽象觀察者的職責描述,Observer 定義了觀察者(下游)收到主題(上游)通知後該作什麼事情。這裏須要注意的是onSubscribe 也是定義在這裏的。
這個具體的觀察者,o(╯□╰)oo(╯□╰)o,就很少說了吧。你們平時使用應該都是直接用new一個Observer的實例。RxJava內部有不少Observer的子類,有興趣的同窗能夠具體瞭解一下。這裏其實能夠引伸出一個有意思的問題,一樣是抽象類,爲何接口能夠直接實例化,而用abstract修飾過的類就不能夠?
咱們看一下這段代碼:
Observable mObservable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter e) throws Exception { } }); public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; // 是否有別的其餘操做符運算,有的話,在此Observable上執行一遍 if (f != null) { return apply(f, source); } return source; } 複製代碼
RxJava的代碼裏,不少時候會有ObjectHelper.requireNonNull這種空檢查的地方,一概都是爲了最大程度的防止NPE的出現,後面出現就再也不贅述了.
咱們使用create操做符建立Observable的過程當中,看似經歷了不少方法,在不考慮任何其餘操做符的前提下,整個過程簡化一下的話就這麼一句代碼
Observable mObservable=new ObservableCreate(new ObservableOnSubscribe()) 複製代碼
從以前的分析,咱們也看到了ObservableCreate 就是Observeable抽象類的一個子類。咱們簡單看一下他的實現。
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { …… } } 複製代碼
能夠看到,他惟一的構造函數須要一個ObservableOnSubscribe實例,同時他實現subscribeActual方法,說明他真正處理主題和觀察者之間實現訂閱的邏輯。
看了半天,你可能一直很好奇,這個ObservableOnSubscribe是個什麼東西呢?他其實很簡單。
/** * A functional interface that has a {@code subscribe()} method that receives * an instance of an {@link ObservableEmitter} instance that allows pushing * events in a cancellation-safe manner. * * @param <T> the value type pushed */ public interface ObservableOnSubscribe<T> { /** * Called for each Observer that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */ void subscribe(@NonNull ObservableEmitter<T> e) throws Exception; } 複製代碼
ε=(´ο`*)))唉,怎麼又一個subscribe,這又是啥?不要慌,看註釋。意思是說,這裏的subscribe 接收到一個ObservableEmitter實例後,就會容許他以一種能夠安全取消(也就是必定能取消)的形式發送事件。
就是說會有某個對象,給他一個ObservableEmitte的實例,沒給他以前他是不會主動發送事件的,會一直憋着。,到這裏,你是否是想到了什麼,咱們知道在RxJava 中只有觀察者(下游)訂閱(subscribe)了主題(上游),主題纔會發送事件。這就是和普通的觀察者模式有區別的地方之一。
好了,最後再來看看這個神祕的ObservableEmitter是個什麼鬼?
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable d); void setCancellable(@Nullable Cancellable c); boolean isDisposed(); ObservableEmitter<T> serialize(); /** * Attempts to emit the specified {@code Throwable} error if the downstream * hasn't cancelled the sequence or is otherwise terminated, returning false * if the emission is not allowed to happen due to lifecycle restrictions. * <p> * Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called * if the error could not be delivered. * @param t the throwable error to signal if possible * @return true if successful, false if the downstream is not able to accept further * events * @since 2.1.1 - experimental */ boolean tryOnError(@NonNull Throwable t); } 複製代碼
這裏能夠關注一下tryOnError這個方法,能夠看到他會把某些類型的error傳遞到下游。
o(╥﹏╥)o,又是一個接口,並且還繼承了另外一個接口,什麼狀況?繼續看
public interface Emitter<T> { void onNext(@NonNull T value); void onError(@NonNull Throwable error); void onComplete(); } 複製代碼
驚不驚喜,意不意外? 哈哈,終於找到你了,熟悉的onNext,onError,onComplete.原來在這裏。
這裏有個問題能夠思考一下,在抽象觀察者中,定義了四個處理事件的方法,這裏只有三個,按照對應關係來講彷佛缺了一個onSubscribe,這又是怎麼回事呢?後面會有分析,能夠本身先想一想
這兩個接口的含義很明顯了,總結一下:
好了,繞了一大圈,就爲了一行代碼:
Observable mObservable=new ObservableCreate(new ObservableOnSubscribe()) 複製代碼
總結一下具體主題(上游)的到底幹了啥:
爲了方便敘述,把問題3和4連在一塊兒說了。
經過上面的敘述,如今具體主題和具體的觀察者都建立好了,接下來就是實現兩者的訂閱關係。
mObservable.subscribe(mObserver);
複製代碼
這裏須要明確的一點是,是觀察者(下游)訂閱了主題(上游),雖然從代碼上看好像了前者訂閱了後者,不要搞混了。
咱們看Observable的subscribe() 方法:
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { …… } } 複製代碼
這個前面已經提到過了,Observable並無真正的去實現subscribe,而是把他轉接給了subscribeActual()方法。
前面已經說過,Observable的實例是一個ObservableCreate對象,那麼咱們就到這個類裏去看看subscribeActual()的實現。
// 爲了方便,順便再看一眼構造函數 public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 複製代碼
CreateEmitter 實現了以前提到的ObservableEmitter接口。這裏有一句關鍵的代碼:
observer.onSubscribe(parent);
複製代碼
以前在看到Emitter的定義時,咱們說缺乏了onSubscribe方法,到這裏就明白了。onSubscribe並非由主題(上游)主動發送的事件,而是有觀察者(下游)本身調用的一個事件,只是爲了方便獲取Emitter的實例對象,準確的說應該是Disposable的實例對象,這樣下游就能夠控制上游了。
接下來就更簡單了,source 是ObservableOnSubscribe,按照以前的邏輯,調用其subscribe方法,給他一個ObservableEmitter對象實例,ObservableEmitter就會開始發送事件序列。這樣,一旦開始訂閱了,主題(上游)就開始發送事件了。也就是咱們熟悉的onNext,onComplete,onError 方法真正的開始執行了。
接着看看CreateEmitter的實現。
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); …… } static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (!tryOnError(t)) { RxJavaPlugins.onError(t); } } @Override public boolean tryOnError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } return true; } return false; } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } @Override public void setDisposable(Disposable d) { DisposableHelper.set(this, d); } @Override public void setCancellable(Cancellable c) { setDisposable(new CancellableDisposable(c)); } @Override public ObservableEmitter<T> serialize() { return new SerializedEmitter<T>(this); } @Override public void dispose() { DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } } } 複製代碼
最後再來簡單說一下,RxJava中對常規的觀察者模式作了怎樣的調整,有什麼值得借鑑的地方。大部分優勢在上面已經說起了,這裏就來總結一下。
好了,以上就是從觀察者模式的角度出發,對RxJava的一次解讀,有什麼疏漏或理解錯誤的地方,歡迎讀者指出,共同進步!