Rxjava2 Observable源碼淺析

關於Flowable的源碼解析能夠看RxJava2 Flowable源碼淺析java

關於Subject的源碼解析能夠看RxJava2 Subject源碼淺析緩存

概念

  • 上游:通常來講是ObservableFlowableSubject
  • 下游:通常來講是ObserverSubscrption
  • cold Observable:只有經過Observable#subscribe纔開始請求上游發送數據。當下遊請求dispose()中止通知上游中止發送。
  • hot Observable:無論有無下游,均可以進行數據的發送
  • Rxjava1開始就有人說Rxjava能夠看做流水線,上游怎麼加工對於下游來講是無感知的,下游只要負責接收響應對應數據事件就行。

對於rxajva的思考,能夠參考一下:Rxjava沉思錄系列Rxjava主要負責人系列博客bash

cold Observable

通常cold Observable建立都是經過justcreatefromXXjust建立的。最簡單粗暴的建立方式:多線程

Observable.create<String> { it.onNext("") }.subscribe()
複製代碼
//[僅關注點相關代碼]
//ObservableOnSubscribe僅一個subscribe方法
public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        //RxJavaPlugins這是一個全局Hook,#onAssembly不實現默認直接返回
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

    public final void subscribe(Observer<? super T> observer) {
        try {
            ........
            //真正調用subscribe的實現
            subscribeActual(observer);
        } 
        ......
    }
    //整個Observable惟一的抽象方法,由子Observable實現,經過這個方法將上游和下游關聯起來
    protected abstract void subscribeActual(Observer<? super T> observer);
}
複製代碼

Observable#create真正返回的是ObservableCreate,當調用Observable#subscribe才真正通知上游Observable開始發送數據。其實質是經過#subscribeActual將上下游創建聯繫,並調用上游#subscribe(在ObservableCreate中就是ObservableOnSubscribe#subscribe)方法通知上游,下游已訂閱能夠開始發送數據。app

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            //source(上游)即Observable#create傳入的ObservableOnSubscribe
            //這裏就將上下游真正的聯繫了起來。
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
        
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        .....
    }
}
複製代碼

因此實質就是下游通知上游,下游已產生訂閱觸發上游下發數據/事件,上游再經過下發數據/事件,最終下游經過指定方法響應上游下發的數據/事件。因此一開始說的流水線方式就能夠理解了。異步

由於每次下游產生一次訂閱都會通知到上游的#subscribe,因此若是上游只在#subscribe中去建立初始數據源就能夠每一個作到不一樣下游的數據不關聯ide

Observable.create<String> { it.onNext("") }.subscribe()流程圖以下:post

Observable.create<String> { it.onNext(

map操做符

//Observable#map
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    //**ObservableMap將上游Observable和當前的轉換Function創建聯繫
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

//ObservableMap.java
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        //將下游包裝成MapObserver,並將MapObserver和上游創建聯繫
        //這樣上游下發時,先經過MapObserver處理才下發給真正的Observer
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            ....
            U v;
            try {
                //經過Function獲取到map後的數據
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch ....
            //向下游下發數據
            actual.onNext(v);
        }
        ...
    }
}
複製代碼

能夠看到map操做符的做用就是經過將上游攔截返回ObservableMap提供給下游訂閱,並在map上游返回數據前經過mapper將上游數據轉化並下發給下游。ui

線程調度

subscribeOn

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    //emmmm,是否是點眼熟
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
//ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //onSubscribe()方法執行在 訂閱處所在的線程
        s.onSubscribe(parent);
        //將上游放入scheduler中調用,且當即執行
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        //scheduler#scheduleDirect中執行完後
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //該方法調用已經在scheduler中調用
            source.subscribe(parent);
        }
    }
}
複製代碼

由源碼能夠看出由scheduler.scheduleDirect->SubscribeTask#run->SubscribeOnObserver#subscribe(observer)將整個調度切換到指定線程中。this

由於訂閱是用下自上的,因此subscribeOn也老是離源最近的一個生效。由於觸發源的subscribe是離源最近一個。

observeOn

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼

