RxJava2源碼分析——FlatMap和ConcatMap及其相關併發編程分析

本文章主要是對RxJava2FlatMapConcatMap這兩個操做符進行源碼分析,在閱讀以前,能夠先閱讀如下文章:java

RxJava2源碼分析——訂閱react

RxJava2源碼分析——線程切換android

RxJava2源碼分析——Map操做符git

本文章用的RxJavaRxAndroid版本以下:github

implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
複製代碼

FlatMap

FlatMap操做符能夠將一個發射數據的Observable轉變爲多個Observables,而後將這些發射的數據合併進一個單獨的Observable,發射的數據不保證有序數組

咱們先寫段示例代碼,爲了方便理解,在調用FlatMap方法的時候,我就不用上Lambda鏈式調用了,代碼以下:緩存

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext("Jun:");
    emitter.onComplete();
})
        .flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete() {
                // no implementation
            }
        });
複製代碼

Log以下:多線程

FlatMapLog.png

源碼分析

咱們看下flatMap方法,分析可知,會依次調用如下方法,代碼以下:app

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    // 注意:參數delayErrors傳入的是false
    return flatMap(mapper, false);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
    // 注意:參數maxConcurrency傳入的是Integer.MAX_VALUE
    return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
    // bufferSize是指數據緩衝區的大小,與背壓(Backpressure)有關
    return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // 這裏有個判斷,判斷this是否是ScalarCallable的實現類,詳細解釋請看下面
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    // 若是不是ScalarCallable的實現類就會調用如下方法
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
複製代碼

bufferSize()是數據緩衝區的大小,默認是128,可從如下代碼得知:框架

// Observable.java
public static int bufferSize() {
    return Flowable.bufferSize();
}

// Flowable.java
public static int bufferSize() {
    return BUFFER_SIZE;
}

// Flowable.java
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
    BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
複製代碼

ScalarCallable是一個接口,它的實現類有6個FlowableEmptyFlowableJustMaybeEmptyMaybeJustObservableEmptyObservableJust,分別對應這6個方法:Flowable.empty()Flowable.just(T item)Maybe.empty()Maybe.just(T item)Observable.empty()Observable.just(T item)

根據前幾篇文章的經驗可知,咱們只要看ObservableFlatMap這個類就好了,代碼以下:

// ObservableFlatMap.java
public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency;
    this.bufferSize = bufferSize;
}

@Override
public void subscribeActual(Observer<? super U> t) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
        return;
    }

    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
複製代碼

咱們也像前幾篇文章那樣,先看下subscribeActual方法,這裏會先調用ObservableScalarXMap.tryScalarXMapSubscribe方法,若是是true的話就return,這個方法中會判斷source是否是Callable的實現類,若是是的話就會委託ObservableScalarXMap來發射事件,而後返回true,不然返回false,上面說的ScalarCallable接口就是繼承Callable接口,因此咱們主要是看下面的邏輯,調用了sourcesubscribe方法,而且傳入new出來的MergeObserver,咱們來看下MergeObserver這個類,要注意的點我都寫上註釋了,代碼以下:

