Source Code Analysis of Some RxJava Transformation Operators

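I. The map operator

(1) Definition

map applies a function to each item emitted by an Observable, transforming it.

More precisely, the map operator applies a function of your choice to every item emitted by the source Observable and returns an Observable that emits the results.

RxJava implements this operator as the map function. By default it does not run on any particular Scheduler.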

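(2) Example

// RxJava 2.x imports used by this snippet
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;

public void mapTest() {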
        Observable.just("HELLO")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s.toLowerCase();
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s + " world";
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        System.out.println(s);
                    }
                });
    }

Output:

hello world

(3) Source code analysis

1. Calling the map() method

/**
     * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and
     * emits the results of these function applications.
     *
     * @param <R> the output type
     * @param mapper
     *            a function to apply to each item emitted by the ObservableSource
     * @return an Observable that emits the items from the source ObservableSource, transformed by the specified
     *         function
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

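2. Under the hood, map() creates an ObservableMap

The custom Function object is stored in the member field function. Like other operators, this class overrides subscribeActual(), which wraps the downstream observer in a MapObserver and subscribes it to the source: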

@Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
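The wrapping step can be easier to follow in a stripped-down model. The sketch below is not RxJava code: MiniObserver, MiniObservable and MiniMapDemo are hypothetical names, and disposal, errors and completion are left out on purpose. It only shows the idea that map() stores the function at assembly time and wraps the downstream observer at subscription time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified stand-in for an observer (onNext only).
interface MiniObserver<T> {
    void onNext(T item);
}

// Hypothetical, simplified stand-in for Observable.
abstract class MiniObservable<T> {
    // Each source or operator supplies its own subscribeActual().
    protected abstract void subscribeActual(MiniObserver<? super T> observer);

    public final void subscribe(MiniObserver<? super T> observer) {
        subscribeActual(observer);
    }

    // Source that emits a single value, loosely modelled on ObservableJust.
    static <T> MiniObservable<T> just(final T value) {
        return new MiniObservable<T>() {
            @Override
            protected void subscribeActual(MiniObserver<? super T> observer) {
                observer.onNext(value);
            }
        };
    }

    // Assembly time: only remember the upstream source and the function.
    public final <R> MiniObservable<R> map(final Function<? super T, ? extends R> mapper) {
        final MiniObservable<T> source = this;
        return new MiniObservable<R>() {
            @Override
            protected void subscribeActual(final MiniObserver<? super R> downstream) {
                // Subscription time: wrap the downstream observer, then subscribe upstream.
                source.subscribe(new MiniObserver<T>() {
                    @Override
                    public void onNext(T item) {
                        downstream.onNext(mapper.apply(item));
                    }
                });
            }
        };
    }
}

class MiniMapDemo {
    static List<String> run() {
        final List<String> received = new ArrayList<>();
        MiniObservable.just("HELLO")
                .map(String::toLowerCase)
                .map(s -> s + " world")
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [hello world]
    }
}
```

Nothing is emitted until subscribe() is called; each map() call only builds another wrapper around the previous stage.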

When subscribe() is called later, it first goes through the subscribe() method of the Observable class:

/**
     * Subscribes to an ObservableSource and provides a callback to handle the items it emits.
     *
     * @param onNext
     *             the {@code Consumer<T>} you have designed to accept emissions from the ObservableSource
     * @return a {@link Disposable} reference with which the caller can stop receiving items before
     *         the ObservableSource has finished sending them
     * @throws NullPointerException
     *             if {@code onNext} is null
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer()) calls an overloaded subscribe() method in the same Observable class:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
        
        // Core call
        subscribe(ls);

        return ls;
    }

That method in turn calls another overloaded subscribe() method in the same class:

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            
            // Core call: every class that extends Observable must implement this method
            subscribeActual(observer);
            
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
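The shape of this method is a template method: a final public entry point that performs the checks and delegates to an abstract hook. A minimal sketch of that shape follows, assuming hypothetical names (TemplateSource, TemplateDemo) and using java.util.function.Consumer in place of Observer. It leaves out the plugin hooks and the fatal-error check that the real method performs.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical simplified base class; the names are illustrative, not RxJava's.
abstract class TemplateSource<T> {
    // Public entry point: final, so every subclass goes through the same checks.
    public final void subscribe(Consumer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            subscribeActual(observer);
        } catch (NullPointerException e) {
            throw e;
        } catch (Throwable e) {
            // Mirrors the wrapping in the listing above: rethrow as an NPE that carries the real cause.
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(e);
            throw npe;
        }
    }

    // The only method a concrete source or operator has to implement.
    protected abstract void subscribeActual(Consumer<? super T> observer);
}

class TemplateDemo {
    // A well-behaved source: the item reaches the consumer.
    static String okCase() {
        final StringBuilder out = new StringBuilder();
        new TemplateSource<String>() {
            @Override
            protected void subscribeActual(Consumer<? super String> observer) {
                observer.accept("item");
            }
        }.subscribe(out::append);
        return out.toString();
    }

    // A source whose subscribeActual() throws: the caller sees an NPE with the original cause.
    static Throwable failingCase() {
        try {
            new TemplateSource<String>() {
                @Override
                protected void subscribeActual(Consumer<? super String> observer) {
                    throw new IllegalStateException("boom");
                }
            }.subscribe(s -> { });
            return null;
        } catch (NullPointerException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(okCase());
        System.out.println(failingCase());
    }
}
```

Because subscribe() is final, an operator such as ObservableMap or ObservableJust only ever customizes subscribeActual(), which is why the same entry point shows up repeatedly in the call chain.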

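At this point the subscribeActual() implementation in ObservableMap is invoked.

It calls subscribe() on its source, which goes through the method above again. Once the chain reaches the original source, the subscribeActual() that runs is the one in ObservableJust: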

@Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }
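The order of the two calls matters: the observer receives the disposable first, so it can cancel before anything is emitted. The following is a simplified model of that behaviour, not the real ScalarDisposable (which uses an atomic state machine). Cancellable, SketchObserver, SingleValueEmitter and JustDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for Disposable.
interface Cancellable {
    void dispose();
}

// Hypothetical stand-in for Observer (no onError, to keep the sketch short).
interface SketchObserver<T> {
    void onSubscribe(Cancellable c);
    void onNext(T item);
    void onComplete();
}

// Simplified stand-in for the role ScalarDisposable plays: emit one value, then complete.
final class SingleValueEmitter<T> implements Cancellable {
    private final SketchObserver<? super T> observer;
    private final T value;
    private final AtomicBoolean disposed = new AtomicBoolean();

    SingleValueEmitter(SketchObserver<? super T> observer, T value) {
        this.observer = observer;
        this.value = value;
    }

    @Override
    public void dispose() {
        disposed.set(true);
    }

    void run() {
        if (disposed.get()) {
            return; // cancelled before anything was emitted
        }
        observer.onNext(value);
        if (!disposed.get()) {
            observer.onComplete();
        }
    }

    // Same three steps as the subscribeActual() above: create, onSubscribe, run.
    static <T> void subscribe(T value, SketchObserver<? super T> observer) {
        SingleValueEmitter<T> emitter = new SingleValueEmitter<>(observer, value);
        observer.onSubscribe(emitter);
        emitter.run();
    }
}

class JustDemo {
    static List<String> run(final boolean cancelEarly) {
        final List<String> events = new ArrayList<>();
        SingleValueEmitter.subscribe("HELLO", new SketchObserver<String>() {
            @Override
            public void onSubscribe(Cancellable c) {
                events.add("onSubscribe");
                if (cancelEarly) {
                    c.dispose();
                }
            }

            @Override
            public void onNext(String item) {
                events.add("onNext:" + item);
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [onSubscribe, onNext:HELLO, onComplete]
        System.out.println(run(true));  // prints [onSubscribe]
    }
}
```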

Here s is a MapObserver, which extends BasicFuseableObserver, so s.onSubscribe() invokes the onSubscribe() method of the BasicFuseableObserver class:

// final: fixed protocol steps to support fuseable and non-fuseable upstream
    @SuppressWarnings("unchecked")
    @Override
    public final void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {

            this.s = s;
            if (s instanceof QueueDisposable) {
                this.qs = (QueueDisposable<T>)s;
            }

            if (beforeDownstream()) {
            
                // The relevant call: pass this observer on to the downstream as its Disposable
                actual.onSubscribe(this);

                afterDownstream();
            }

        }
    }

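After several more calls, sd.run() pushes the value through each MapObserver's onNext(), which applies its function and passes the result downstream, until the accept() method we overrode in the test method is finally called and prints the result.

II. The flatMap operator

(1) Definition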

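flatMap transforms an Observable that emits items into multiple Observables, then merges the items they emit into a single Observable, as shown in Figure 2:

Figure 2

The flatMap operator applies a function you specify to each item emitted by the source Observable. That function returns an Observable that itself emits items. flatMap then merges the items emitted by these Observables and emits the merged result as its own sequence.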
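As a rough analogy only, the same "one item becomes many, then flatten" idea exists in java.util.stream. The sketch below uses the standard Stream.flatMap, not RxJava, and FlatMapAnalogy is a hypothetical class name. One important difference: a Stream concatenates the inner sequences in order and synchronously, whereas RxJava's flatMap merges inner Observables and may interleave their items when they emit asynchronously.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapAnalogy {
    // Each inner list plays the role of an inner Observable; the result is one flat sequence.
    static List<String> flatten(List<List<String>> groups) {
        return groups.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> groups = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("c"));
        System.out.println(flatten(groups)); // prints [a, b, c]
    }
}
```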

(2) Example

Data class:

import java.util.List;

public class User {
    public String userName;
    public List<Address> addresses;

    public static class Address {
        public String street;
        public String city;
    }
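}

// Imports used by the test method (RxJava 2.x)
import java.util.ArrayList;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;

public void flatMapTest() {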
        User user = new User();
        user.userName = "tony";
        user.addresses = new ArrayList<>();
        User.Address address1 = new User.Address();
        address1.street = "ren ming road";
        address1.city = "Su zhou";
        user.addresses.add(address1);

        User.Address address2 = new User.Address();
        address2.street = "dong wu bei road";
        address2.city = "Su zhou";
        user.addresses.add(address2);

        Observable.just(user)
                .flatMap(new Function<User, ObservableSource<User.Address>>() {
                    @Override
                    public ObservableSource<User.Address> apply(User user) throws Exception {
                        return Observable.fromIterable(user.addresses);
                    }
                })
                .subscribe(new Consumer<User.Address>() {
                    @Override
                    public void accept(User.Address address) throws Exception {
                        System.out.println(address.street);
                    }
                });
    }

Output:

ren ming road
dong wu bei road

(3) Source code analysis

1. flatMap()

flatMap() has several overloads; they all eventually delegate to the following method:

/**
     * Returns an Observable that emits items based on applying a function that you supply to each item emitted
     * by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting
     * ObservableSources and emitting the results of this merger, while limiting the maximum number of concurrent
     * subscriptions to these ObservableSources.
     *
     * @param <R> the value type of the inner ObservableSources and the output type
     * @param mapper
     *            a function that, when applied to an item emitted by the source ObservableSource, returns an
     *            ObservableSource
     * @param maxConcurrency
     *            the maximum number of ObservableSources that may be subscribed to concurrently
     * @param delayErrors
     *            if true, exceptions from the current Observable and all inner ObservableSources are delayed until all of them terminate
     *            if false, the first one signalling an exception will terminate the whole sequence immediately
     * @param bufferSize
     *            the number of elements to prefetch from each inner ObservableSource
     * @return an Observable that emits the result of applying the transformation function to each item emitted
     *         by the source ObservableSource and merging the results of the ObservableSources obtained from this
     *         transformation
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        
        if (this instanceof ScalarCallable) {
            @SuppressWarnings("unchecked")
            T v = ((ScalarCallable<T>)this).call();
            if (v == null) {
                return empty();
            }
            return ObservableScalarXMap.scalarXMap(v, mapper);
        }
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
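The instanceof check is an assembly-time shortcut: when the upstream is known to hold exactly one value, the general merging machinery can be skipped. The sketch below models only that decision. Src, ScalarSrc, Ops and ScalarShortcutDemo are hypothetical names, Consumer stands in for Observer, and the trace list exists purely so the demo can show which branch was taken. Concurrency limits, error delay and buffering are not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical mini source type; not an RxJava interface.
interface Src<T> {
    void subscribe(Consumer<? super T> observer);
}

// Marker for sources whose single value is known at assembly time
// (the role ScalarCallable plays in the listing above).
interface ScalarSrc<T> extends Src<T> {
    T value();
}

final class Ops {
    // Demo-only record of which branch flatMap() took.
    static final List<String> trace = new ArrayList<>();

    static <T> Src<T> just(final T v) {
        return new ScalarSrc<T>() {
            @Override
            public T value() {
                return v;
            }

            @Override
            public void subscribe(Consumer<? super T> observer) {
                observer.accept(v);
            }
        };
    }

    static <T> Src<T> fromIterable(final Iterable<? extends T> items) {
        return observer -> {
            for (T item : items) {
                observer.accept(item);
            }
        };
    }

    static <T, R> Src<R> flatMap(final Src<T> source,
                                 final Function<? super T, ? extends Src<? extends R>> mapper) {
        if (source instanceof ScalarSrc) {
            // Shortcut: the value is already known, so map it directly at subscription time.
            final T v = ((ScalarSrc<T>) source).value();
            trace.add("scalar path");
            return observer -> mapper.apply(v).subscribe(observer);
        }
        // General case: subscribe to one inner source for every upstream item.
        trace.add("general path");
        return observer -> source.subscribe(item -> mapper.apply(item).subscribe(observer));
    }
}

class ScalarShortcutDemo {
    static List<String> collect(Src<String> upstream) {
        final List<String> out = new ArrayList<>();
        Src<String> result = Ops.flatMap(upstream,
                s -> Ops.fromIterable(Arrays.asList(s + "1", s + "2")));
        result.subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(Ops.just("x")) + " via " + Ops.trace);
    }
}
```

In this synchronous model both branches produce the same items; the shortcut only avoids work, which matches the intent of the check in the listing.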

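The method then returns a different object depending on whether this is an instance of ScalarCallable.

That is the case in this example, because Observable.just(user) creates an ObservableJust, which implements ScalarCallable. So ObservableScalarXMap.scalarXMap(v, mapper) is called: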

/**
     * Maps a scalar value into an Observable and emits its values.
     *
     * @param <T> the scalar value type
     * @param <U> the output value type
     * @param value the scalar value to map
     * @param mapper the function that gets the scalar value and should return
     * an ObservableSource that gets streamed
     * @return the new Observable instance
     */
    public static <T, U> Observable<U> scalarXMap(T value, Function<? super T, ? extends ObservableSource<? extends U>> mapper) {
        return RxJavaPlugins.onAssembly(new ScalarXMapObservable<T, U>(value, mapper));
    }

This creates a ScalarXMapObservable object.

2. subscribe()

This method also has several overloads. The call eventually reaches subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) and executes subscribe(ls) inside it, which in turn calls subscribe(Observer<? super T> observer) and executes subscribeActual(observer) inside that.

That call lands in subscribeActual(Observer<? super R> s) of ScalarXMapObservable, a nested class of ObservableScalarXMap:

@SuppressWarnings("unchecked")
        @Override
        public void subscribeActual(Observer<? super R> s) {
            ObservableSource<? extends R> other;
            try {
                // Core code:
                // this invokes the apply() method that the user supplied to flatMap()
                other = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null ObservableSource");
            } catch (Throwable e) {
                EmptyDisposable.error(e, s);
                return;
            }
            
            // Check the type of other and take the matching branch
            if (other instanceof Callable) {
                R u;

                try {
                    u = ((Callable<R>)other).call();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    EmptyDisposable.error(ex, s);
                    return;
                }

                if (u == null) {
                    EmptyDisposable.complete(s);
                    return;
                }
                ScalarDisposable<R> sd = new ScalarDisposable<R>(s, u);
                s.onSubscribe(sd);
                sd.run();
            } else {
                // In this example other is an ObservableFromIterable, which is not a Callable, so this branch runs
                other.subscribe(s);
            }
        }
    }

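Inside subscribeActual(), mapper.apply(value) runs the apply() method that the user supplied to flatMap().

other.subscribe(s) then calls the subscribe() method of the Observable class again.

Note that this subscribe() is invoked on a different Observable object from the one subscribed to earlier!

That subscribe() again calls subscribeActual(), which this time is the implementation in ObservableFromIterable: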

@Override
    public void subscribeActual(Observer<? super T> s) {
        Iterator<? extends T> it;
        try {
            // Obtain the iterator over source; its items are emitted one by one later, in d.run()
            it = source.iterator();
            
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            EmptyDisposable.error(e, s);
            return;
        }
        boolean hasNext;
        try {
            // Check whether the iterator has any items at all
            hasNext = it.hasNext();
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            EmptyDisposable.error(e, s);
            return;
        }
        if (!hasNext) {
            EmptyDisposable.complete(s);
            return;
        }

        FromIterableDisposable<T> d = new FromIterableDisposable<T>(s, it);
        s.onSubscribe(d);

        if (!d.fusionMode) {
            d.run();
        }
    }
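The listing stops at d.run(), where the items are actually pushed. As a hedged sketch of what such an emission loop looks like, the model below iterates, checks for disposal before each item, and reports whether the sequence completed normally. IterableEmitter and IterableDemo are hypothetical names; the real FromIterableDisposable also handles null items, exceptions and operator fusion, none of which is modelled here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical emitter; a simplified stand-in for the role FromIterableDisposable plays.
final class IterableEmitter<T> {
    private final Iterator<? extends T> it;
    private volatile boolean disposed;

    IterableEmitter(Iterator<? extends T> it) {
        this.it = it;
    }

    void dispose() {
        disposed = true;
    }

    // Pushes items to onNext until the iterator is exhausted or dispose() is called.
    // The emitter is passed along so the consumer can cancel from inside the callback.
    // Returns true only if the whole sequence was delivered without being disposed.
    boolean run(BiConsumer<? super T, IterableEmitter<T>> onNext) {
        while (it.hasNext()) {
            if (disposed) {
                return false;
            }
            onNext.accept(it.next(), this);
        }
        return !disposed;
    }
}

class IterableDemo {
    // Collects items but disposes the emitter once `limit` items have arrived.
    static List<String> take(List<String> source, final int limit) {
        final List<String> received = new ArrayList<>();
        new IterableEmitter<String>(source.iterator()).run((item, emitter) -> {
            received.add(item);
            if (received.size() == limit) {
                emitter.dispose();
            }
        });
        return received;
    }

    public static void main(String[] args) {
        List<String> streets = Arrays.asList("ren ming road", "dong wu bei road");
        System.out.println(take(streets, 5)); // prints [ren ming road, dong wu bei road]
        System.out.println(take(streets, 1)); // prints [ren ming road]
    }
}
```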

(4) Summary
