RxJava小考題 -- Rxjava源碼分析(一)

前言:

如今面試不少都會問RxJava的源碼,直接講RxJava的源碼,估計你們都不太會看下去,咱們先看個小考題,而後再去看相關的源碼。java

正文:

問題一:

//對象類
class Data {
        public String name;
        public Data(String name) {
            this.name = name;
        }
}

//好比咱們使用just操做符來發送二個Data對象
Observable<Data> data = Observable.just(
      new Data("aaaa"),
      new Data( "bbbb")
);

//好比咱們用一個Consumer對象來訂閱
data.subscribe(new Consumer<Data>() {
      @Override
      public void accept(Data data) throws Exception {
            //裏面的內容是把Observable發送過來的對象裏面的name屬性值改爲cccc
            data.name = "cccc";
      }
});

//這時候咱們在用一個新的Consumer來訂閱這個Observable
data.subscribe(new Consumer<Data>() {
      @Override
      public void accept(Data data) throws Exception {
            //這時候打印data.name
            Log.v("TAG",data.name);
      }
});

複製代碼

問題:程序員

咱們用二個Customer分別去訂閱一個發送對象的Observable,這時候咱們的Log.v("TAG",data.name);輸出內容是什麼。面試


問題二:

Observable<Integer> data1 = Observable.just(
       1,2
);

data1.subscribe(new Consumer<Integer>() {
     @Override
     public void accept(Integer d) throws Exception{
            d++;
     }
});

data1.subscribe(new Consumer<Integer>() {
     @Override
     public void accept(Integer d) throws Exception{
            Log.v("TAG","d:"+d);
     }
});

複製代碼

問題:數組

這時候咱們的Log.v("TAG","d:"+d);輸出內容是什麼。bash


答案是: 第一個輸出是cccc,cccc;第二個是1,2。不知道你們作對了沒有。app

若是沒有作對題目的,咱們就來一塊兒來分析代碼。ide


問題分析:

咱們先看第一個情形的代碼:函數

Observable.just(
      new Data("aaaa"),
      new Data( "bbbb")
);
複製代碼

just源碼 :ui

public static <T> Observable<T> just(T item1, T item2) {
     ObjectHelper.requireNonNull(item1, "The first item is null");
     ObjectHelper.requireNonNull(item2, "The second item is null");

     return fromArray(item1, item2);
}
複製代碼

前面的二行是檢查是否爲null。主要是第三行經過fromeArray返回了一個Observable。this

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
     ObjectHelper.requireNonNull(items, "items is null");
     if (items.length == 0) {
         return empty();
     } else
     if (items.length == 1) {
         return just(items[0]);
     }
     return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
複製代碼

咱們能夠看到根據用戶傳的個數,返回不一樣的Observable,好比0個的時候返回empty();,一個的時候返回just(items[0]);,其餘都返回RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));,但其實本質都差很少。爲何這麼說:

empty()源碼:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings("unchecked")
public static <T> Observable<T> empty() {
    return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE);
}
複製代碼

咱們能夠看到也是調用了RxJavaPlugins.onAssembly方法。

just(items[0])源碼:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
     ObjectHelper.requireNonNull(item, "The item is null");
     return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
複製代碼

其實也是調用了RxJavaPlugins.onAssembly方法。

因此三者雖然是不一樣的Observable,可是都是調用RxJavaPlugins.onAssembly方法,而後傳入不一樣的對象參數而已。

PS : 對於咱們平時見到的什麼Observable.create方法,或者Observable.interval方法等,都是最終調用RxJavaPlugins.onAssembly方法,只是一個調用了RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));,一個調用了RxJavaPlugins.onAssembly(new ObservableInterval(xxx,xxxx,xxxx))方法。

由於咱們的情形一種是發射了二個對象,那咱們就重點來看一下RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));方法:

咱們能夠看到RxJavaPlugins.onAssembly方法:

@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
   Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) {
         return apply(f, source);
   }
   return source;
}
複製代碼

咱們能夠看到最後返回了傳入的Observable source,因此也就是咱們傳入的new ObservableFromArray<T>(items)

因此最終咱們拿到的Observable是new ObservableFromArray<T>(items),因此咱們通常接下去就是

//本質就是傳了一個 new ObservableFromArray<T>(items)
Observable observable = Observable.just(
      new Data("aaaa"),
      new Data( "bbbb")
);

//好比咱們用一個Observer對象來訂閱
observable .subscribe(new Observer<Data>() {
      @Override
      public void onSubscribe(Disposable d) {

      }

      @Override
      public void onNext(Data data) {

      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onComplete() {

      }
});
複製代碼

咱們先來看subscribe方法:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

             //最終調用了Observable的subscribeActual方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
}

複製代碼

