從源碼查看RxJava中的map和flatMap的用法與區別

前言:

RxJava中提供了大量的操做符,這大大提升了了咱們的開發效率。其中最基本的兩個變換操做符就是mapflatMap。而其餘變換操做符的原理基本與map相似。java

  • map和flatMap都是接受一個函數做爲參數(Func1)並返回一個被觀察者Observable
  • Func1的< I,O >I,O模版分別爲輸入和輸出值的類型,實現Func1的call方法對I類型進行處理後返回O類型數據,只是flatMap中執行的方法的返回類型爲Observable類型

做用

map對Observable發射的每一項數據應用一個函數,執行變換操做。對原始的Observable發射的每一項數據應用一個你選擇的函數,而後返回一個發射這些結果的Observable。git

flatMap將一個發射數據的Observable變換爲多個Observables,而後將它們發射的數據合併後放進一個單獨的Observable。操做符使用一個指定的函數對原始Observable發射的每一項數據執行變換操做,這個函數返回一個自己也發射數據的Observable,而後FlatMap合併這些Observables發射的數據,最後將合併後的結果當作它本身的數據序列發射

使用方法:

經過代碼來看一下二者的使用用方法:github

map

Observable.just(new User("白瑞德"))
                  .map(new Function<User, String>() {

                      @Override
                      public String apply(User user) throws Throwable {

                          return user.getName();
                      }
                  })
                  .subscribe(new Consumer<String>() {

                      @Override
                      public void accept(String s) throws Throwable {

                          System.out.println(s);
                      }
                  });
<<<白瑞德
複製代碼

這段代碼接受一個User對象,最後打印出User中的name。bash

flatMap

假設存在一個需求,圖書館要打印每一個User借走每一本書的名字: User結構以下:markdown

class User {
    private String       name;
    private List<String> book;
}
複製代碼

咱們來看一下map的實現方法:app

Observable.fromIterable(userList)
                  .map(new Function<User, List<String>>() {

                      @Override
                      public List<String> apply(User user) throws Throwable {

                          return user.getBook();
                      }
                  })
                  .subscribe(new Consumer<List<String>>() {

                      @Override
                      public void accept(List<String> strings) throws Throwable {

                          for (String s : strings) {
                              System.out.println(s);
                          }
                      }
                  });
複製代碼

能夠看到,map的轉換老是一對一,只能單一轉換。咱們不得不借助循環進行打印。 下面咱們來看一下flatMap的實現方式:ide

Observable.fromIterable(userList)
                  .flatMap(new Function<User, ObservableSource<String>>() {

                      @Override
                      public ObservableSource<String> apply(User user) throws Throwable {

                          return Observable.fromIterable(user.getBook());
                      }
                  })
                  .subscribe(new Consumer<String>() {

                      @Override
                      public void accept(String  o) throws Throwable {
                          System.out.println(o);
                      }
                  });
複製代碼

flatmap既能夠單一轉換也能夠一對多/多對多轉換。flatMap使用一個指定的函數對原始Observable發射的每一項數據之行相應的變換操做,這個函數返回一個自己也發射數據的Observable,所以能夠再內部再次進行事件的分發。而後flatMap合併這些Observables發射的數據,最後將合併後的結果當作它本身的數據序列發射。函數

源碼分析

下面咱們就結合源碼來分析一下這兩個操做符。爲了下降代碼閱讀難道,這裏只保留核心代碼:oop

map

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    //接受一個Function實例,並返回一個ObservableMap
    return new ObservableMap<T, R>(this, mapper);
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        //調用用父類構造方法,初始化父類中的downstream
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            v = mapper.apply(t);
            downstream.onNext(v);
        }
    }
}
複製代碼

這段代碼是去掉map源碼中一些校驗和其它相關回調後的精簡代碼。接下來分析一下代碼流程:源碼分析

  • 當在調用map時,map接受一個匿名內部類Function的實例,並返回一個ObservableMap對象。
  • ObservableMap本質上是一個Observable,也是一個被觀察者,其構造方法接受最外層的那個被Observable實例,和Function實例。ObservableMap重寫了subscribeActual方法,在subscribeActual中使用新建了一個MapObserver實現了對原始Observable的觀察。
  • 原始的Observable中的數據變會被髮送到MapObserver的實例中。
  • MapObserver構造方法接收原始Observable的觀察者actual,和Function的實例mapper
  • MapObserver在其onNext方法中調用mapperapply方法,得到該方法的返回值v apply方法就是map實例中: public String apply(User user) throws Throwable { return user.getName(); }
  • 調用downstream的onNext方法,並傳入實參v。其中downstreamMapObserver父類中定義的變量,在MapObserver構造方法中super(actual);時初始化,其自己就是傳入的actual,本質上就是最原始的Observable

