rxjava2源碼解析(四)--變換

引言

先貼上前面幾篇的連接:
rxjava2源碼解析(一)基本流程分析
rxjava2源碼解析(二)線程切換分析
rxjava2源碼解析(三)線程池原理分析java

上一篇說了rxjava2的線程池原理,這篇咱們來講說rxjava的變換。緩存

變換和線程切換算是rxjava最關鍵的兩個功能。常見的變換有map(),flatMap()。咱們先從map方法提及吧。bash

map

咱們先舉一個簡單的例子,來看看map能作什麼:併發

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String name) {
        Log.d(tag, name);
    }
    ...
};
Observable.from(students)
    .map(new Function<Student, String>() {
        @Override
        public String apply(Student student) {
            return student.getName();
        }
    })
    .subscribe(subscriber);
複製代碼

上面的例子是一個功能,打印一個班級裏students的名字。很簡單,經過from方法對student進行遍歷,一個map方法將student變換成name,而後下游打印就完事了。咱們知道rxjava2裏面是有不少泛型設定的,若是類型錯誤是會直接標紅。from方法返回的下游數據類型是student,而subscriber中接收的數據類型必須是String。很顯然,這裏map就將下游的數據類型進行了變換。
具體在源碼中是如何實現的呢?咱們先看map的源碼:app

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
複製代碼

仍是老樣子,拋開判空代碼和hock機制,直接看ObservableMap類。不過在此以前,先看看map方法裏面設定的泛型。T是Observable裏設定的上游數據類型,map方法會返回一個Observable,這裏就將整個鏈條的數據類型進行了變換。異步

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
複製代碼

看過前面的幾篇就知道,這裏仍是老套路,仍是裝飾器模式,仍是建立一個內部處理器MapObserver。內部處理器MapObserver負責與上游綁定,因此它的處理數據類型仍爲T。ObservableMap與下游進行綁定訂閱,因此ObservableMap中數據的類型爲R。咱們在看MapObserver以前,先看看Function是什麼。ide

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    R apply(@NonNull T t) throws Exception;
}
複製代碼

OK,Function是一個接口,只有一個接口方法applyFunction規定了兩個泛型:T、R。其中T是apply的參數類型,R是返回值類型。咱們在使用過程當中,重寫apply方法進行數據類型變換,而後再用map方法插入到整條流水線中,就達到了變換的目的。oop

下面看看MapObserver中具體是怎麼實現的:post

MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            U v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
複製代碼

很簡單,MapObserveronNext負責處理上游下來的數據,在onNext方法中調用Functionapply方法,將T變換爲下游須要的U(也就是前面的R),而後再將數據傳遞下去,達到變換的目的。ui

map的使用和源碼都很簡單,咱們來看看flatMap的。

flatMap

仍是先用一個簡單的例子來看flatMap的用途:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Function<Student, Observable<Course>>() {
        @Override
        public Observable<Course> apply(Student student) {
            return Observable.from(student.getCourses());
        }
    })
    .subscribe(subscriber);
複製代碼

產品說功能要改一改,不是打印每一個student的名字,而是要打印每一個sutdent全部課程名稱。正常狀況下,咱們在subscriber中獲取到每一個student,而後用個for循環進行遍歷打印就行,可是flatMap能夠直接一步搞定。

細心的已經發現,這裏的Function比較奇怪,它的返回值類型居然是Observable。具體怎麼回事,咱們看看源碼:

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        //這裏的delayErrors,maxConcurrency,bufferSize都是默認值。
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
複製代碼

先解釋一下,delayErrorsmaxConcurrency,bufferSize這幾個參數的意義:

  • delayErrors表示異常是否須要延遲到全部內部數據都傳輸完畢後拋出。默認值是false
  • maxConcurrency 表示最大併發數,默認值爲Integer.MAX_VALUE
  • bufferSize 緩存的內部被觀察者事件總數大小,默認值爲128.

