RxJava2源碼初探

前言

衆所周知RxJava有許多優勢好比強大的鏈式調用,方便的線程調度,可是我對其原理仍是瞭解的太少了,所以打算閱讀下源碼,先從一個最基本的例子開始java

1、例子

這個例子只是爲了示例,正常狀況下也不會這麼寫數組

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }.subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }
        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }
        override fun onNext(t: Int) {
            println("onNext $t")
        }
        override fun onError(e: Throwable) {
            println("onError")
        } 
    })
}
複製代碼

輸出結果:緩存

onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 3
onComplete
複製代碼

那麼爲何會按這個順序輸出呢?從代碼中也能夠看出從始至終也只調用了create、subscribe兩個方法,先來看看create的源碼bash

// Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
複製代碼

能夠看出當咱們沒有設置onObservableAssembly時其實就是直接建立了一個ObservableCreate實例返回,接着看看subscribe併發

public final void subscribe(Observer<? super T> observer) {
    subscribeActual(observer);
}
複製代碼

能夠看到內部就是調用了subscribeActual方法,而這個方法是個抽象方法,ObservableCreate實現了該方法app

protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    source.subscribe(parent);
}
複製代碼

內部主要就是先建立了一個CreateEmitter實例,而後調用observer的onSubscribe方法,最後再調用source的subscribe方法,這就解釋了onObserverSubscribe和onSourceSubscribe的輸出,而source的subscribe方法又調用了三次onNext方法和一次onComplete方法,先看看onNextide

// CreateEmitter.java
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
複製代碼

若是還沒dispose那麼直接就調用了observer的onNext,這也就解釋了onNext 一、onNext 二、onNext 3三個輸出接着看onComplete函數

// CreateEmitter.java
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}
複製代碼

若是還沒dispose就直接調用observer的onComplete,直接解釋了onComplete的輸出,咱們注意到Observer還有一個onError回調,該方法能夠經過調用emitter.onError手動觸發oop

// CreateEmitter.java
public void onError(Throwable t) {
    if (!tryOnError(t)) {
        RxJavaPlugins.onError(t);
    }
}
public boolean tryOnError(Throwable t) {
    if (t == null) {
        t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
    }
    if (!isDisposed()) {
        try {
            observer.onError(t);
        } finally {
            dispose();
        }
        return true;
    }
    return false;
}
複製代碼

能夠看到當還沒被dispose就會調用到observer的onError方法,至此這個基本demo的源碼已經分析完畢。 總結下上述代碼其實就分爲以下幾步ui

  1. 建立Observable實例
  2. 調用該實例的subscribeActual方法
  3. 回調observer的onSubscribe方法
  4. 調用source的subscribe方法
  5. 上述的subscribe方法內部能夠執行若干次onNext,最多一個onError,最多一次onComplete

上述實例的總體流程圖以下

下面來從源碼的角度研究研究RxJava中的幾個基本方法

2、基本方法

首先從最基本的map方法開始

1. map

map方法將每一個onNext事件都調用所傳入的Function實例的apply方法來達到轉化數據源的效果,以下圖所示

示例代碼以下所示

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }
    .map {
       it + 1
    }
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }
        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }
        override fun onError(e: Throwable) {
            println("onError")
        }
        override fun onNext(t: Int) {
            println("onNext $t")
        }
    })
}
複製代碼

輸出結果:

onObserverSubscribe
onSourceSubscribe
onNext 2
onNext 3
onNext 4
onComplete
複製代碼

很明顯map方法會對全部的next的數據作一次變化這裏是加1,接着看看map的源碼實現

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼

內部建立了一個ObservableMap實例並將當前的Observable實例和Function實例傳入,根據本文一開始的分析當調用Observable的subscribe方法其實會調用subscribeActual方法

// ObservableMap.java
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}
複製代碼

建立了MapObserver實例將Observer實例進行包裝而後調用source.subscribe,這個source其實就是上一級Observable實例本例中對應ObservableCreate實例,接着根據上文的分析會調用該MapObserver實例的onNext三次而後調用一次onComplete

