重學Rx(二)


一 前言bash

       上一篇講解的被觀察者的分類,以及特殊的subject/Processor。起初腦海中構建這篇文章大概是講解subject,可是被《一篇不太同樣的RxJava介紹》深深的吸引,這篇文章闡述了rx真實的推導過程,原來Observable是一個異步集合——從主動問是否有數據,到被動等通知數據到來,其實被動的等通知就是統一同步和異步世界的鑰匙,而且順帶統一的回調。以後一發不可收拾的又看了《Observable究竟如何封裝數據》簡單有效的源碼分析。仍是不夠過癮,因而就有了這篇文章------談不上源碼分析,只是簡單點進去看看裏邊的大體調用原理。app

二 僅僅看看你異步

先來看這樣一段代碼:ide

代碼一:
 Observable<Integer> Observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                }
            });複製代碼

點進去靜態方法create內部代碼以下:源碼分析

源碼一:
 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }複製代碼

source指的是外部傳入的ObservableOnSubscribe對象。source又傳入ObservableCreate對象中,而且靜態方法返回這個ObservableCreate對象。
post

繼續跟進ObservableCreate對象,這個類代碼以下:ui

源碼二:
//1 它是一個Observable
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
       //2這裏的 source 就是外部傳入的ObservableOnSubscribe
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
      
    @Override //注意這個方法,下邊進行闡述。
    protected void subscribeActual(Observer<? super T> observer) {
         //建立發射對象,傳入被觀察者
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        
        observer.onSubscribe(parent);

        try {
            //3.調用外部邏輯,對外暴漏發射對象
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    } 
}複製代碼

源碼二有兩點須要闡述:this

1.CreateEmitter經過回調對外暴漏對象,同時CreateEmitter持有observer對象,理所固然能夠猜到CreateEmitter內部是onNext方法中去調用observer的onNext方法。點進去猜對了。spa

2. subscribeActual方法何時調用?當訂閱的時候纔會調用這個方法,不信你能夠查看Observable$subscribe方法,裏邊對subscribeActual方法進行了調用。線程

很顯然這個Observable1沒有被訂閱,因此目前來subscribeActual方法不會被調用。

咱們使用map去轉換Observable1而後返回Observable2

代碼二:
Observable<Integer> Observable2 = Observable1.map(new Function<Integer, Integer>() {
                @Override
                public Integer apply(Integer integer) throws Exception {
                    return integer + 10;
                }
            });複製代碼

點進去map,能夠看到

源碼三:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
           這個this就是Observable1實例,畢竟是調用他的map方法
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    } 複製代碼

繼續跟蹤源碼三中的ObservableMap以下:

源碼四
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;
      //注意這個source是上邊傳的this,也是Observable1實例   
 public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override//這個方法是訂閱的時候才調用
    public void subscribeActual(Observer<? super U> t) {
        //注意這個source是上邊傳的this,也是Observable1實例     
           source.subscribe(new MapObserver<T, U>(t, function));
    } }複製代碼

源碼3、四有幾點須要闡述:

源碼四中的source就是上流的Observable1事例

由於Observable2 沒有訂閱,因此subscribeActual方法不會被調用。

咱們再去使用doOnNext去作一次轉換      

代碼三:
Observable<Integer> Observable3 = Observable2.doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {

                }
            });複製代碼

點進去doOnNext查看源碼:

源碼五:
 private Observable<T> doOnEach(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Action onAfterTerminate) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
        //這裏的this是指的上流的Observable2,畢竟調用的他的doOnEach方法
       return RxJavaPlugins.onAssembly(new ObservableDoOnEach<T>(this, onNext, onError, onComplete, onAfterTerminate));
    } 複製代碼

繼續跟蹤ObservableDoOnEach

源碼六:
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Action onAfterTerminate;
    //這個source是上邊傳入的this,也就是Observable2實例   
 public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
                              Consumer<? super Throwable> onError,
                              Action onComplete,
                              Action onAfterTerminate) {
        super(source);
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onAfterTerminate = onAfterTerminate;
    }

    @Override
    public void subscribeActual(Observer<? super T> t) {
         //這個source是上邊傳入的this,也就是Observable2實例     
   source.subscribe(new DoOnEachObserver<T>(t, onNext, onError, onComplete, onAfterTerminate));
    } 複製代碼

源碼六中的source就是上流的Observabl2事例

截止到目前Observable3 也沒有被訂閱,因此全部的subscribeActual方法都沒有被調用。

接下來咱們去訂閱Observable3(注意僅僅是訂閱Observable3

Observable3 .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {

                }
            });複製代碼

這個時候源碼六中的subscribeActual方法被觸發,subscribeActual中的邏輯又會觸發Observable2被訂閱,這個時候源碼四中的的subscribeActual方法被觸發,subscribeActual中的邏輯又會使得Observable1被訂閱,當Observable1被訂閱的時候,源碼二中的subscribeActual方法就開始建立發射對象,進行發射。這就完成了一處訂閱,到處訂閱的效果。

到此,咱們只是跟蹤了訂閱的源碼。

咱們接下來跟蹤一下觀察者是怎麼傳遞和執行的!

看源碼六,外部傳入的Observer經過DoOnEachObserver包裝而後傳遞給Observable2。繼續跟蹤又經過源碼四中的MapObserver包裝傳遞到Observable1最後傳遞到源碼二中的CreateEmitter對象中被調用,這就經過包裝者模式完成數據轉換的層層封裝。

三總結

       都是下流被觀察者持有上流被觀察者對象引用,能夠保證subscribeActual方法遞歸的向上調用。每次調用都把觀察者使用包裝者模式包裝一下本層的邏輯,而後傳遞給上游,直至到源頭流。

在訂閱時,實際上這個順序是逆向的,從下游往上游進行訂閱。數據的傳遞和變換則是正常,方向從上游往下游進行依次處理,最終執行咱們subscribe中傳遞的Observer.onNext()。

以上分析所有是基於同步進行的分析,若是想要印證:

observeOn 後面的全部操做都會在它指定線程工做。subscribeOn 指定的線程是從這個Observable生成一直到
遇到其餘 observeOn。若是程序須要屢次切換線程,使用屢次observeOn是徹底能夠的。而subscribeOn只有最
上方的subscribeOn會起做用。複製代碼

就須要查看ObservableObserveOn被觀察者中的subscribeActual方法。和ObservableSubscribeOn被觀察者的subscribeActual方法。有興趣的可自行研究。大體就是subscribeOn的原理就是實現一個runnable,而後再runnable中去開啓上流訂閱,這樣全部觀察者的邏輯都和runnable的線程一致。而後指定runnable的線程就能夠改變全部觀察者的線程。observeOn的原理就是獲取下游傳遞過來的觀察者,而後指定下流傳遞過來的被觀察者的線程。僞代碼以下:

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //整個訂閱都是在runnable中,因此能夠指定上游的線程和全部觀察者的線程
            source.subscribe(parent);
        }
    }複製代碼

上邊代碼註釋很清楚,指定全部觀察者線程,這就讓全部的代碼經過一行代碼進行了線程切換。直到遇到切換下游線程的代碼observeOn。另外關於指定下游的僞代碼:

Scheduler.Worker w = scheduler.createWorker();
這裏的訂閱是轉換完成再指定下游的線程
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));複製代碼
相關文章
相關標籤/搜索