老樣子,咱們直接看ObservableFlatMap

public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.maxConcurrency = maxConcurrency;
        this.bufferSize = bufferSize;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
            return;
        }

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
複製代碼

仍是原來的配方,仍是原來的味道。咱們來看看MergeObserver的源碼一探究竟:

@Override
        public void onNext(T t) {
            //調用apply方法,獲取到轉換的Observable
            ObservableSource<? extends U> p;
            try {
                p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                upstream.dispose();
                onError(e);
                return;
            }
            //隱藏了一些判斷代碼
            subscribeInner(p);
        }

        @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
            for (;;) {
                //這裏會走到else
                if (p instanceof Callable) {
                    ...
                } else {
                //這裏新建一個InnerObserver,調用addInner添加到隊列中,而後用apply中生成的Observable與之訂閱。
                    InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                    if (addInner(inner)) {
                        p.subscribe(inner);
                    }
                    break;
                }
            }
        }
複製代碼

如註釋中所示,這裏根據上游每個數據,生成一個Observable,而後新建一個InnerObserver,將這個InnerObserver添加到內部處理器隊列中,並將Observable與這個InnerObserver進行訂閱。
咱們以Observable.from()爲例,看看這中間的流程是什麼樣的。

//from 方法返回一個ObservableFromArray裝飾器
    public static <T> Observable<T> fromArray(T... items) {
       //省略部分判空代碼
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
    
    
//ObservableFromArray源碼
public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }

    
    @Override
    public void subscribeActual(Observer<? super T> observer) {
        //訂閱後,建立一個FromArrayDisposable內部類對象
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(observer, array);
        //這個方法很關鍵,咱們待會能夠看看InnerObserver的onSubscribe方法。
        observer.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }
    
    //FromArrayDisposable不是一個處理器,他只是一個帶簡單隊列的Disposable
    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        final Observer<? super T> downstream;
        final T[] array;
        int index;
        boolean fusionMode;
        volatile boolean disposed;
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.downstream = actual;
            this.array = array;
        }
        // 這裏顯然是返回同步
        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fusionMode = true;
                return SYNC;
            }
            return NONE;
        }

        //poll方法會逐個返回隊列中的數據
        @Nullable
        @Override
        public T poll() {
            int i = index;
            T[] a = array;
            if (i != a.length) {
                index = i + 1;
                return ObjectHelper.requireNonNull(a[i], "The array element is null");
            }
            return null;
        }

        @Override
        public boolean isEmpty() {
            return index == array.length;
        }

        @Override
        public void clear() {
            index = array.length;
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
        //在run方法中,開始向下遊傳遞數據。不過這時候已經不重要了,由於在InnerObserver的onSubscribe方法中,已經經過poll方法將隊列中的數據都傳遞出去了。固然這僅僅是在這個示例中是這樣
        void run() {
            T[] a = array;
            int n = a.length;
            //開始向下遊傳遞數據
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }
    }
}

複製代碼

如上面註釋所示,from方法返回一個簡單的ObservableFromArrayObservableFromArraysubscribe中,調用下游處理器的onSubscribe方法,而後調用自身的run方法。咱們看看InnerObserver中是怎麼處理的:

static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;

        volatile boolean done;
        volatile SimpleQueue<U> queue;

        int fusionMode;
        //這裏會用一個獨特的ID來給每一個InnerObserver作標記
        InnerObserver(MergeObserver<T, U> parent, long id) {
            this.id = id;
            this.parent = parent;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                //FromArrayDisposable知足這個條件
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<U> qd = (QueueDisposable<U>) d;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    //由上面FromArrayDisposable的源碼可知這裏返回SYNC
                    if (m == QueueDisposable.SYNC) {
                        fusionMode = m;
                        queue = qd;
                        //這裏直接將done設置爲true,是由於下面的parent.drain()會直接取出全部數據並傳遞給下游
                        done = true;
                        //數據在這其中進行下發和傳遞
                        parent.drain();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        fusionMode = m;
                        queue = qd;
                    }
                }
            }
        }

        @Override
        public void onNext(U t) {
            if (fusionMode == QueueDisposable.NONE) {
                parent.tryEmit(t, this);
            } else {
                //當上遊執行到這裏時,數據已經被傳遞完畢了。這裏單指此次示例
                parent.drain();
            }
        }

        ....
    }
