rxjava2源碼解析(一)基本流程分析

寫在前面

2020年開始了,給本身定了一個周更博客的計劃。還好計劃定的不算晚,我能夠在第一個星期過去以前趕出第一篇,《rxjava2源碼解析(一)基本流程分析》。這也是Read The Fucking Source Code系列的第一篇文章。java

先給你們說說我寫博客的初衷。我寫博客的目的只有一個:就是成爲優質博主git

去年的我,把焦慮當作學習的驅動力,結果很慘。頭髮沒了,對學習的興趣也愈來愈低。因此今年的我決定換種方式,去TM的焦慮。我不愛學習,我也不愛焦慮,我只想成爲優質博主。因此就也有了今年周更博客的計劃。我要把寫博客當成打怪升級,把大家的每一次閱讀看成我補一個兵,每個點贊評論看成個人一次單殺。程序員

爲了有更好的遊戲體驗,我必然會把每篇技術博客寫到極致,作到老小皆宜,你們都愛看。但願你們走過路過,點個贊再走啊!在此拜謝github

引言

Read The Fucking Source Code,是程序員圈子裏的一個衆所周知的梗。你們都知道讀源碼枯燥無趣,可又不得不作,非常痛苦。我作這個系列的目的就是想讓你們在閱讀源碼時,也能體驗到愉悅。開篇第一章,決定用rxjava2源碼閱讀開頭。由於這個框架平常都在用,面試也常常會問,已經成爲Android必備技能。可是知道怎麼用並不夠,面試官一問原理就蒙圈可不行。因此就有了RTFSC的第一卷,rxjava2源碼閱讀。我會盡可能把讀源碼這個枯燥的事情,給你們說的有趣一點,通俗一點。面試

從基本使用入手

首先隨便寫一個rxjava2的基本用法,咱們根據這個簡單的示例來看看rxjava2整個流程是什麼樣的。bash

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG,"onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG,"s = "+ s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
複製代碼

上面的部分,看起來太長,咱們能夠先將其簡化。app

Observable.create(ObservableOnSubscribe).subscribe(Observer);
複製代碼

廢話很少說,直接劃重點:框架

  • 1.能夠看到這裏出現了三個類名很是相像的類:ObservableObservableOnSubscribeObserver。也就是咱們平常說的,被觀察者,觀察者。
  • 2.爲了更好的區分,咱們將其形象化一點。Observable咱們稱其爲裝飾器,ObservableOnSubscribe咱們也稱其爲發射源,Observer咱們稱其爲處理器。爲何這麼稱呼,咱們能夠邊看源碼邊講。
  • 3.咱們能夠把上面的內容形象化爲:裝飾器Observable經過一個create方法和一個subscribe方法,將發射源和處理器鏈接起來。

接下來咱們看看這個鏈接在源碼中是如何實現的。ide

裝飾器Observable

首先從Observablecreate入手。函數

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");//判空做用
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

劃重點:

  • 1.create方法,須要傳入一個發射源ObservableOnSubscribe<T>對象,返回一個Observable<T>對象。
  • 2.忽略掉判空的代碼,onAssembly方法咱們也暫時放在一邊,只須要知道是返回傳入參數就好了。那create方法就是返回一個ObservableCreate對象。

那咱們來看看ObservableCreate這個類。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ....
}
複製代碼

劃重點:

  • 1.ObservableCreate這個類繼承自Observable
  • 2.ObservableCreate的構造方法中直接將參數中的發射源ObservableOnSubscribe做爲source存在本地。

OK,create方法看完了。很簡單,一句話總結,建立了一個裝飾器對象,將發射源存在本地備用。(有沒有一種看王剛炒菜的感受?)

爲何咱們稱Observable爲裝飾器?由於rxjava在這裏用到了裝飾器模式,而Observable是裝飾器模式下的基類。裝飾器模式這裏看還不明顯,看到後面就知道了。

發射源ObservableOnSubscribe

上面create方法須要傳入一個發射源ObservableOnSubscribe參數,咱們來看看源碼:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
複製代碼

劃重點:

  • 1.發射源ObservableOnSubscribe是一個接口,咱們在使用它時會重寫subscribe方法。
  • 2.咱們會在subscribe方法中定義接下來要進行的一系列事件,因此咱們稱ObservableOnSubscribe爲事件發射源。
  • 3.subscribe方法有一個參數就是發射器ObservableEmitter(後面會詳細說明)。

訂閱(鏈接)

接下來講說下一步:subscribe
前面說到,Observablecreate方法返回的是ObservableCreate對象,ObservableCreatesubscribe方法並無進行重寫,咱們直接看Observable裏的subscribe方法。

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
複製代碼

讓咱們拋開那些不重要的代碼,直入主題。將其中的關鍵代碼簡化以後能夠變爲:

public final void subscribe(Observer<? super T> observer) {
    observer = RxJavaPlugins.onSubscribe(this, observer);
    subscribeActual(observer);
}
複製代碼