能夠看出Rxjava的操做符套路基本是將源Observable經過裝飾者模式封裝一層再返回新的Observable

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;//默認false
    final int bufferSize;//通常128
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //建立主線程調度器
            Scheduler.Worker w = scheduler.createWorker();
            //關聯上下游,觸發上游訂閱過程
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}
複製代碼

這裏能夠看出ObservableObserveOn仍是很簡單的,上游訂閱過程並不用關心,下游的觸發則由ObserveOnObserver處理。

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {

    private static final long serialVersionUID = 6576896619930983584L;
    final Observer<? super T> actual;
    final Scheduler.Worker worker;
    final boolean delayError;
    final int bufferSize;
    //上游數據的緩存隊列
    SimpleQueue<T> queue;

    Disposable s;

    Throwable error;
    volatile boolean done;

    volatile boolean cancelled;

    int sourceMode;

    boolean outputFused;

    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
        this.actual = actual;
        this.worker = worker;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            ......
            //建立對接緩存數據
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
            //回調下游onSubscribe
            actual.onSubscribe(this);
        }
    }

    @Override
    public void onNext(T t) {
        if (done) {//執行過complete/error則donetrue
            return;
        }
        
        if (sourceMode != QueueDisposable.ASYNC) {//非異步數據,默認同步數據
            queue.offer(t);//入隊列
        }
        schedule();
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        error = t;
        done = true;//標記已完成
        schedule();
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;//標記已完成
        schedule();
    }

    @Override
    public void dispose() {
        if (!cancelled) {
            cancelled = true;
            s.dispose();
            worker.dispose();
            if (getAndIncrement() == 0) {
                queue.clear();
            }
        }
    }

    @Override
    public boolean isDisposed() {
        return cancelled;
    }

    void schedule() {
        //自旋+1,!=0則表示worker.schedule已在執行無需在調度
        if (getAndIncrement() == 0) {
            worker.schedule(this);//經過調度器處理,將數據取出下發到下游
        }
    }
    
    @Override
    public void run() {
        if (outputFused) {//默認false
            drainFused();
        } else {
            drainNormal();//取出數據下發
        }
    }

    void drainNormal() {
        int missed = 1;

        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;

        for (; ; ) {
            //檢測是否不用再處理
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }

            for (; ; ) {
                boolean d = done;
                T v;

                try {
                    v = q.poll();//取出一個數據
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;

                if (checkTerminated(d, empty, a)) {//可能已經提早disposed了
                    return;
                }

                if (empty) {//數據爲空隊列無數據,退出下發循環
                    break;
                }
                //下發
                a.onNext(v);
            }
            
            //可能有錯過的schedule,再次循環檢測
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    
    //檢測是否compelte/error/隊列已空
    boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
        if (cancelled) {//已經disposed
            queue.clear();
            return true;
        }
        if (d) {//是否已結束
            Throwable e = error;
            if (delayError) {//延遲error,等待隊列清空
                if (empty) {
                    if (e != null) {
                        a.onError(e);
                    } else {
                        a.onComplete();
                    }
                    worker.dispose();
                    return true;
                }
            } else {
                if (e != null) {
                    queue.clear();
                    a.onError(e);//下發error
                    worker.dispose();
                    return true;
                } else if (empty) {
                    a.onComplete();
                    worker.dispose();
                    return true;
                }
            }
        }
        return false;
    }
}
複製代碼

ObserveOnObserver繼承於BasicIntQueueDisposable繼承於AtomicInteger,經過自身的原子性(自旋/CAS)來消除多線程對#schedule的調用。

能夠看出#observeOn只對下游有影響。

線程調度總結

由於subscribeOn()切換線程是在subscribeActual中切換,經過切換上游訂閱過程的整個線程,從而影響發射數據的下發所在線程。因此subscribeOn()只有最靠近源的一次生效。

observeOn主動切換下發過程,對下發過程產生影響,·且屢次調用屢次生效。 PS:操做符的轉換效果都是在onXXX下發過程當中實現的,因此對操做符也有做用。

相關文章
相關標籤/搜索