RxJava2.0(三)談一談基礎功能源碼實現

前言

咱們在使用RxJava的時候最經常使用的功能就是寫一個被觀察者、一個觀察者。在被觀察者中發射數據,在觀察者中接收數據,最後用subscribe將二者給訂閱起來實現最基礎的功能。例以下面這種:git

//被觀察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("aa");
                e.onNext("bb");
                e.onNext("cc");
                e.onNext("dd");
                e.onComplete();
            }
        });

        //觀察者
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                //TODO 初始化數據
                d.dispose();
            }

            @Override
            public void onNext(String value) {
                //TODO 接收被觀察者發送的數據
                Log.i("result:",value);

            }

            @Override
            public void onError(Throwable e) {
                //TODO 錯誤

            }

            @Override
            public void onComplete() {
                //TODO 完成以後回調
            }
        };

        //訂閱
        observable.subscribe(observer);
複製代碼

那麼在這種狀況下,被觀察者是如何發送數據給觀察者?觀察者又是如何接收數據?二者又是如何被subscribe訂閱起來的呢?下面咱們經過源碼的分析來查看這一切的操做流程。github

Observable.creat()

首先,在建立被觀察者的時候,通常來講都是經過Observable.creat()來建立。那麼咱們進入creat()方法去看看裏面作了什麼操做。bash

@SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    
        //非空判斷
        ObjectHelper.requireNonNull(source, "source is null");
        
        //返回一個Observable
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

咱們跳過非空判斷邏輯,直接查看return。這裏會經過RxJavaPlugins.onAssembly()返回一個Observable對象。那麼咱們跳進onAssembly去查看裏面是如何進行Observable的建立的。ide

public static <T> Observable<T> onAssembly(Observable<T> source) {
       
       ...
       
        //返回傳入的參數對象
        return source;
    }
複製代碼

這裏其實沒作什麼很特別的操做,僅僅只是返回了參數對象。也就是說咱們如今應該返回去重點研究的是這個參數對象source。而這個source根據前面的源碼查看,能夠看到實際上是new ObservableCreate()。咱們跳進這裏查看源碼分析

ObservableCreat()

//ObservableCreat繼承了Observable,爲被觀察者Observable的子類
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    //構造方法,傳入參數ObservableOnSubscribe,也就是咱們在裏面進行onNext、onComplete與onError的方法。
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    
    //此方法是在被訂閱subscribe時候才調用,具體後面再說。
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
}
複製代碼

代碼進行了一些刪減。學習

經過上面代碼邏輯註釋能夠看到:ui

1.ObservableCreat爲Observable的子類,由於他擁有Observable的所有特性。this

2.在ObservableCreat構造方法中傳入了ObservableOnSubscribe,這個具體的做用咱們下面講。spa

3.有一個subscribeActual方法,這個方法實際上是後面用做訂閱的方法subscribe來實現的具體方式。線程

如今咱們再來看看傳入ObservableCreat中參數ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}
複製代碼

發現ObservableOnSubscribe實際上是一個接口,而這個接口裏有一個方法,專門用來實現咱們的向觀察者發送消息的方法。到此,被觀察者在進行creat()的源碼分析完畢,咱們來總結一下。

總結:

1.在進行creat的時候,內部返回了一個Observable,而這個Observable其實是一個繼承了ObserVable的ObservableCreat類

2.ObserVable的ObservableCreat構造方法中傳入了一個接口ObservableOnSubscribe,咱們通常進行數據的發送都是經過這個接口中的subscribe方法裏的ObservableEmitter來進行onNext、onComplete與onError。

Obsevable.subscribe()

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
        
            //獲取observer
            observer = RxJavaPlugins.onSubscribe(this, observer);
            
            //非空判斷
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            
            //訂閱方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
複製代碼

拋開上面的邏輯判斷不談,咱們直接看訂閱方法subscribeActual()。不知道你們還記不記得,在咱們講Observable.creat的時候,在ObservableCreat這個類裏面有兩個用到的方法,一個是構造方法,傳入接口ObservableOnSubscribe,另一個是subscribeActual。而如今Observable.subscribe實際上就是在執行這個方法。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        //實例化CreatEmitter對象
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        
        //執行觀察者中的onSubscribe方法
        //onSubscribe()回調所在的線程是ObservableCreate執行subscribe()所在的線程
        //和subscribeOn()與observeOn()無關!
        observer.onSubscribe(parent);

        try {
            //真正的訂閱方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

咱們在來看下這方法中實例化CreatEmitter對象的這個類。

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                //當沒有被取消訂閱的時候就執行onNext()方法用於發送數據
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            //出現錯誤時調用這個方法,用於拋出異常,而且在拋出以後的finally中調用dispose用於取消訂閱
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
        //判斷若是沒有執行disposed方法就調用onComplete而且dispose。這也就是爲何onComplete與onError爲何只會執行其中一個
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }


複製代碼

裏面的邏輯能夠說是一點都不復雜,就是咱們平時常常使用的onNext、onComplete與onError方法。

onNext():先判斷髮送的消息是否爲null,若是爲空則調用onError方法來拋出異常。若不爲空而且並未取消訂閱,則發送數據。

onError():出現錯誤的時候執行這個方法。當拋去異常以後經過finally強行執行dispose()方法,來強制結束掉訂閱。

onComplete():判斷若是沒有執行disposed方法就調用onComplete而且dispose。這也就是爲何onComplete與onError爲何只會執行其中一個。

總結

分析一下各個類的職責:

Observable :我的理解是裝飾器模式下的基類,實際上全部操做都是Observable的子類進行的實現

ObservableOnSubscribe: 接口,定義了數據源的發射行爲

ObservableCreate: 裝飾器模式的具體體現,內部存儲了數據源的發射事件,和subscribe訂閱事件

ObservableEmitter: 數據源發射器,內部存儲了observer

Observer: 接收到數據源後的回調(好比打印數據等)

1.Observable.create(),實例化ObservableCreate和ObservableOnSubscribe,並存儲數據源發射行爲,準備發射(我已經準備好數據源,等待被訂閱)

2.Observable.subscribe(),實例化ObservableEmitter(發射器ObservableEmitter準備好!數據發射後,數據處理方式Observer已準備好!)

3.執行Observer.onSubscribe()回調,ObservableEmitter做爲Disposable參數傳入

4.執行ObservableOnSubscribe.subscribe()方法(ObservableEmitter發射數據,ObservableEmitter內部的Observer處理數據)

具體其餘的一些操做符的用法,請參考個人github:RxJavaDemo

有興趣能夠關注個人小專欄,學習更多知識:小專欄

相關文章
相關標籤/搜索