// ObservableFlatMap.java
// MergeObserver繼承AtomicInteger
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {

    private static final long serialVersionUID = -2117620485640801370L;

    final Observer<? super U> downstream;
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    volatile SimplePlainQueue<U> queue;

    volatile boolean done;

    final AtomicThrowable errors = new AtomicThrowable();

    volatile boolean cancelled;

    // 存放InnerObserver的數組
    final AtomicReference<InnerObserver<?, ?>[]> observers;

    static final InnerObserver<?, ?>[] EMPTY = new InnerObserver<?, ?>[0];

    static final InnerObserver<?, ?>[] CANCELLED = new InnerObserver<?, ?>[0];

    Disposable upstream;

    long uniqueId;
    long lastId;
    int lastIndex;

    Queue<ObservableSource<? extends U>> sources;

    int wip;

    MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        this.downstream = actual;
        // mapper是Function接口的實現類
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
        // 根據上面的代碼可知,傳入的Integer.MAX_VALUE,因此這段邏輯不會
        if (maxConcurrency != Integer.MAX_VALUE) {
            sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
        }
        // 建立一個InnerObserver數組的原子引用
        this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(T t) {
        // safeguard against misbehaving sources
        if (done) {
            return;
        }
        ObservableSource<? extends U> p;
        try {
            // 調用mapper的apply方法
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            upstream.dispose();
            onError(e);
            return;
        }

        // 在上面也分析過了,傳入的Integer.MAX_VALUE,因此這段邏輯不會執行
        if (maxConcurrency != Integer.MAX_VALUE) {
            synchronized (this) {
                if (wip == maxConcurrency) {
                    sources.offer(p);
                    return;
                }
                wip++;
            }
        }

        subscribeInner(p);
    }

    @SuppressWarnings("unchecked")
    void subscribeInner(ObservableSource<? extends U> p) {
        // 一個死循環
        for (;;) {
            // 判斷p是否是Callable接口的實現類,上面分析過,這裏再也不贅述
            if (p instanceof Callable) {
                if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
                    boolean empty = false;
                    synchronized (this) {
                        p = sources.poll();
                        if (p == null) {
                            wip--;
                            empty = true;
                        }
                    }
                    if (empty) {
                        drain();
                        break;
                    }
                } else {
                    break;
                }
            } else {
                // 若是p不是Callable接口的實現類,建立InnerObserver
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                // 調用addInner方法,將InnerObserver存放到observers數組中,下面會解析
                if (addInner(inner)) {
                    // 對每次建立的InnerObserver進行訂閱
                    p.subscribe(inner);
                }
                break;
            }
        }
    }

    boolean addInner(InnerObserver<T, U> inner) {
        // 又是一個死循環
        for (;;) {
            // 從observers數組取出InnerObserver
            InnerObserver<?, ?>[] a = observers.get();
            if (a == CANCELLED) {
                // 若是是CANCELLED狀態的就取消訂閱
                inner.dispose();
                return false;
            }
            int n = a.length;
            // 建立新的InnerObserver數組,大小爲a數組大小加1
            InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
            // 將a數組數據複製到b數組
            System.arraycopy(a, 0, b, 0, n);
            // 將新建的InnerObserver放到b數組最後的位置
            b[n] = inner;
            // 將b數組數據原子性地更新到a數組中
            if (observers.compareAndSet(a, b)) {
                // 若是成功就返回true
                return true;
            }
        }
    }

    // 移除InnerObserver的方法
    void removeInner(InnerObserver<T, U> inner) {
        for (;;) {
            InnerObserver<?, ?>[] a = observers.get();
            int n = a.length;
            if (n == 0) {
                return;
            }
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (a[i] == inner) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }
            InnerObserver<?, ?>[] b;
            if (n == 1) {
                b = EMPTY;
            } else {
                b = new InnerObserver<?, ?>[n - 1];
                System.arraycopy(a, 0, b, 0, j);
                System.arraycopy(a, j + 1, b, j, n - j - 1);
            }
            if (observers.compareAndSet(a, b)) {
                return;
            }
        }
    }

    boolean tryEmitScalar(Callable<? extends U> value) {
        U u;
        try {
            u = value.call();
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            errors.addThrowable(ex);
            drain();
            return true;
        }

        if (u == null) {
            return true;
        }

        if (get() == 0 && compareAndSet(0, 1)) {
            downstream.onNext(u);
            if (decrementAndGet() == 0) {
                return true;
            }
        } else {
            SimplePlainQueue<U> q = queue;
            if (q == null) {
                if (maxConcurrency == Integer.MAX_VALUE) {
                    q = new SpscLinkedArrayQueue<U>(bufferSize);
                } else {
                    q = new SpscArrayQueue<U>(maxConcurrency);
                }
                queue = q;
            }

            if (!q.offer(u)) {
                onError(new IllegalStateException("Scalar queue full?!"));
                return true;
            }
            if (getAndIncrement() != 0) {
                return false;
            }
        }
        drainLoop();
        return true;
    }

    void tryEmit(U value, InnerObserver<T, U> inner) {
        // 判斷get()是否是等於0,若是等於0就將值設爲1
        if (get() == 0 && compareAndSet(0, 1)) {
            // 調用下游的onNext方法
            downstream.onNext(value);
            // 發射完數據後,判斷自減1後的值是否是等於0,若是等於0,證實全部數據發射完成,方法結束
            if (decrementAndGet() == 0) {
                return;
            }
        } else {
            SimpleQueue<U> q = inner.queue;
            if (q == null) {
                // 建立SpscLinkedArrayQueue隊列,它是一個單生產、單消費的數組隊列,它能夠在消費者變慢的狀況下分配新的數組
                q = new SpscLinkedArrayQueue<U>(bufferSize);
                inner.queue = q;
            }
            // 將接收的上游數據緩存到隊列中
            q.offer(value);
            // 判斷值是否是不等於0後自增1,若是不等於0就結束方法
            if (getAndIncrement() != 0) {
                return;
            }
        }
        // 調用drainLoop方法
        drainLoop();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        if (errors.addThrowable(t)) {
            done = true;
            drain();
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        drain();
    }

    @Override
    public void dispose() {
        if (!cancelled) {
            cancelled = true;
            if (disposeAll()) {
                Throwable ex = errors.terminate();
                if (ex != null && ex != ExceptionHelper.TERMINATED) {
                    RxJavaPlugins.onError(ex);
                }
            }
        }
    }

    @Override
    public boolean isDisposed() {
        return cancelled;
    }

    void drain() {
        if (getAndIncrement() == 0) {
            drainLoop();
        }
    }

    void drainLoop() {
        final Observer<? super U> child = this.downstream;
        int missed = 1;
        for (;;) {
            // 檢查訂閱是否是被終止,若是是,方法結束
            if (checkTerminate()) {
                return;
            }
            // 將MergeObserver內的變量queue複製給svq,queue是一個隊列
            SimplePlainQueue<U> svq = queue;

            if (svq != null) {
                for (;;) {
                    // 再次檢查訂閱是否是被終止,若是是,方法結束
                    if (checkTerminate()) {
                        return;
                    }

                    // 從隊列中取出數據
                    U o = svq.poll();

                    // 若是是null的話,跳出該循環
                    if (o == null) {
                        break;
                    }

                    // 調用下游Observer的onNext方法,發射數據
                    child.onNext(o);
                }
            }

            boolean d = done;
            svq = queue;
            InnerObserver<?, ?>[] inner = observers.get();
            int n = inner.length;

            int nSources = 0;
            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    nSources = sources.size();
                }
            }

            if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
                Throwable ex = errors.terminate();
                if (ex != ExceptionHelper.TERMINATED) {
                    // 判斷Throwable是否是null
                    if (ex == null) {
                        // 調用下游Observer的onComplete方法
                        child.onComplete();
                    } else {
                        // 調用下游Observer的onError方法
                        child.onError(ex);
                    }
                }
                return;
            }

            // 處理數組數據
            int innerCompleted = 0;
            if (n != 0) {
                long startId = lastId;
                int index = lastIndex;

                if (n <= index || inner[index].id != startId) {
                    if (n <= index) {
                        index = 0;
                    }
                    int j = index;
                    for (int i = 0; i < n; i++) {
                        if (inner[j].id == startId) {
                            break;
                        }
                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    index = j;
                    lastIndex = j;
                    lastId = inner[j].id;
                }

                int j = index;
                sourceLoop:
                for (int i = 0; i < n; i++) {
                    if (checkTerminate()) {
                        return;
                    }

                    @SuppressWarnings("unchecked")
                    InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                    SimpleQueue<U> q = is.queue;
                    if (q != null) {
                        // 處理InnerObserver數組中的每個InnerObserver對象
                        for (;;) {
                            U o;
                            try {
                                o = q.poll();
                            } catch (Throwable ex) {
                                Exceptions.throwIfFatal(ex);
                                is.dispose();
                                errors.addThrowable(ex);
                                if (checkTerminate()) {
                                    return;
                                }
                                removeInner(is);
                                innerCompleted++;
                                j++;
                                if (j == n) {
                                    j = 0;
                                }
                                continue sourceLoop;
                            }
                            if (o == null) {
                                break;
                            }

                            // 調用onNext方法,發射InnerObserver的數據
                            child.onNext(o);

                            // 檢查訂閱是否是被終止,若是是,方法結束
                            if (checkTerminate()) {
                                return;
                            }
                        }
                    }

                    boolean innerDone = is.done;
                    SimpleQueue<U> innerQueue = is.queue;
                    // 檢查隊列裏的數據是否處理完畢
                    if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                        // 若是是,將對應的InnerObserver從數組中移除
                        removeInner(is);
                        // 檢查訂閱是否是被終止,若是是,方法結束
                        if (checkTerminate()) {
                            return;
                        }
                        // innerCompleted自增
                        innerCompleted++;
                    }

                    j++;
                    if (j == n) {
                        j = 0;
                    }
                }
                lastIndex = j;
                lastId = inner[j].id;
            }

            // 判斷innerCompleted是否是不等於0,也就是判斷當前InnerObserver是否處理完畢
            if (innerCompleted != 0) {
                if (maxConcurrency != Integer.MAX_VALUE) {
                    while (innerCompleted-- != 0) {
                        ObservableSource<? extends U> p;
                        synchronized (this) {
                            p = sources.poll();
                            if (p == null) {
                                wip--;
                                continue;
                            }
                        }
                        subscribeInner(p);
                    }
                }
                // 結束當前當前循環,進入下一個循環,繼續處理下一個InnerObserver
                continue;
            }
            // 數據發射完畢後,將值自減
            missed = addAndGet(-missed);
            // 若是missed等於0,證實隊列中的全部數據所有發射完畢,跳出循環,方法結束
            if (missed == 0) {
                break;
            }
        }
    }

    // 檢查訂閱是否是被終止的方法
    boolean checkTerminate() {
        if (cancelled) {
            return true;
        }
        Throwable e = errors.get();
        if (!delayErrors && (e != null)) {
            disposeAll();
            e = errors.terminate();
            if (e != ExceptionHelper.TERMINATED) {
                downstream.onError(e);
            }
            return true;
        }
        return false;
    }

    boolean disposeAll() {
        upstream.dispose();
        InnerObserver<?, ?>[] a = observers.get();
        if (a != CANCELLED) {
            a = observers.getAndSet(CANCELLED);
            if (a != CANCELLED) {
                for (InnerObserver<?, ?> inner : a) {
                    inner.dispose();
                }
                return true;
            }
        }
        return false;
    }
}

