RxJava 源碼分析系列(一) - Observable的基本分析

樓主最近在找實習工做,因爲簡歷上說了解RxJava,因此在面試的時候應該會問到RxJava的知識,因而樓主結合RxJava的源碼,對RxJava的工做原理進行初步的瞭解。也只敢說是初步瞭解,由於本身也是第一次看RxJava的源碼,理解的程度確定不是很深。仍是那樣,若是有錯誤之處,但願各位指正!
  本文參考:html

  1.除非特殊說明,源碼來自:2.2.0版本面試

  2.RxJava從源碼到應用 移動端開發效率秒提速編程

1.概述

  樓主打算將RxJava的源碼分析寫成一個系列文章,因此這個是這個系列的第一篇文章,在概述裏面仍是對RxJava是什麼簡單的介紹一下,本系列文章不會對RxJava的基本用法進行展開,若是有老哥對RxJava的基本使用掌握的不是很好的話,推薦這個系列的文章:給初學者的RxJava2.0教程(一)
  簡單的說一下RxJava,RxJava是基於觀察者模式的一個框架,在RxJava中有兩個角色,一個Observable,一般被稱爲被觀察者,一個是Observer,一般被稱爲觀察者。整體的架構是,由Observable來處理任務或者發送事件,而後在Observer裏面來接受到Observable發送過來的信息。
  RxJava有不少的優點,好比線程調度,在Android裏面,耗時操做必須放在子線程中,可是同時還須要主線程來更細UI,因此線程調度就顯得尤其重要。固然RxJava還有不少重要的操做符,使得咱們的開發變得很是的方便。本系列文章不會對每一個操做符的基本使用展開,而是對一些比較經常使用的操做源碼分析,所說的經常使用,也是指樓主用到的!!畢竟是菜雞,確定有不少的東西都不太懂。安全

2.基本元素

  想要對RxJava的基本原理有一個更好的瞭解,必須對它的基本有一個大概的瞭解。咱們先經過一個簡單的案例,來對RxJava的基本元素進行提取。服務器

Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {

      }
    }).subscribe(new Observer<String>() {
      @Override
      public void onSubscribe(Disposable d) {

      }

      @Override
      public void onNext(String s) {

      }

      @Override
      public void onError(Throwable e) {

      }

      @Override
      public void onComplete() {

      }
    });

  在這個簡單的案例當中,咱們能夠提取的元素有:ObservableObservableOnSubscribeObservableEmitter,Observer
  元素仍是挺少的,咱們如今對每一個元素的類結構來進行簡單的分析一下。架構

(1).Observable

public abstract class Observable<T> implements ObservableSource<T> {
}

  咱們發現Observable自己是一個抽象類,而且實現了ObservableSource接口,在來看看ObservableSource接口裏面有什麼。app

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

  ObservableSource接口裏面只有一個subscribe方法,也就是說,RxJava將註冊觀察者這部分的功能提取成一個接口,從而能夠看出來,面向接口編程是多麼的重要😂😂。。。
  再分別來看看咱們上面案例中使用的兩個方法--createsubscribe框架

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // 先省略代碼部分,待會詳細的分析。 
    }

  啊,嚇我一跳,我覺得create方法的參數又是一個接口類型,還好是ObservableOnSubscribe類型,也是上面提取出來的元素其中之一,關於這個類,待會會詳細的分析。ide

public final void subscribe(Observer<? super T> observer) {
      //...
    }

  這個方法就更加的簡單了,就是傳遞了一個Observer接口的對象。不過須要注意的是這個方法有不少的重載,其中以Consumer類型的操做最爲多,不過這個也沒什麼,最後仍是Consumer轉換成爲了Observer,這個就涉及到Observer接口的一個實現類--LambdaObserver。不要懼怕,待會都會一一的講解的。源碼分析

(2).Observer

  說了被觀察者,咱們先來看看觀察者--Observer

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

  哎呀呀,更加的簡單了, Observer只是簡單的接口,不過咱們須要注意的是這個接口定義的4個方法,這裏不講解四個方法的做用,畢竟咱們這裏將Observable的基本原理🙄🙄。

(3).ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

  一如既往的接口,subscribe方法裏面就是具體作事情的地方,這個相信大佬們應該都知道,我這裏就班門弄斧的提醒一下😂😂。

(4).ObservableEmitter

