Rxjava2源碼解析

1:用法:ide

 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                //Log.d(TAG, "ObservableEmitter");
                //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                emitter.onNext(12);
                emitter.onNext(13);
                emitter.onNext(14);
                emitter.onNext(15);
                emitter.onComplete();
            }
        });
        Observer<Integer> observer = new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer integer) {
                /*i++;
                if(i == 3){
                    mDisposable.dispose();
                }*/
                Log.d(TAG, "onNext" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        //observable.subscribe(observer);
        observable.doOnSubscribe(disposable -> {
                    Log.d(TAG, "doOnSubscribe");
                }
        ).doOnComplete(() -> {
            Log.d(TAG, "doOnComplete");
        }).doOnNext((C) -> {
            Log.d(TAG, "doNext" + C);
        }).subscribe(observer);

 

2:Observable函數

首先看Observable:ui

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                //Log.d(TAG, "ObservableEmitter");
                //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                emitter.onNext(12);
                emitter.onNext(13);
                emitter.onNext(14);
                emitter.onNext(15);
                emitter.onComplete();
            }
        });

看一下create:this

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

參數ObservableOnSubscribe是一個接口,裏面只有一個函數subscribe:spa

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

create須要返回的是一個Observable:code

return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));

經過在這裏可知反回的是一個Observable的繼承類ObservableCreate:server

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
。。。。。。。。。。

總結以上的幾個類,能夠概括一下:blog

經過Observable.create返回一個Observable,具體是返回ObservableCreate,該類繼承Observable,同時該類持有ObservableOnSubscribe,而ObservableOnSubscribe是一個接口,具體的實現是在:繼承

 Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                //Log.d(TAG, "ObservableEmitter");
                //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                emitter.onNext(12);
                emitter.onNext(13);
                emitter.onNext(14);
                emitter.onNext(15); emitter.onComplete(); }
        });

Observable的解析暫時先到這裏,咱們先看如何和observer關聯起來:接口

//observable.subscribe(observer);
        observable.doOnSubscribe(disposable -> {
                    Log.d(TAG, "doOnSubscribe");
                }
        ).doOnComplete(() -> {
            Log.d(TAG, "doOnComplete");
        }).doOnNext((C) -> {
            Log.d(TAG, "doNext" + C);
        }).subscribe(observer);

進入subscribe:

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

能夠看到這是Observable裏面的一個方法,前面咱們說到,create生成的是ObservableCreate,而該類繼承Observable,因此咱們如今就是在ObservableCreate的subscribe方法裏面,看參數,傳進來的是一個observer,observer也是一個interface,具體實現就是應用層的:

Observer<Integer> observer = new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer integer) {
                /*i++;
                if(i == 3){
                    mDisposable.dispose();
                }*/
                Log.d(TAG, "onNext" + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete"); }
        };

到這裏再來總結一下,經過subscribe,就是ObservableCreate這個類(該類繼承Observable,同時該類持有ObservableOnSubscribe,而ObservableOnSubscribe是一個接口)執行subscribe,傳參爲observer,是一個實現爲用戶層定義的接口。接下來就是具體看一下subscribe這個函數是如何走的:

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

首先看:

subscribeActual(observer);

點進這個函數看到是一個Observable裏面的一個abstract函數,那實現是在哪裏呢,前面一直提到的,咱們如今實際上是處於ObservableCreate這個類,進入該類找到subscribeActual這個函數:

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

先看這句:

source.subscribe(parent);

先看看source是怎麼來的:

 final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                //Log.d(TAG, "ObservableEmitter");
                //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                emitter.onNext(12);
                emitter.onNext(13);
                emitter.onNext(14);
                emitter.onNext(15);
                emitter.onComplete();
            }
        });

一層層往上追溯能夠看到sorce其實就是用戶層實現的那個接口:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

會到:

 source.subscribe(parent);

這裏的parent是CreateEmitter,先看subscribe:

 public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                //Log.d(TAG, "ObservableEmitter");
                //Log.d(TAG, "Observable thread is" + Thread.currentThread().getName());
                emitter.onNext(12);
                emitter.onNext(13);
                emitter.onNext(14);
                emitter.onNext(15);
                emitter.onComplete();
            }

前面看到subscribe的參數是parent:

  CreateEmitter<T> parent = new CreateEmitter<T>(observer);

這裏定義了一個CreateEmitter,持有observer:

 implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

咱們來看一下onNext是如何執行的,當執行到:

emitter.onNext(12);

的時候,就到了:

 @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

看這句:

observer.onNext(t);

咱們前面講到了,這裏的observer,就是用戶層自定義的:

 Observer<Integer> observer = new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer integer) {
                /*i++;
                if(i == 3){
                    mDisposable.dispose();
                }*/
                Log.d(TAG, "onNext" + integer);
            }

到這裏,Observable裏面的onNext和Observer的onNext就聯繫起來了,Observavble發送一個,Observer執行一個。onError和onComplete同理。

相關文章
相關標籤/搜索