// InnerObserver繼承AtomicReference
static final class InnerObserver<T, U> extends AtomicReference<Disposable> implements Observer<U> {

    private static final long serialVersionUID = -4606175640614850599L;
    final long id;
    final MergeObserver<T, U> parent;

    volatile boolean done;
    volatile SimpleQueue<U> queue;

    int fusionMode;

    InnerObserver(MergeObserver<T, U> parent, long id) {
        this.id = id;
        this.parent = parent;
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {
            if (d instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<U> qd = (QueueDisposable<U>) d;

                // requestFusion和背壓(Backpressure)有關,由於咱們這裏沒用到相關的類,因此fusionMode的值爲0
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                // 訂閱類型是同步
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    queue = qd;
                    done = true;
                    // 發射數據
                    parent.drain();
                    return;
                }
                // 訂閱類型是異步
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;
                }
            }
        }
    }

    @Override
    public void onNext(U t) {
        // 根據上面分析可知,fusionMode的值爲0,因此等於QueueDisposable.NONE
        if (fusionMode == QueueDisposable.NONE) {
            // 調用tryEmit方法
            parent.tryEmit(t, this);
        } else {
            parent.drain();
        }
    }

    @Override
    public void onError(Throwable t) {
        if (parent.errors.addThrowable(t)) {
            if (!parent.delayErrors) {
                parent.disposeAll();
            }
            done = true;
            parent.drain();
        } else {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public void onComplete() {
        done = true;
        parent.drain();
    }

    public void dispose() {
        DisposableHelper.dispose(this);
    }
}
複製代碼

ConcatMap

ConcatMap操做符能夠將一個發射數據的Observable轉變爲多個Observables,而後將這些發射的數據合併進一個單獨的Observable,發射的數據保證有序

咱們先寫段示例代碼,爲了方便理解,在調用ConcatMap方法的時候,我就不用上Lambda鏈式調用了,代碼以下:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext("Jun:");
    emitter.onComplete();
})
        .concatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete() {
                // no implementation
            }
        });