複製代碼

具體的信息都寫在上面的註釋中,咱們直接來看MergeObserverdrain()方法。

void drain() {
            //這裏進行判斷,確保drainLoop還在執行時不會被再次調用
            if (getAndIncrement() == 0) {
                drainLoop();
            }
        }

        void drainLoop() {
            //獲取到下游Observer
            final Observer<? super U> child = this.downstream;
            int missed = 1;
            for (;;) {
                //判斷是否有error
                if (checkTerminate()) {
                    return;
                }
                ...
                boolean d = done;
                svq = queue;
                InnerObserver<?, ?>[] inner = observers.get();
                int n = inner.length;
                int nSources = 0;
                ...
                int innerCompleted = 0;
                if (n != 0) {
                //初始lastId lastIndex都爲0
                    long startId = lastId;
                    int index = lastIndex;
                    ...
                    int j = index;
                    sourceLoop:
                    for (int i = 0; i < n; i++) {
                        
                        //獲取到當前InnerObserver
                        @SuppressWarnings("unchecked")
                        InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
                        //q就是FromArrayDisposable。
                        SimpleQueue<U> q = is.queue;
                        if (q != null) {
                            for (;;) {
                                U o;
                                try {
                                    //在這裏循環調取FromArrayDisposable隊列中數據,而後傳遞到下游
                                    o = q.poll();
                                } catch (Throwable ex) {
                                    ....
                                }
                                if (o == null) {
                                    break;
                                }
                                child.onNext(o);
                                ...
                            }
                        }
                        //前面標記過,在onSubscribe中已經將done設置爲true.
                        boolean innerDone = is.done;
                        SimpleQueue<U> innerQueue = is.queue;
                        //因爲上面已經將數據處理完畢,這裏innerQueue.isEmpty()返回爲trueif (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
                            //將該InnerObserver從隊列中移除
                            removeInner(is);
                            if (checkTerminate()) {
                                return;
                            }
                            innerCompleted++;
                        }

                        j++;
                        if (j == n) {
                            j = 0;
                        }
                    }
                    lastIndex = j;
                    lastId = inner[j].id;
                }
                ...
                //這裏與開頭getAndIncrement()相呼應,確保drainLoop在執行時不會被再次調用
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
複製代碼

OK,整個流程就清晰了,劃重點:

  • flatMap()是基礎裝飾器Observable的一個方法,參數是一個Function,只不過這個Functionapply()方法返回類型爲一個Observable
  • flatMap()返回一個ObservableFlatMap裝飾器對象。ObservableFlatMap被訂閱後會調用subscribeActual()方法,在此方法中,會建立一個內部類MergeObserver對象,並將上游裝飾器與之訂閱。
  • MergeObserver在接收到上游數據後,會調用Functionapply()方法,將數據轉換爲一個Observable,並建立一個內部InnerObserver,將這個InnerObserver放入隊列中,而後將生成的Observable與之訂閱。
  • 在同步的狀態下,InnerObserveronSubscribe()方法會直接調用MergeObserverdrain()方法,將數據所有都直接傳遞給下游。從而完成整個流程。

觀察代碼會發現,同步僅僅是flatMap的一個簡單狀況,更復雜的狀況在於異步。具體的你們能夠去源碼裏研究一下,畢竟這篇的篇幅已經夠長了。下一篇預告一下,咱們來看看背壓。

相關文章
相關標籤/搜索