整個流程能夠歸納以下: 存在一個原始的ObservableA和一個觀察者ObserverA,當原始的被觀察者ObservableA調用map,並傳入一個匿名內部類實例化的’function‘,map新建並返回了一個被觀察者ObservableB,經過subscribe讓觀察者ObserverA對其進行訂閱。並重寫subscribeActual方法,在其被訂閱時建立一個新的觀察者ObserverB其接受的,並用ObserverB對原始的ObservableA進行訂閱觀察。當原始的ObservableA發出事件,調用ObserverBonNext方法,subscribeActual接受的觀察者即是最原始的觀察者ObserverAObserverB變執行經過匿名內部類實例化的’function‘的apply方法獲得數據v,緊接着調用原始的ObservableAonNext方法,並傳入實參vObserverA觀察到事件。 一句話歸納:一個原始的被觀察者和觀察者,可是讓原始的觀察者去訂閱一個新的觀察者,當新的被觀察者被訂閱的時候,建立一個新的觀察者去訂閱原始的被觀察者,並在監聽的事件以後執行指定的操做後再通知原始觀察者。

flatMap

faltMapmap的基本原理相似,其代碼以下:

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}


public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final boolean delayErrors;
    final int maxConcurrency;
    final int bufferSize;

    public ObservableFlatMap(ObservableSource<T> source,
            Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        super(source);
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {

        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

    static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {

        MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
            ...   
            this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
        }

        @Override
        public void onSubscribe(Disposable d) {
            downstream.onSubscribe(this);
        }

        @Override
        public void onNext(T t) {
            
            ObservableSource<? extends U> p;
            p = mapper.apply(t);    
            subscribeInner(p);
        }

        @SuppressWarnings("unchecked")
        void subscribeInner(ObservableSource<? extends U> p) {
           
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                p.subscribe(inner);
        }

        void drain() {    
            drainLoop();
        }
        void drainLoop() {
            final Observer<? super U> child = this.downstream;
            child.onNext(o);
        }
    }

    static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

        private static final long serialVersionUID = -4606175640614850599L;
        final long id;
        final MergeObserver<T, U> parent;

        volatile boolean done;
        volatile SimpleQueue<U> queue;

        int fusionMode;

        InnerObserver(MergeObserver<T, U> parent, long id) {
            this.id = id;
            this.parent = parent;
        }

        @Override
        public void onNext(U t) {
            parent.drain();
        }
        
    }
}

複製代碼

上述代碼便是faltMap精簡後的源碼,其中大部分代碼的運做流程和前文中的map源碼一致,咱們繼續延續上文中講解中的觀察者和被觀察者。重點關注其不一樣的地方: faltMap返回一個新的被觀察者ObservableB,重寫ObservableBsubscribeActual方法在原始的觀察者ObserverA對其進行訂閱時新建一個觀察者ObserverB對原始的ObservableA進行訂閱。新的觀察者ObserverB持有原始的ObserverAfaltMap接收的匿名對象實例function。當ObserverB監聽到原始的被觀察者ObservableA的事件時,ObserverB調用functionapply方法得到新新的被觀察者ObservableC,再建立一個新的觀察者ObserverCObservableC進行訂閱,ObserverC持有原始的觀察者ObserverA,在ObserverC觀察到被觀察者ObservableC的時間時,調用原始的觀察者ObserverA的方法。

結語

至此,map和flatMap已基本分析完畢,其中map的代碼比較簡單易懂,flatMap中還涉及到大量輔助操做,文中並未涉及到其中的合併等操做,閱讀起來有些困難。若是僅僅是爲了瞭解兩者的原理,能夠閱讀Single<T>中的代碼。其中的代碼量遠遠少於Observable中的代碼量。若是對RxJava基本的模式還不瞭解,能夠閱讀大神博客手寫極簡版的Rxjava

相關文章
相關標籤/搜索