複製代碼

Log以下:

ConcatMapLog.png

源碼分析

咱們看下ConcatMap方法,分析可知,會依次調用如下方法,代碼以下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
    return concatMap(mapper, 2);
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(prefetch, "prefetch");
    // 判斷this是否是ScalarCallable,上面分析過了,這裏再也不贅述
    if (this instanceof ScalarCallable) {
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    // 最後一個參數delayErrors傳入的是ErrorMode.IMMEDIATE
    return RxJavaPlugins.onAssembly(new ObservableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
}
複製代碼

根據前幾篇文章的經驗可知,咱們只要看ObservableConcatMap這個類就好了,代碼以下:

public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, int bufferSize, ErrorMode delayErrors) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.bufferSize = Math.max(8, bufferSize);
}

@Override
public void subscribeActual(Observer<? super U> observer) {

    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, observer, mapper)) {
        return;
    }

    // 這裏delayErrors傳入的是ErrorMode.IMMEDIATE
    if (delayErrors == ErrorMode.IMMEDIATE) {
        // 對observer進行序列化
        SerializedObserver<U> serial = new SerializedObserver<U>(observer);
        // 調用訂閱方法,而且傳入new出來的SourceObserver
        source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
    } else {
        source.subscribe(new ConcatMapDelayErrorObserver<T, U>(observer, mapper, bufferSize, delayErrors == ErrorMode.END));
    }
}
複製代碼