// MapObserver.java
public void onNext(T t) {
    // 初始化的時候done爲false
    if (done) {
        return;
    }
    // 初始化的時候就是NONE
    if (sourceMode != NONE) {
        downstream.onNext(null);
        return;
    }
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}
複製代碼

咱們能夠看到內部調用了mapper.apply方法,接着將拿到的結果v當作參數調用downstream的onNext方法,注意這裏的downStream就是外界建立的一個Observer對象。上述實例是總體流程圖以下。注:綠色框表示對象,藍色框表示方法調用,括號內爲簡稱

綜上咱們能夠知道map經過代理下游Observer實例完成數據轉換,接着看看flatMap的源碼實現

2. flatMap

flatMap方法用於將上游的每個onNext事件都轉換成一個Observable實例,以下圖所示

fun main() {
    Observable.create {
        emitter: ObservableEmitter<Int> ->
        println("onSourceSubscribe")
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onNext(3)
        emitter.onComplete()
    }
    .flatMap {
        Observable.just(it, it + 1)
    }
    .subscribe(object : Observer<Int> {
        override fun onComplete() {
            println("onComplete")
        }

        override fun onSubscribe(d: Disposable) {
            println("onObserverSubscribe")
        }

        override fun onError(e: Throwable) {
            println("onError")
        }

        override fun onNext(t: Int) {
            println("onNext $t")
        }
    })
}
複製代碼

輸出結果:

onObserverSubscribe
onSourceSubscribe
onNext 1
onNext 2
onNext 2
onNext 3
onNext 3
onNext 4
onComplete
複製代碼

很顯然flatMap將每個事件好比1轉換成一個擁有一、2兩個事件的Observable實例,來看看其源碼實現

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
複製代碼

默認delayErrors爲false表示當一個事件出現異常就會中止整個事件序列,默認併發數爲Int的最大值,默認緩存大小爲128,而後根據這些參數和當前Observable實例構建出一個ObservableFlatMap實例,咱們看看其subscribeActual方法

// ObservableFlatMap.java
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
複製代碼

內部又經過這些參數和下游的Observer實例構建了一個MergeObserver實例,直接看看其onSubscribe方法

// MergeObserver.java
public void onSubscribe(Disposable d) {
    // 只會回調一次下游的onSubscribe方法
    if (DisposableHelper.validate(this.upstream, d)) {
        this.upstream = d;
        downstream.onSubscribe(this);
    }
}
複製代碼

若是已經有上游了就不作任何處理否則進行上游的賦值,而後回調了下游也就是自定義的那個Observer的onSubscribe方法,接着看看其onNext方法是怎麼把一個輸入源轉化成一個Observable的

public void onNext(T t) {
    ObservableSource<? extends U> p;
    p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    subscribeInner(p);
}
複製代碼

先是調用了傳入的apply方法將每一個onNext數據源轉化爲Observable實例,接着調用subscribeInner方法

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) {
            ...
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}
boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        InnerObserver<?, ?>[] a = observers.get();
        int n = a.length;
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}
複製代碼

爲每一個Observable對象都建立了一個InnerObserver實例,而後將其放入到一個數組中去,最後調用subscribe方法進行訂閱,因爲apply方法返回了一個ObservableFromArray實例,因此看看其subscribeActual方法

// ObservableFromArray.java
public void subscribeActual(Observer<? super T> observer) {
    FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
    observer.onSubscribe(d);
    if (d.fusionMode) {
        return;
    }
    d.run();
}
複製代碼

observer指代InnerObserver,看看其onSubscribe方法

public void onSubscribe(Disposable d) {
    // 第一次調用會返回true,d就是FromArrayDisposable實例其派生自QueueDisposable
    if (DisposableHelper.setOnce(this, d)) {
        if (d instanceof QueueDisposable) {
            QueueDisposable<U> qd = (QueueDisposable<U>) d;
            // 至關於傳入了7
            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
            if (m == QueueDisposable.SYNC) {
                fusionMode = m;
                queue = qd;
                done = true;
                parent.drain();
                return;
            }
            if (m == QueueDisposable.ASYNC) {
                fusionMode = m;
                queue = qd;
            }
        }
    }
}
//FromArrayDisposable.java
public int requestFusion(int mode) {
    // 很明顯7 & 1 != 0
    if ((mode & SYNC) != 0) {
        fusionMode = true;
        return SYNC;
    }
    return NONE;
}
複製代碼

