RxJava2源碼分析(一):基本流程分析

前言:到如今這個階段,網上關於RxJava2源碼分析的文章已經滿天飛了,我寫這篇文章的目的並非說我會分析的比他們好,比他們透徹,這篇文章的目的只是單純的記錄本身分析RxJava2源碼的成功及收穫。java

概述

  對於一個編程人的技術成長,通常會經歷三個階段,首先是學會使用開源庫,而後是知道開源庫的原理,最後就是本身寫一個開源庫。雖然在平常的開發中使用RxJava2已經達到了駕輕就熟的地步,可是不瞭解具體的原理,總感受有點虛。因而就想靜下心來,好好的分析一下RxJava源碼,達到不只知其然更知其因此然的地步。git

  下圖是分析RxJava基本流程後,畫的UML圖,對於已經分析過源碼的大神,能夠看下圖畫的是否正確,對於沒有分析過源碼的人,能夠看下,先有個映像,而後再跟着文章的內容,一點點的理解。(點擊圖片查看大圖)github

基本流程分析,UML圖

源碼分析

  先看RxJava2基礎用法的代碼編程

private void basicUseRxJava() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.e("wizardev", "onNext: "+s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }
複製代碼

以上代碼,只是RxJava2的基本使用,並無涉及任何的操做符代碼,下面咱們就按方法順序開始分析源碼。app

create方法分析

  看下create()方法的代碼ide

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
     //一、判空
        ObjectHelper.requireNonNull(source, "source is null");
     //二、
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

從以上代碼能夠看出,create方法的返回值類型是Observable,參數是ObservableOnSubscribe<T>,能夠先看下這個ObservableOnSubscribe類,源碼以下源碼分析

public interface ObservableOnSubscribe<T> {

    /** * Called for each Observer that subscribes. * @param emitter the safe emitter instance, never null * @throws Exception on error */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
複製代碼

能夠發現ObservableOnSubscribe類是一個接口,裏面有一個subscribe方法。如今繼續看create方法中的代碼,在「1」處代碼是判斷傳入的參數是否爲空。這裏主要看下「2」處,這句RxJavaPlugins.onAssembly實際上是一個Hook方法,**「2」處代碼實質就是return new ObservableCreate<T>(source);,**不信的話,能夠看下onAssembly方法,以下ui

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
複製代碼

經調試,onObservableAssembly爲null,因此上面的代碼就直接返回了new ObservableCreate<T>(source)this

  如今看下ObservableCreate類,以下spa

public final class ObservableCreate<T> extends Observable<T> {
    //一、全局變量
    final ObservableOnSubscribe<T> source;

    //二、構造方法中將source賦值
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    //三、這個方法是在調用subscribe方法才調用的
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    //...省略部分代碼
}
複製代碼

從上面的代碼能夠知道,ObservableCreate類繼承自Observable,在實例化的時候將create方法中的ObservableOnSubscribe<T> source參數注入了進來,做爲成員變量source

結論

  經過分析Observable類的create方法,能夠有如下結論:

  1. create方法的返回值類型是Observable
  2. create方法的參數的類型是接口;
  3. create方法實際返回的是ObservableCreate類,而ObservableCreate類是Observable的子類;
  4. 在實例化ObservableCreate類的時候將create的方法的參數注入到了ObservableCreate類中,做爲它的成員變量source

這裏重點看下第4個結論,在這裏create方法的參數實際就是下面的代碼

new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        }
複製代碼

subscribe方法分析

  分析完了create方法,接着來分析subscribe方法,其方法代碼以下

public final void subscribe(Observer<? super T> observer) {
     //一、判空
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //二、Hook方法,實質就是observer
            observer = RxJavaPlugins.onSubscribe(this, observer);
//判空
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
			//四、重點,
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
複製代碼

這裏重點看下「4」處, 這裏調用了ObeservablesubscribeActual方法,能夠看下Obeservable類中的這個方法,以下

protected abstract void subscribeActual(Observer<? super T> observer);
複製代碼

這個方法是抽象的,實際調用的是它子類中的方法,經過上文的分析,咱們知道ObservableCreateObeservable類的子類,因此,這裏調用的實際就是ObservableCreate類中的subscribeActual方法。如今,咱們再看下這個方法中的代碼,以下

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        //一、實例化CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //二、回調方法
        observer.onSubscribe(parent);

        try {
            //三、回調方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

咱們一步步的分析這個方法中的代碼,先看「1」處的代碼,這裏實例化了CreateEmitter這個類,在實例化的同時將observer傳了進去。看下CreateEmitter這個類的代碼,以下

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
//...省略部分代碼
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

       //...省略部分代碼
    }
複製代碼

經過上面的代碼,能夠發現CreateEmitter這個類實現了ObservableEmitter這個接口,而這個接口是ObservableOnSubscribe接口中subscribe方法的參數,是否是發現什麼了?如今繼續往下看,看下「2」處的代碼,這裏回調了ObserveronSubscribe方法,分析到這裏,能夠得出下面的結論

onSubscribe()回調所在的線程是ObservableCreate執行subscribe()所在的線程,和subscribeOn()/observeOn()無關!

重點來了,這裏看下「3」處的代碼,還記得source是誰嗎?**它就是執行Observable.create方法時,咱們注入給ObservableCreate類的成員變量,是ObservableOnSubscribe接口的實例。**這裏調用的subscribe方法,實際就是下面代碼的subscribe方法,

public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
複製代碼

這段代碼中的subscribe方法的參數實質就是CreateEmitter,調用的onNext方法就是CreateEmitter類中的onNext方法。繼續看下CreateEmitter類中的onNext方法,代碼以下

@Override
        public void onNext(T t) {
            //一、判斷傳入的參數是否爲null
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //二、調用Observer中的onNext方法
                observer.onNext(t);
            }
        }
複製代碼

分析到這裏,就能夠得出如下結論了

subscribe方法中發射器所調用的onNext方法,若是代碼沒有出錯的話,最終調用的就是Observer中的onNext方法。

分析CreateEmitter中的其餘方法,還能夠知道爲何Observer中的onErroronComplete方法只有一個會回調的緣由了,緣由就是不管調用的是哪個方法都會調用dispose()方法取消訂閱。

結論

  對Observable.subscribe方法的分析能夠得出如下結論

  1. subscribe方法最終調用了ObservableCreate類中的subscribeActual方法。
  2. subscribeActual方法中,實例化了發射器,並開始發射數據。
  3. subscribe方法中發射器所調用的onNext方法,若是代碼沒有出錯的話,最終調用的就是Observer接口中的onNext方法。

總結

  經過對RxJava基本流程的源碼分析,是否是對RxJava的原理有了更清晰的認識呢?分析完以後,咱們再看下這張圖,是否是感受如今看起來就明白多了呢?

結束語

  想要了解一些開源庫的原理,咱們必需要閱讀其源碼,只有從源碼中才能獲得想要的答案,才能對庫的原理有更清晰的認識。

  再說下,閱讀開源庫的注意事項,閱讀源碼時,咱們最好帶着問題來閱讀,閱讀前先有個目標,好比我此次閱讀要搞懂什麼問題,而後再開始閱讀,否則就會很容易在茫茫代碼中迷失。還有就是不要想着每句代碼都搞懂,搞懂與本身想要獲取的答案有關的代碼便可。

  轉載請註明出處:www.wizardev.cn

歡迎關注個人公衆號
掃碼關注公衆號,回覆「獲取資料」有驚喜
相關文章
相關標籤/搜索