咱們看下SourceObserver這個類,有些源碼的邏輯和FlatMap比較類似,這裏就再也不贅述了,代碼以下:

// ObservableConcatMap.java
static final class SourceObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable {

    private static final long serialVersionUID = 8828587559905699186L;
    final Observer<? super U> downstream;
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final InnerObserver<U> inner;
    final int bufferSize;

    SimpleQueue<T> queue;

    Disposable upstream;

    volatile boolean active;

    volatile boolean disposed;

    volatile boolean done;

    int fusionMode;

    SourceObserver(Observer<? super U> actual,
                            Function<? super T, ? extends ObservableSource<? extends U>> mapper, int bufferSize) {
        this.downstream = actual;
        this.mapper = mapper;
        this.bufferSize = bufferSize;
        this.inner = new InnerObserver<U>(actual, this);
    }

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.validate(this.upstream, d)) {
            this.upstream = d;
            if (d instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<T> qd = (QueueDisposable<T>) d;

                // requestFusion和背壓(Backpressure)有關,由於咱們這裏沒用到相關的類,因此fusionMode的值爲0
                int m = qd.requestFusion(QueueDisposable.ANY);
                // 訂閱關係是同步
                if (m == QueueDisposable.SYNC) {
                    fusionMode = m;
                    queue = qd;
                    done = true;

                    downstream.onSubscribe(this);

                    drain();
                    return;
                }

                // 訂閱關係是異步
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;

                    downstream.onSubscribe(this);

                    return;
                }
            }

            // 建立一個大小爲數據緩衝區大小的隊列
            queue = new SpscLinkedArrayQueue<T>(bufferSize);

            // 調用下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
        }
    }

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        // 根據上面分析可知,fusionMode的值爲0,因此等於QueueDisposable.NONE
        if (fusionMode == QueueDisposable.NONE) {
            // 將接收的上游數據緩存到隊列中
            queue.offer(t);
        }
        // 調用drain方法
        drain();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        done = true;
        dispose();
        downstream.onError(t);
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        drain();
    }

    void innerComplete() {
        active = false;
        drain();
    }

    @Override
    public boolean isDisposed() {
        return disposed;
    }

    @Override
    public void dispose() {
        disposed = true;
        inner.dispose();
        upstream.dispose();

        if (getAndIncrement() == 0) {
            queue.clear();
        }
    }

    void drain() {
        // 判斷值是否是不等於0後自增1,若是不等於0就結束方法
        if (getAndIncrement() != 0) {
            return;
        }

        for (;;) {
            // 判斷是否是結束訂閱
            if (disposed) {
                // 若是是,就清空隊列,結束方法
                queue.clear();
                return;
            }
            // active是用volatile修飾,active是用來判斷當前是否還有InnerObserver在發射,因此能保證發射InnerObserver是有序的,這點和FlatMap不同
            if (!active) {

                boolean d = done;

                T t;

                try {
                    // 從隊列取出數據
                    t = queue.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    dispose();
                    queue.clear();
                    downstream.onError(ex);
                    return;
                }

                boolean empty = t == null;

                // 判斷是否發射完畢,同時隊列是否還有數據
                if (d && empty) {
                    // 若是發射完畢,同時隊列是沒有數據的話,結束訂閱,調用下游Observer的onComplete方法
                    disposed = true;
                    downstream.onComplete();
                    return;
                }

                // 再次判斷隊列是否還有數據
                if (!empty) {
                    // 若是隊列還有數據,執行如下邏輯
                    ObservableSource<? extends U> o;

                    try {
                        // 調用mapper的apply方法
                        o = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        dispose();
                        queue.clear();
                        downstream.onError(ex);
                        return;
                    }

                    // active設爲true,表示當前還在發射數據,其餘任務就進入不了上面所說的判斷了
                    active = true;
                    // 調用下游Observer的訂閱方法
                    o.subscribe(inner);
                }
            }

            // 發射完數據後,判斷自減1後的值是否是等於0,若是等於0,證實全部數據發射完成,方法結束
            if (decrementAndGet() == 0) {
                break;
            }
        }
    }

    // InnerObserver繼承AtomicReference
    static final class InnerObserver<U> extends AtomicReference<Disposable> implements Observer<U> {

        private static final long serialVersionUID = -7449079488798789337L;

        final Observer<? super U> downstream;
        final SourceObserver<?, ?> parent;

        InnerObserver(Observer<? super U> actual, SourceObserver<?, ?> parent) {
            this.downstream = actual;
            this.parent = parent;
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onNext(U t) {
            // 調用下游Observer的onNext方法
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            parent.dispose();
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            parent.innerComplete();
        }

        void dispose() {
            DisposableHelper.dispose(this);
        }
    }
}
複製代碼