public interface ObservableEmitter<T> extends Emitter<T> {
    void setDisposable(@Nullable Disposable d);
    void setCancellable(@Nullable Cancellable c);
    boolean isDisposed();
    ObservableEmitter<T> serialize();
    boolean tryOnError(@NonNull Throwable t);
}

  ObservableEmitter也是一個接口,同時繼承了Emitter接口,咱們來看看Emitter接口的定義

public interface Emitter<T> {
    void onNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();
}

  做爲一個發射信息器,Emitter裏面定義了不少關於發送消息給Observer的方法,EmitteronNext對應着ObserveronNext方法,其餘的方法也是相似的。

3.Observable的工做原理

(1).create方法

  咱們對相關部分的基本元素有了一個基本的瞭解,如今咱們來對整個流程的工做原理進行分析。首先咱們create方法入手

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

  create方法沒有咱們想象中的那麼難,就只有兩行代碼,還有一行用來check的😂😂。對於ObservableCreate類這裏先不進行分析,咱們來看看 RxJavaPluginsonAssembly方法。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

  這裏提醒一下,onAssembly方法的參數類型是Observable類型,也就是說ObservableCreate自己就是一個Observable。好了,扯了題外話,來看看onAssembly方法具體是幹嗎的。
  整個方法的執行過程比較簡單,若是onObservableAssembly爲null,直接就返回了source,也就是說返回了ObservableCreate自己。而咱們在整個Observable的源碼中發現,onObservableAssembly初始值自己爲null。

public static void reset() {
        //······
        setOnObservableAssembly(null);
        //······
    }

  爲何須要這樣子繞圈子的作呢?這裏就是作了鉤子,以便於之後的擴展。
  因此Observablecreate方法就是返回了一個ObservableCreate對象,不過須要注意的是ObservableCreate包裹了一個ObservableOnSubscribe對象,也就是咱們在create方法裏面new的那個ObservableOnSubscribe對象。
  咱們先來不急着去理解ObservableCreate是什麼,仍是來看看subscribe方法爲咱們作了什麼。

(2). subscribe方法

  當咱們經過Observable的create方法來獲取一個Observable對象時,一般還會調用Observable的subscribe方法來註冊一個觀察者。如今咱們來看看subscribe方法的實現。

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

  整個過程也不是想象中的那麼神祕,除去check相關的方法不看,歸根結底就是兩行代碼,先是經過RxJavaPluginsonSubscribe方法來獲取Observer對象,具體操做這裏就不說了,確定跟RxJavaPluginsonAssembly方法差很少,最後返回的是observer自己,最後調用了subscribeActual方法。這個subscribeActual方法是幹嗎的?

protected abstract void subscribeActual(Observer<? super T> observer);

  臥了個槽?抽象方法!那我怎麼知道調用的是哪一個類的subscribeActual方法?不急哈,記得咱們以前在create方法返回的Observable對象是哪一個類的對象嗎?想起來了吧,是ObservableCreate

(3). ObservableCreate

  先來看看ObservableCreate類結構。

public final class ObservableCreate<T> extends Observable<T> {
}

  咱們發現,ObservableCreate繼承了Observable,其實在分析create方法時,我也說過喲。
  在ObservableCreate類中,只有一個ObservableOnSubscribe類型的成員變量,這個成員變量就是咱們在create方法裏面new的ObservableOnSubscribe對象
  咱們再來看看ObservableCreatesubscribeActual方法的實現

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

  在subscribeActual方法裏面,先是對Observer對象進行一次包裝,將它包裝在CreateEmitter類中。而後咱們會發現兩個比較眼熟的方法onSubscribe方法和subscribe方法。其中onSubscribe方法在Observer裏面看到過,而這裏剛好是經過Observer對象來調用的,沒錯,這個的observer就是在subscribe方法裏面new的對象。但是咱們記得onSubscribe方法的參數類型是Disposable,而這裏是一個CreateEmitter。咱們來看看CreateEmitter的類結構:

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    //······
}

  沒錯,CreateEmitter實現了Disposable接口,因此CreateEmitter自己能夠充當Disposable的角色。
  調用了ObserveronSubscribe方法以後,而後就會調用ObservableOnSubscribesubscribe方法。
  到這裏,咱們應該完全的明白了整個Observable的工做流程。咱們經過create方法建立一個ObservableCreate方法,而後調用了subscribe方法來註冊了一個觀察者,在subscribe方法裏面又調用了subscribeActual方法,在subscribeActual方法裏面先是調用了ObserveronSubscribe方法,而後調用了