這裏暫時還無法理解這個fusionMode(混合模式)是幹什麼用的,接着會調用到MergeObserver的drain方法

void drain() {
    // 只會執行一次,循環將全部事件取出
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}
// 當取消了或者出現了錯誤並其dealyErrors爲false時會將全部InnerObserver都dispose掉
boolean checkTerminate() {
    if (cancelled) {
        return true;
    }
    Throwable e = errors.get();
    if (!delayErrors && (e != null)) {
        disposeAll();
        e = errors.terminate();
        if (e != ExceptionHelper.TERMINATED) {
            downstream.onError(e);
        }
        return true;
    }
    return false;
}
void drainLoop() {
    // 這裏的downstream就是外界自定義的Observer實例
    final Observer<? super U> child = this.downstream;
    for (;;) {
        ...
        for (int i = 0; i < n; i++) {
            if (checkTerminate()) {
                return;
            }
            InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
            SimpleQueue<U> q = is.queue;
            if (q != null) {
                for (;;) {
                    U o;
                    try {
                        o = q.poll();
                    } catch (Throwable ex) {
                        ...
                        if (checkTerminate()) {
                            return;
                        }
                        continue sourceLoop;
                    }
                    // 每取出一個便會調用下游的onNext方法
                    child.onNext(o);
                }
                // 會把一個Observable源全部的數據都取完了之後纔會進入下一個
                if (o == null) {
                    break;
                }
            }
            ...
        }
        ..
    }
}
複製代碼

drainLoop內部會從數組中一個個取出InnerObserver實例,並取出所對應的數據源而後每取出一個回調下游Observer的onNext方法,下面用一張圖來總結下實例的流程

3. zip

zip方法經過一個函數將多個Observables的發射物結合到一塊兒,基於這個函數的結果爲每一個結合體發射單個數據項。以下圖所示

下面是一個使用zip操做符的一個簡單例子

fun main() {
    Observable.zip(getObservable1(), getObservable2(), zipper())
        .subscribe(object : Observer<String> {
            override fun onComplete() {
                println("onComplete")
            }
            override fun onSubscribe(d: Disposable) {
                println("onSubscribe")
            }
            override fun onNext(t: String) {
                println("onNext $t")
            }
            override fun onError(e: Throwable) {
                println("onError $e")
            }
        })
}
fun getObservable1(): ObservableSource<String> {
    return Observable.create {
        it.onNext("A")
        it.onNext("B")
        it.onNext("C")
        Thread.sleep(1000)
    }
}
fun getObservable2(): ObservableSource<String> {
    return Observable.create {
        it.onNext("1")
        Thread.sleep(1000)
        it.onNext("2")
        Thread.sleep(1000)
        it.onNext("3")
    }
}
fun zipper(): BiFunction<String, String, String> {
    return BiFunction { s1, s2 -> s1 + s2 }
}
複製代碼

輸出結果以下,onSubscribe輸出後過一秒輸出A1,再過一秒輸出A2,再過一秒輸出A3

onSubscribe
onNext A1
onNext B2
onNext C3
複製代碼

那麼爲何輸出結果會是這個樣子的呢?來看看zip的源碼實現

public static <T1, T2, R> Observable<R> zip( ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, BiFunction<? super T1, ? super T2, ? extends R> zipper) {
    return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2);
}
public static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource<? extends T>... sources) {
    return RxJavaPlugins.onAssembly(new ObservableZip<T, R>(sources, null, zipper, bufferSize, delayError));
}
複製代碼

根據源碼能夠看出zip方法其實最終建立了一個ObservableZip實例,直接看其subscribeActual