FlatMap和ConcatMap對比

在作對比以前,我改下上面的兩段示例代碼,都調用delay方法,延遲1s發射,代碼以下:

FlatMap:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext("Jun:");
    emitter.onComplete();
})
        .flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete() {
                // no implementation
            }
        });
複製代碼

Log以下:

FlatMapDelayLog.png

ConcatMap:

Observable.create((ObservableOnSubscribe<String>) emitter -> {
    emitter.onNext("Tan:");
    emitter.onNext("Jia:");
    emitter.onNext("Jun:");
    emitter.onComplete();
})
        .concatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) {
                List<String> list = new ArrayList<>();

                for (int i = 0; i < 3; i++) {
                    list.add(s + i);
                }
                return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // no implementation
            }

            @Override
            public void onNext(String s) {
                Log.i("TanJiaJun", s);
            }

            @Override
            public void onError(Throwable e) {
                // no implementation
            }

            @Override
            public void onComplete() {
                // no implementation
            }
        });
複製代碼

Log以下:

ConcatMapLog.png

我這裏發射了3組數據,須要注意的是,咱們發現FlatMap3組數據都是不按順序的,可是每組數據裏發射的數據都是按順序的ConcatMap3組數據都是按順序的,並且每組數據裏發射的數據也是按順序的,那爲何這樣呢?其實上面閱讀源碼的時候也稍微說起了下,這裏再詳細解釋下,由於FlatMap對應的MergeObserverConcatMap對應的SourceObserver都繼承了AtomicInteger,在解釋這個類前,先說下幾個概念。