咱們能夠看到代碼裏面的加的備註,說明最終咱們的 observable.subscribe(observer)最終執行了變爲:observable.subscribeActual(observer);,由於咱們說過咱們的observable具體是ObservableFromArray的實例,因此咱們直接看相關源碼。

ObservableFromArray.class:

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    
    //實例化對象的調用的構造函數,同時傳入咱們要發送的數組
    public ObservableFromArray(T[] array) {
        this.array = array;
    }

     //最終訂閱的時候調用這個方法
    @Override
    public void subscribeActual(Observer<? super T> s) {

        //new 一個咱們平時用來取消訂閱的Disposable,這裏具體是FromArrayDisposable
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);

        //也就是咱們Observer複寫的onSubscribe方法,並把Disposable對象傳入
        s.onSubscribe(d);
        
        
        if (d.fusionMode) {
            return;
        }
        
        //而後執行了FromArrayDisposable對象的run方法
        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {

        final Observer<? super T> actual;

        final T[] array;

        int index;

        boolean fusionMode;

        volatile boolean disposed;
         //構造函數,傳入了Observer 和咱們要傳的數組
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }

        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fusionMode = true;
                return SYNC;
            }
            return NONE;
        }

        @Nullable
        @Override
        public T poll() {
            int i = index;
            T[] a = array;
            if (i != a.length) {
                index = i + 1;
                return ObjectHelper.requireNonNull(a[i], "The array element is null");
            }
            return null;
        }

        @Override
        public boolean isEmpty() {
            return index == array.length;
        }

        @Override
        public void clear() {
            index = array.length;
        }

        //咱們用來取消訂閱的方法
        @Override
        public void dispose() {
            disposed = true;
        }

        //用來判斷是否取消了訂閱
        @Override
        public boolean isDisposed() {
            return disposed;
        }

        //當訂閱的時候,真正執行的是Disposable的run方法。
        void run() {
            T[] a = array;
            int n = a.length;
            
            /*遍歷咱們要傳的數組,而後而且判斷isDisposed()的disposed值
            因此咱們就知道了爲啥咱們取消訂閱只要執行Disposable.dispose()方法了
            由於這時候會把disposed返回true,而後這裏的for循環判斷就會退出循環。
             */
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                /*咱們知道在RxJava 1的時候咱們發送一個null值是能夠的,
                  可是RxJava2就不行了,由於作了一個判空操做。
                  就會執行Observer的onError方法
                */
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                //執行Observer的onNext方法,而且把值一個個傳過去
                actual.onNext(value);
            }
            /*若是用戶在onNext都運行完後,而且沒有執行dispose()方法,
              則if裏面爲true,就會執行Observer的onComplete()方法。
            */
            if (!isDisposed()) {
                actual.onComplete();
            }
        }
    }
}
複製代碼

咱們經過代碼能夠看到,其實Observable在生成實例後,裏面發送的數組是同一個數組,而且發送的數據也是同一個數據,因此雖然咱們用多個Observer去訂閱的時候,收到的Data對象是同一個,可是由於第一個Observer對這個對象裏面的屬性修改了,因此第二個Observer對獲取同個對象的時候,獲取的屬性值也就變了。

簡單能夠這麼理解:

Data data = new Data("aaaa");
Log.v("TAG",data.name);
change(data);
Log.v("TAG",data.name)

public void change(Data data){
     data.name = "bbbb";
}
複製代碼

其實咱們能夠這麼理解。雖然都是打印同一個對象,可是屬性被更改了。

因此咱們的情形一的代碼結果是否是已經能理解了呢,各位。

而情形二其實不是考驗RxJava的源碼基礎,而是考驗 Java基礎。由於情形二咱們發送的是(1,2);至關於:

int data = 1;
Log.v("TAG","data:"+data);
change(data);
Log.v("TAG","data:"+data);

public void change(int data){
     data = 2;
}
複製代碼

你就會發現其實二個Log打印的內容是同樣的,都是1。

與其餘語言不一樣,Java不容許程序員選擇按值傳遞仍是按引用傳遞各個參數,基本類型(byte--short--int--long--float--double--boolean--char)的變量老是按值傳遞。就對象而言,不是將對象自己傳遞給方法,而是將對象的的引用或者說對象的首地址傳遞給方法,引用自己是按值傳遞的-----------也就是說,講引用的副本傳遞給方法(副本就是說明對象此時有兩個引用了),經過對象的引用,方法能夠直接操做該對象(當操做該對象時才能改變該對象,而操做引用時源對象是沒有改變的)。

具體能夠參考這篇:Java值傳遞以及引用的傳遞、數組的傳遞

結語:

因此本章咱們更多地看了Rxjava的Observable生成及Observer訂閱時候的部分源碼及Java值傳遞等相關知識。

相關文章
相關標籤/搜索