RxJava中提供了大量的操做符,這大大提升了了咱們的開發效率。其中最基本的兩個變換操做符就是map
和flatMap
。而其餘變換操做符的原理基本與map
相似。java
Observable
map
對Observable發射的每一項數據應用一個函數,執行變換操做。對原始的Observable發射的每一項數據應用一個你選擇的函數,而後返回一個發射這些結果的Observable。git
flatMap
將一個發射數據的Observable變換爲多個Observables,而後將它們發射的數據合併後放進一個單獨的Observable。操做符使用一個指定的函數對原始Observable發射的每一項數據執行變換操做,這個函數返回一個自己也發射數據的Observable,而後FlatMap合併這些Observables發射的數據,最後將合併後的結果當作它本身的數據序列發射
經過代碼來看一下二者的使用用方法:github
Observable.just(new User("白瑞德")) .map(new Function<User, String>() { @Override public String apply(User user) throws Throwable { return user.getName(); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println(s); } }); <<<白瑞德 複製代碼
這段代碼接受一個User對象,最後打印出User中的name。bash
假設存在一個需求,圖書館要打印每一個User借走每一本書的名字: User
結構以下:markdown
class User {
private String name;
private List<String> book;
}
複製代碼
咱們來看一下map
的實現方法:app
Observable.fromIterable(userList) .map(new Function<User, List<String>>() { @Override public List<String> apply(User user) throws Throwable { return user.getBook(); } }) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) throws Throwable { for (String s : strings) { System.out.println(s); } } }); 複製代碼
能夠看到,map
的轉換老是一對一,只能單一轉換。咱們不得不借助循環進行打印。 下面咱們來看一下flatMap
的實現方式:ide
Observable.fromIterable(userList) .flatMap(new Function<User, ObservableSource<String>>() { @Override public ObservableSource<String> apply(User user) throws Throwable { return Observable.fromIterable(user.getBook()); } }) .subscribe(new Consumer<String>() { @Override public void accept(String o) throws Throwable { System.out.println(o); } }); 複製代碼
flatmap
既能夠單一轉換也能夠一對多/多對多轉換。flatMap
使用一個指定的函數對原始Observable
發射的每一項數據之行相應的變換操做,這個函數返回一個自己也發射數據的Observable
,所以能夠再內部再次進行事件的分發。而後flatMap
合併這些Observables
發射的數據,最後將合併後的結果當作它本身的數據序列發射。函數
下面咱們就結合源碼來分析一下這兩個操做符。爲了下降代碼閱讀難道,這裏只保留核心代碼:oop
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { //接受一個Function實例,並返回一個ObservableMap return new ObservableMap<T, R>(this, mapper); } public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { //調用用父類構造方法,初始化父類中的downstream super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { v = mapper.apply(t); downstream.onNext(v); } } } 複製代碼
這段代碼是去掉map源碼中一些校驗和其它相關回調後的精簡代碼。接下來分析一下代碼流程:源碼分析
map
時,map接受一個匿名內部類Function
的實例,並返回一個ObservableMap
對象。ObservableMap
本質上是一個Observable
,也是一個被觀察者,其構造方法接受最外層的那個被Observable
實例,和Function
實例。ObservableMap
重寫了subscribeActual
方法,在subscribeActual
中使用新建了一個MapObserver
實現了對原始Observable
的觀察。Observable
中的數據變會被髮送到MapObserver
的實例中。MapObserver
構造方法接收原始Observable
的觀察者actual
,和Function
的實例mapper
MapObserver
在其onNext
方法中調用mapper
的apply
方法,得到該方法的返回值v apply方法就是map實例中: public String apply(User user) throws Throwable { return user.getName(); }
downstream
的onNext方法,並傳入實參v。其中downstream
是MapObserver
父類中定義的變量,在MapObserver
構造方法中super(actual);
時初始化,其自己就是傳入的actual
,本質上就是最原始的Observable
整個流程能夠歸納以下: 存在一個原始的ObservableA
和一個觀察者ObserverA
,當原始的被觀察者ObservableA
調用map
,並傳入一個匿名內部類實例化的’function‘,map
新建並返回了一個被觀察者ObservableB
,經過subscribe
讓觀察者ObserverA
對其進行訂閱。並重寫subscribeActual
方法,在其被訂閱時建立一個新的觀察者ObserverB
其接受的,並用ObserverB
對原始的ObservableA
進行訂閱觀察。當原始的ObservableA
發出事件,調用ObserverB
的onNext
方法,subscribeActual
接受的觀察者即是最原始的觀察者ObserverA
。ObserverB
變執行經過匿名內部類實例化的’function‘的apply
方法獲得數據v
,緊接着調用原始的ObservableA
的onNext
方法,並傳入實參v
,ObserverA
觀察到事件。 一句話歸納:一個原始的被觀察者和觀察者,可是讓原始的觀察者去訂閱一個新的觀察者,當新的被觀察者被訂閱的時候,建立一個新的觀察者去訂閱原始的被觀察者,並在監聽的事件以後執行指定的操做後再通知原始觀察者。
faltMap
和map
的基本原理相似,其代碼以下:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) { return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize); } public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends ObservableSource<? extends U>> mapper; final boolean delayErrors; final int maxConcurrency; final int bufferSize; public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { super(source); } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize)); } static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> { MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { ... this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY); } @Override public void onSubscribe(Disposable d) { downstream.onSubscribe(this); } @Override public void onNext(T t) { ObservableSource<? extends U> p; p = mapper.apply(t); subscribeInner(p); } @SuppressWarnings("unchecked") void subscribeInner(ObservableSource<? extends U> p) { InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++); p.subscribe(inner); } void drain() { drainLoop(); } void drainLoop() { final Observer<? super U> child = this.downstream; child.onNext(o); } } static final class InnerObserver<T, U> extends AtomicReference<Disposable> implements Observer<U> { private static final long serialVersionUID = -4606175640614850599L; final long id; final MergeObserver<T, U> parent; volatile boolean done; volatile SimpleQueue<U> queue; int fusionMode; InnerObserver(MergeObserver<T, U> parent, long id) { this.id = id; this.parent = parent; } @Override public void onNext(U t) { parent.drain(); } } } 複製代碼
上述代碼便是faltMap
精簡後的源碼,其中大部分代碼的運做流程和前文中的map
源碼一致,咱們繼續延續上文中講解中的觀察者和被觀察者。重點關注其不一樣的地方: faltMap
返回一個新的被觀察者ObservableB
,重寫ObservableB
的subscribeActual
方法在原始的觀察者ObserverA
對其進行訂閱時新建一個觀察者ObserverB
對原始的ObservableA
進行訂閱。新的觀察者ObserverB
持有原始的ObserverA
和faltMap
接收的匿名對象實例function
。當ObserverB
監聽到原始的被觀察者ObservableA
的事件時,ObserverB
調用function
的apply
方法得到新新的被觀察者ObservableC
,再建立一個新的觀察者ObserverC
對ObservableC
進行訂閱,ObserverC
持有原始的觀察者ObserverA
,在ObserverC
觀察到被觀察者ObservableC
的時間時,調用原始的觀察者ObserverA
的方法。
至此,map和flatMap已基本分析完畢,其中map的代碼比較簡單易懂,flatMap中還涉及到大量輔助操做,文中並未涉及到其中的合併等操做,閱讀起來有些困難。若是僅僅是爲了瞭解兩者的原理,能夠閱讀Single<T>
中的代碼。其中的代碼量遠遠少於Observable
中的代碼量。若是對RxJava基本的模式還不瞭解,能夠閱讀大神博客手寫極簡版的Rxjava