// ObservableZip
public void subscribeActual(Observer<? super R> observer) {
    ObservableSource<? extends T>[] sources = this.sources;
    int count = sources.length;
    ZipCoordinator<T, R> zc = new ZipCoordinator<T, R>(observer, zipper, count, delayError);
    zc.subscribe(sources, bufferSize);
}
ZipCoordinator(Observer<? super R> actual,
        Function<? super Object[], ? extends R> zipper,
        int count, boolean delayError) {
    this.downstream = actual;
    this.zipper = zipper;
    this.observers = new ZipObserver[count];
    this.row = (T[])new Object[count];
    this.delayError = delayError;
}
public void subscribe(ObservableSource<? extends T>[] sources, int bufferSize) {
    ZipObserver<T, R>[] s = observers;
    int len = s.length;
    for (int i = 0; i < len; i++) {
        s[i] = new ZipObserver<T, R>(this, bufferSize);
    }
    this.lazySet(0);
    downstream.onSubscribe(this);
    for (int i = 0; i < len; i++) {
        if (cancelled) {
            return;
        }
        sources[i].subscribe(s[i]);
    }
}
複製代碼

能夠看出ZipCoordinator的subscribe內部建立了輸入源大小的ZipObserver實例,而後調用每一個輸入源的subscribe方法,這樣當輸入源發送事件時就會調用ZipObserver的onNext方法

// ZipObserver.java
public void onNext(T t) {
    queue.offer(t);
    parent.drain();
}
複製代碼

主要看看ZipCoordinator的drain方法

public void drain() {
    final ZipObserver<T, R>[] zs = observers;
    final Observer<? super R> a = downstream;
    final T[] os = row;
    final boolean delayError = this.delayError;
    for (;;) {
        for (;;) {
            int i = 0;
            int emptyCount = 0;
            for (ZipObserver<T, R> z : zs) {
                if (os[i] == null) {
                    boolean d = z.done;
                    T v = z.queue.poll();
                    boolean empty = v == null;
                    if (!empty) {
                        os[i] = v;
                    } else {
                        emptyCount++;
                    }
                } else {
                    // 主要是錯誤判斷
                }
                i++;
            }
            if (emptyCount != 0) {
                break;
            }
            R v = zipper.apply(os.clone());
            a.onNext(v);
            Arrays.fill(os, null);
        }
    }
}
複製代碼

結合示例,首先Obserable1會發送一個A事件,將其放入到了一個隊列中去,接着drain遍歷全部的ZipObserver,第一個ZipObserver能夠從隊列中事件將其賦值給os[0],第二個取不到所以emptyCount++,而後退出循環。接着Observable1又發送了一個B事件,再將其放入隊列中,而後執行drain,此次由於os[0]已經不爲null因此不會從隊列中取,os[1]仍是null,退出循環繼續執行,接着Observable1再次發送一個C事件,這個跟B事件處理邏輯同樣,再接着Observable2會發送一個1事件,將其放入隊列,執行drainos[1]賦值成1,因爲emptyCount等於0,所以會執行zipper.apply,這個方法內部會回調傳入的BiFunction的apply方法(示例中僅僅進行了字符串拼接),獲取到結果A1,回調下游的onNext方法,而後將row這個數組置空,接着線程睡眠1秒,而後再次發送事件2,將其放入隊列中,執行drain,方法內部遍歷兩個ZipObserver而且都能從隊列中取到事件,因此emptyCount等於0,接着就會執行apply而後獲取到結果B2,調用下游的onNext,後面Observable2的3事件也跟2事件同樣就不說了。

3、總結

經過分析map和flatMap兩個方法能夠總結出以下幾個的結論

  1. subscribeActual方法老是會調用上游的subscribe方法
  2. onSubscribe方法老是會調用下游的onSubscribe方法
  3. Observer實例的onSubscribe會在事件發射前調用
  4. RxJava提供的一些操做符其實會在內部建立本身的Observable和Observer實例,其目的無非是爲了對下游的Observer進行封裝還有就是讓下游subscribe調用的是本身建立的Observable實例
相關文章
相關標籤/搜索