RxJavaPlugins這個一樣先甩在一邊放着無論,跟前面的onAssembly同樣,咱們只須要知道這是返回傳入的observer就好了。
那麼只有subscribeActual(observer)這一句關鍵代碼了。ObservablesubscribeActual是一個抽象方法,具體實如今子類中。

其實,在這裏咱們就能夠看出來,這是一個裝飾器模式。Observable是裝飾器模式的基類,實際上全部操做都是它的子類完成的。因此咱們稱其爲裝飾器。不僅是create方法,其餘一些操做符,例如mapflatMap也是這樣的。這個後面講到操做符和線程切換的時候,大家應該會更有體會。

因此後面咱們分析Observablesubscribe方法時,直接看子類中的subscribeActual(observer)就行。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

劃重點:

  • 1.先建立一個CreateEmitter對象parent,而後調用處理器observeronSubscribe方法持有它。
  • 2.再調用source.subscribe(parent)將其傳入到source當中。這個source就是前面咱們說到備用的發射源ObservableOnSubscribe,其中的subscribe方法正好須要一個發射器CreateEmitter

那整條訂閱線就很清晰了:

  • 1.Observable調用create方法,參數是一個發射源ObservableOnSubscribe(咱們對其subscribe方法進行重寫),生成一個ObservableCreate對象。
  • 2.ObservableCreate調用subscribe方法,參數是一個處理器Observer
  • 3.在subscribe方法中咱們以Observer爲參數生成了一個發射器CreateEmitter,而且將這個發射器做爲參數,調用了發射源ObservableOnSubscribesubscribe方法。

這個CreateEmitter是什麼?咱們來看看它的源碼。

發射器CreateEmitter

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        .....
    }
複製代碼

劃重點:

  • 1.CreateEmitterObservableCreate中的一個靜態內部類,繼承自AtomicReference<Disposable>,ObservableEmitter<T>, Disposable,咱們稱其爲發射器。
  • 2.咱們從onNext方法中能夠看出,這個發射器是直接與外部處理器對接的。
  • 3.發射器繼承自Disposable接口,這個接口只有dispose()和isDisposed()兩個方法,做用是切斷髮射過程。
  • 4.在上面的subscribeActual方法中咱們能夠看到,Observer有調用onSubscribe方法持有這個CreateEmitter發射器對象。因此咱們能夠在處理器中經過dispose()接口隨時中斷髮射流程。
  • 5.同時咱們能夠在代碼中看到,onErroronComplete兩個是互斥的。只會執行一個,由於一旦執行其中一個,會當即切斷髮射過程。

總結

總結一下出現的幾個類:

  • Observable -> 裝飾器模式的基類,咱們稱其爲裝飾器。有一個create方法,參數是一個ObservableOnSubscribe發射源,會返回一個ObservableCreate對象。
  • ObservableCreate -> 裝飾器實現類。有一個subscribe方法,參數是Observer處理器。在subscribe方法內部,咱們以Observer爲參數生成了一個CreateEmitter發射器,而且將這個發射器做爲參數,調用了發射源的subscribe方法。
  • ObservableOnSubscribe -> 發射源,自己只是一個接口,咱們重寫了subscribe方法,定義了接下來要處理的事件,因此稱其爲發射源。
  • CreateEmitter -> 發射器,構造方法中包含一個處理器。處理器持有這個發射器對象,能夠隨時中斷髮射過程。發射器中的onErroronComplete兩個是互斥的,只會執行一個。
  • Observer -> 處理器。用於處理髮射器發送的數據。

再總結一下整個運行流程以下:

  • 1.Observable調用create方法,參數是一個發射源ObservableOnSubscribe(咱們對其subscribe方法進行重寫),生成一個ObservableCreate對象。
  • 2.ObservableCreate調用subscribe方法,參數是一個處理器Observer
  • 3.在subscribe方法中咱們以Observer爲參數生成了一個CreateEmitter發射器,而且將這個發射器做爲參數,調用了發射源ObservableOnSubscribesubscribe方法。
  • 4.發射源ObservableOnSubscribesubscribe方法中定義了咱們要處理的事件,並將結果傳遞給發射器CreateEmitterCreateEmitter先判斷事件流是否斷開,不斷開則將結果傳遞給處理器Observer
  • 5.處理器Observer處理結果。

拓展

這時候咱們再回頭看咱們前面扔掉的東西,RxJavaPlugins.onAssemblyRxJavaPlugins.onSubscribe。咱們直接看源碼。

/**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value * @return the value returned by the hook */ @SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; } 複製代碼

方法介紹中有描述:Calls the associated hook function。
瞭解hook的應該就知道了,這裏至關因而利用Java反射機制,對source進行了一層包裝攔截。rxjava給咱們提供了一個注入hook的方法,咱們能夠經過hook來實如今調用source以前,須要先調用咱們設置的攔截函數。咱們如今只須要知道有這個東西就好了,後面有這個須要再用。

最後

這一篇主要是講rxjava基本使用中的源碼流程,下一篇咱來講說線程切換。

每週更新,敬請期待~

相關文章
相關標籤/搜索