volatile

volatile的語義:

  1. volatile修飾的變量的操做不保證是原子性的。
  2. Java內存模型不會對volatile指令進行重排序優化,能夠保證對volatile變量的操做是按照指令的順序執行。
  3. volatile修飾的變量能保證對全部線程的可見性,每次修改值都會馬上同步回主內存,每次讀取值都會從主內存中從新讀取。

指令重排序

處理器經過緩存可以從數量級上下降內存延遲的成本,由於對主存的一次訪問須要花費硬件屢次的時鐘週期,而這些緩存爲了性能會從新排列待定內存操做的順序,也就是重排序,這裏有個前提,**Java內存模型(Java Memory Model)經過先行發生原則(happen-before)**保證順序執行語義,對一個volatile變量的寫操做先行發生於後面對這個變量的讀操做,這裏的前後指的是時間上的順序,在這裏舉個例子:

Object object = new Object();
複製代碼

這條語句會轉成多條彙編指令,大體作了如下三件事情

  1. Object類實例分配內存空間。
  2. 初始化Object對象
  3. object變量指向剛分配的內存,這時候object變量就不是null了。

由於Java編譯器容許指令重排序對其優化,上面這3個步驟可能1->2->3或者是1->3->2,可是步驟1確定是第一個執行的,由於作指令重排序有個前提,就是必須遵循先行發生原則,保證最後是正確的執行結果,執行步驟2步驟3的前提是步驟1,必須爲實例分配內存空間才能去初始化對象或者將變量指向分配的內存。

單線程這樣的優化是沒有問題的,可是在多線程就會有問題了,這裏我舉個例子,咱們會使用雙重檢查鎖定(Double Check Locking,簡稱DCL)來實現單例,它是懶漢模式,代碼以下:

package com.tanjiajun.rxjavademo;

/** * Created by TanJiaJun on 2019-11-14. */
public class Singleton {

    // mInstance用volatile修飾,保證指令執行的順序
    private static volatile Singleton mInstance;

    // 私有構造函數
    private Singleton() {
        // 防止經過反射調用構造函數形成單例失效
        if (mInstance != null) {
            throw new RuntimeException("Cannot construct a singleton more than once.");
        }
    }

    // 獲取單例的方法
    public static Singleton getInstance() {
        // 第一次判斷mInstance是否爲null,判斷是否須要同步,提升性能和效率
        if (mInstance == null) {
            synchronized (Singleton.class) {
                // 第二次判斷mInstance是否爲null,判斷是否已經建立實例
                if (mInstance == null) {
                    mInstance = new Singleton();
                }
            }
        }
        // 返回mInstance
        return mInstance;
    }

}
複製代碼

建立實例的這條語句會轉成多條彙編指令,大概作了以下3件事情

  1. Singleton類實例分配內存空間。
  2. 初始化Singleton對象
  3. mInstance變量指向剛分配的內存,這時候mInstance變量就不是null了。

