RxJava2 Flowable源碼淺析

關於Observable的源碼解析能夠看Rxjava2 Observable源碼淺析java

關於Subject的源碼解析能夠看RxJava2 Subject源碼淺析緩存

前言

看過Rxjava2 Observable源碼淺析的你會發現其實Rxjava的實現套路都差很少,因此其實Flowable也差很少,只是在實現的細節上稍微有些差別而已。bash

背景

Flowable的出現其實主要是爲了解決在異步模型中上下游數據發送和接收的差別性而存在的。上游發送速度大於下游接收速度時就會產生數據積壓致使OOM,而Flowable就提供了背壓(BackPressure) 策略來處理數據積壓問題。多線程

流程

從最原始的Flowable#create開始異步

//FlowableOnSubscribe就是最原始的數據源發生器
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
    ObjectHelper.requireNonNull(source, "source is null");
    ObjectHelper.requireNonNull(mode, "mode is null");
    //將FlowableOnSubscribe轉化成了FlowableCreate
    return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
複製代碼

能夠看到create方法也是將數據源進行了一層封裝。而subscribe方法和Observable#subscribe就是差很少,最終仍是調用的Flowable#subscribeActual,而這裏就是FlowableCreate#subscribeActualide

public final class FlowableCreate<T> extends Flowable<T> {

    final FlowableOnSubscribe<T> source;

    final BackpressureStrategy backpressure;

    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        //根據不一樣的背壓策略實現不一樣Emitter
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }
        //通常來講在Subscriber#onSubscribe,調用emitter.request指定拉取上游多少數據
        t.onSubscribe(emitter);
        try {
            //將上下游關聯
            //調用Flowable#create一開始建立的FlowableOnSubscribe#subscribe
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
}
複製代碼

能夠纔看到這裏核心就是根據不用背壓策略實現不一樣的Emitter。通常來講在Subscriber#onSubscribe,調用emitter.request指定拉取上游多少數據,從而經過背壓策略對數據下發的策略不一樣。post

BaseEmitter

abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        private static final long serialVersionUID = 7326289992464377023L;

        final Subscriber<? super T> actual;

        final SequentialDisposable serial;

        BaseEmitter(Subscriber<? super T> actual) {
            this.actual = actual;
            this.serial = new SequentialDisposable();
        }

        @Override
        public void onComplete() {
            complete();
        }

        protected void complete() {
            if (isCancelled()) {
                return;
            }
            try {
                actual.onComplete();
            } finally {
                serial.dispose();
            }
        }

        @Override
        public final void onError(Throwable e) {
            //嘗試下發完成緩存數據
            if (!tryOnError(e)) {
                RxJavaPlugins.onError(e);
            }
        }

        @Override
        public boolean tryOnError(Throwable e) {
            return error(e);
        }

        protected boolean error(Throwable e) {
            if (e == null) {
                e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (isCancelled()) {
                return false;
            }
            try {
                actual.onError(e);
            } finally {
                serial.dispose();
            }
            return true;
        }

        @Override
        public final void cancel() {
            serial.dispose();
            onUnsubscribed();
        }

        @Override
        public final boolean isCancelled() {
            return serial.isDisposed();
        }

        @Override
        public final void request(long n) {
            //記錄請求的個數
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }

        void onRequested() {
            // default is no-op
        }

        @Override
        public final void setDisposable(Disposable s) {
            serial.update(s);
        }

        @Override
        public final void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public final long requested() {
            return get();
        }
        .....
    }
複製代碼

這裏能夠看到BaseEmitter經過自身繼承AtomicLong取記錄請求個數,而不是經過鎖或者volatile來提升性能。性能

MissingEmitter - BackpressureStrategy.MISSING

不作任何處理,由下游自行處理overflow。MissingEmitter實現很簡單。學習

static final class MissingEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 3776720187248809713L;

        MissingEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }
            //這裏能夠看出,對應數據下發沒有任何限制
            if (t != null) {
                actual.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //request減1
            for (;;) {
                long r = get();
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }

    }
複製代碼

BufferAsyncEmitter - BackpressureStrategy.BUFFER