ObservableOnSubscribesubscribe方法,在ObservableOnSubscribesubscribe方法當中,具體的作的事有兩件:1.作咱們本身的事情,好比從服務器上獲取數據之類;2.將發送信息到Observer去。
  理解了整個流程的工做原理,咱們如今來看看CreateEmitter是怎麼信息發給Observer的。

4. CreateEmitter的工做原理

  咱們知道,咱們在ObservableOnSubscribesubscribe方法裏面使用ObservableEmitter來發射信息到Observer。如今咱們來看看整個CreateEmitter的工做原理,不過,咱們仍是先來看看這個類的結構,雖然上面已經看了,可是擔憂大佬們忘了:

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    //······
}

  在上面已經說了CreateEmitter實現了Disposable接口,能夠做爲Disposable對象來操做,在接下來,咱們將重點介紹Disposable是怎麼控制Observer對信息的接收,同時還會介紹CreateEmitter做爲ObservableEmitter接口的那部分功能。
  以前在分析基本元素時,已經說了ObservableEmitter這個接口,它實現了Emitter接口。在Emitter接口裏面有三個方法用來發送信息給Observer,分別是:onNextonErroronComplete。而CreateEmitter類則是具體的實現了這三個方法,咱們來看看。

public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

  代碼是很是的簡單,直接調用了ObserveronNext方法,也沒用什麼高逼格的東西😂😂。其他兩個方法也是如此。只不過是,在調用onNext方法時作了一個isDisposed的判斷。
  因此感受Disposable纔是這個類的核心。咱們來看看isDisposed方法:

@Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

  在isDisposed方法裏面調用了DisposableHelperisDisposed方法。不過這裏須要注意的是這裏傳遞過去的是get方法的返回值,這個返回值什麼意思?
  回到CreateEmitter的類結構,發現它繼承了AtomicReference類,因此get方法返回的是一個Disposable對象。
  同時,咱們發現CreateEmitterdispose方法也是經過DisposableHelper類進行進行操做的,看看要理解Disposable的功能,必須瞭解DisposableHelper是怎麼操做的。

5.DisposableHelper

  從感官上來講,一個發射信息器是否dispose,直接設置一個boolean類型的flag就OK了,爲何搞得這麼複雜,又是AtomicReference,又是DisposableHelper。這一切,咱們從DisposableHelper來尋找答案。
  首先咱們仍是來看看DisposableHelper的結構:

public enum DisposableHelper implements Disposable {
    DISPOSED
    ;
}

  DisposableHelper自己是一個enum類型,同時實現了Disposable接口。這裏使用enum主要是爲了作一個DISPOSED的單例。而後在經過isDisposed方法來判斷是否dispose,能夠直接與DISPOSED比較。

public static boolean isDisposed(Disposable d) {
        return d == DISPOSED;
    }

  既然判斷是否dispose是直接與DISPOSED比較,那麼若是dispose的話,應該是將AtomicReference裏面的值設置爲DISPOSED吧?咱們來看一下dispose方法:

public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

  果真,跟咱們猜想同樣的,AtomicReference裏面的值設置爲DISPOSED。只是,這裏爲了線程安全,作了不少的判斷操做。
  從這裏咱們能夠獲得,爲何須要設置DisposableHelper來控制dispose的狀態,那是由於線程安全,若是直接設置一個flag,在有些狀況下,可能存在線程不安全的風險。同時爲了代碼的優雅,若是這部分的邏輯寫在CreateEmitter裏面,會不會顯得冗雜呢?

6.總結

  寫到這裏,我感受也差很少了。這裏對着部分的知識作一個總結。
  1.在整個流程中,基本有Observable,ObservableOnSubscribe,ObservableEmitter,Observer,若是想要對整個過程有一個大概的理解,必須對這幾個元素有基本的認識。
  2.ObserveronNext之類方法的觸發時機,其實是Observablesubscribe方法,由於subscribe方法調用了ObservablesubscribeActual方法,而在subscribeActual方法裏面作了兩部分的操做:1.直接調用了ObserveronSubscribe方法;2.使用ObservableEmitterObserver包裹起來,因此咱們在ObservableOnSubscribesubscribe方法用ObservableEmitter來發射信息,至關於調用了Observer的相關方法。
  3.在ObservableEmitteronNext之類方法裏面,存在一種相似AOP的代碼,由於在調用Observer的相關方法,作了一些其餘的操做。

做者:瓊珶和予 連接:https://www.jianshu.com/p/f17821d2cf78 來源:簡書 簡書著做權歸做者全部,任何形式的轉載都請聯繫做者得到受權並註明出處。

相關文章
相關標籤/搜索