若是咱們不用volatile修飾,也不加同步鎖的話,假設有兩個線程,分別是AB,若是線程A建立實例步驟是1->3->2,當它執行步驟3的時候,這時候mInstance變量已經不是null了,線程B也執行getInstance方法,進入第一個判斷,由於mInstance變量已經不是null了,因此就會建立另一個實例了,形成單例失效

CAS操做

CAS(Compare And Swap),翻譯過來就是比較和交換,它能夠防止共享變量出現髒讀髒寫問題,保證了原子操做CAS也是樂觀鎖的一種實現方式,樂觀鎖是什麼呢?樂觀鎖老是假設最好的狀況,因此在數據進行提交更新的時候纔會去檢查是否有衝突。

AtomicInteger

咱們看下AtomicInteger的源碼,代碼以下:

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // 用到sun.misc.Unsafe
    private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
    private static final long VALUE;

    static {
        try {
            // VALUE是內存偏移值
            VALUE = U.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }
    }

    // 用volatile修飾value,保證指令的執行順序
    private volatile int value;

    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    public AtomicInteger() {
    }

    public final int get() {
        return value;
    }

    public final void set(int newValue) {
        value = newValue;
    }

    public final void lazySet(int newValue) {
        U.putOrderedInt(this, VALUE, newValue);
    }

    public final int getAndSet(int newValue) {
        return U.getAndSetInt(this, VALUE, newValue);
    }

    // 主要看這個方法,它就是CAS操做,我會在下面解析
    public final boolean compareAndSet(int expect, int update) {
        return U.compareAndSwapInt(this, VALUE, expect, update);
    }

    public final boolean weakCompareAndSet(int expect, int update) {
        return U.compareAndSwapInt(this, VALUE, expect, update);
    }

    public final int getAndIncrement() {
        return U.getAndAddInt(this, VALUE, 1);
    }

    public final int getAndDecrement() {
        return U.getAndAddInt(this, VALUE, -1);
    }

    public final int getAndAdd(int delta) {
        return U.getAndAddInt(this, VALUE, delta);
    }

    public final int incrementAndGet() {
        return U.getAndAddInt(this, VALUE, 1) + 1;
    }

    public final int decrementAndGet() {
        return U.getAndAddInt(this, VALUE, -1) - 1;
    }

    public final int addAndGet(int delta) {
        return U.getAndAddInt(this, VALUE, delta) + delta;
    }

    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final int updateAndGet(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            prev = get();
            next = updateFunction.applyAsInt(prev);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return prev;
    }

    public final int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) {
        int prev, next;
        do {
            prev = get();
            next = accumulatorFunction.applyAsInt(prev, x);
        } while (!compareAndSet(prev, next));
        return next;
    }

    public String toString() {
        return Integer.toString(get());
    }

    public int intValue() {
        return get();
    }

    public long longValue() {
        return (long)get();
    }

    public float floatValue() {
        return (float)get();
    }

    public double doubleValue() {
        return (double)get();
    }

}
複製代碼

compareAndSet方法就是CAS操做,它是調用sun.misc.Unsafe裏的compareAndSwapInt方法,這個方法是個native方法,其做用是每次從內存中根據內存偏移量(VALUE)取出的值和expect比較,若是數據一致就把內存中的值改成update

結論

由於FlatMap對應的MergeObserverConcatMap對應的SourceObserver都繼承了AtomicInteger,根據以前的源碼分析,它們兩個操做符每組數據裏發射數據的操做都是原子操做,所以它們都是按順序的;不一樣的是,在ObservableConcatMap的源碼中,咱們能夠看到它用volatile修飾的active布爾值判斷當前是否還有InnerObserver在發射,可是在ObservableFlatMap的源碼中沒看到相關的邏輯,因此FlatMap發射的那幾組數據是不按順序的,ConcatMap發射的那幾組數據是按順序的。

個人GitHub:TanJiaJunBeyond

Android通用框架:Android通用框架(Kotlin-MVVM)

個人掘金:譚嘉俊

個人簡書:譚嘉俊

相關文章
相關標籤/搜索