static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {

    private static final long serialVersionUID = 2427151001689639875L;

    final SpscLinkedArrayQueue<T> queue;///數據緩存列表

    Throwable error;
    volatile boolean done;//標記是否onComplete或onError

    final AtomicInteger wip;//標記調用了多少次drain

    BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
        super(actual);
        this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
        this.wip = new AtomicInteger();
    }

    @Override
    public void onNext(T t) {
        if (done || isCancelled()) {
            return;
        }

        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        queue.offer(t);///數據入隊列
        drain();//檢測並下發數據
    }

    @Override
    public boolean tryOnError(Throwable e) {
        if (done || isCancelled()) {
            return false;
        }

        if (e == null) {
            e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }

        error = e;
        done = true;//標記完成
        drain();//檢測並下發未完成數據
        return true;
    }

    @Override
    public void onComplete() {//僅標記,若隊列有數據繼續下發完成
        done = true;//標記完成
        drain();//檢測並下發未完成數據
    }

    @Override
    void onRequested() {//#request(long n)後調用
        drain();//檢測並下發數據
    }

    @Override
    void onUnsubscribed() {
        if (wip.getAndIncrement() == 0) {
            queue.clear();
        }
    }

    void drain() {
        //相似於if(wip++ != 0)
        //因此這裏屢次調用#drain只有第一次調用纔會經過,或者已經清空隊列等待一下調用#drain
        if (wip.getAndIncrement() != 0) {
            return;
        }

        int missed = 1;
        final Subscriber<? super T> a = actual;
        final SpscLinkedArrayQueue<T> q = queue;

        for (; ; ) {
            long r = get();//數據請求數,由#request決定
            long e = 0L;

            while (e != r) {
                if (isCancelled()) {
                    q.clear();
                    return;
                }
                //是否已完成,調用onComplete/onError後會標記done==true
                boolean d = done;
                //獲取隊列第一條數據
                T o = q.poll();
                //用於標記隊列是否爲空
                boolean empty = o == null;
                //已標記完成且隊列爲空,調用onComplete/onError
                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        error(ex);
                    } else {
                        complete();
                    }
                    return;
                }
                //隊列爲空,退出獲取數據循環
                if (empty) {
                    break;
                }
                //下發數據
                a.onNext(o);
                //標記已下發數據
                e++;
            }
            //數據下發量和請求數相符
            if (e == r) {
                if (isCancelled()) {
                    q.clear();
                    return;
                }
                //標記是否完成
                boolean d = done;
                //標記隊列是否爲空
                boolean empty = q.isEmpty();
                //隊列爲空且已完成,調用onComplete/onError
                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        error(ex);
                    } else {
                        complete();
                    }
                    return;
                }
            }
            //request數減去已經下發數
            if (e != 0) {
                BackpressureHelper.produced(this, e);
            }
            //已處理一次drain,wip-missed避免錯過屢次調用drain
            //和Observable#observeOn時的ObserveOnObserver#drainNormal處理方式同樣
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
複製代碼

這裏的#drain下發數據方法和Observable#observeOn->ObserveOnObserver#drainNormal的處理方式是有點類似的。經過自己記錄request數和wip協調下發數據量及正確的下發。在調用Subscriber#onSubscribeEmitter#onNextEmitter#onComplete都會觸發#drain嘗試去下發緩存的數據。其中Emitter#onNext時先緩存數據在嘗試下發,並且數據還沒下發完成前調用onCompleteonError(這裏重寫了tryOnError)僅先標記完成,還要等數據徹底下發纔會真正調用actual對應方法。ui

其實這裏咱們仍是能夠學到一些東西的:

  • 若是能夠的話,使用Atomic包下的類代替volatile和鎖提升性能
  • 使用missedwip來協調多線程分發任務
  • 多線程中標誌位的判斷最好經過臨時變量存儲判斷並屢次判斷

LatestAsyncEmitter - BackpressureStrategy.LATEST

BackpressureStrategy.LATEST當數據背壓時只會緩存最後一次下發的數據(經過AtomicReference來緩存)。具體實現原理和BackpressureStrategy.BUFFER較爲相似就不貼代碼了。

BackpressureStrategy.DROP & BackpressureStrategy.ERROR

abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 4127754106204442833L;

        NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //是否已達請求數
            if (get() != 0) {
                actual.onNext(t);//未達請求數,下發
                BackpressureHelper.produced(this, 1);//請求數減1
            } else {
                onOverflow();//已超過請求,調用對應策略方法
            }
        }
        //
        abstract void onOverflow();
}
複製代碼

BackpressureStrategy.DROP對應的DropAsyncEmitterBackpressureStrategy.ERROR對應的ErrorAsyncEmitter都是繼承於NoOverflowBaseAsyncEmitter。實現方式也是很簡單,僅僅在onNext判斷一下是否已經到達了請求數,未到達就下發,若到達了調用onOverflow()處理溢出方案。

BackpressureStrategy.DROP的溢出方案爲空實現即捨去溢出數據 BackpressureStrategy.ERROR的溢出方案爲調用onError即溢出時報錯

總結

MISS策略須要下游自行處理背壓問題

BUFFER策略則在還有數據未下發完成時就算上游調用onCompleteonError也會等待數據下發完成

LATEST策略則當產生背壓時僅會緩存最新的數據

DROP策略爲背壓時拋棄背壓數據

ERROR策略是背壓時拋出異常調用onError

在學習源碼時獲得的一些關於多線程的領悟:

  • 若是能夠的話,使用Atomic包下的類代替volatile和鎖提升性能
  • 使用missedwip來協調多線程分發任務
  • 多線程中標誌位的判斷最好經過臨時變量存儲判斷並屢次判斷
相關文章
相關標籤/搜索