RxJava中提供了大量的操做符,這大大提升了了咱們的開發效率。其中最基本的兩個變換操做符就是map
和flatMap
。而其餘變換操做符的原理基本與map
相似。java
Observable
map
對Observable發射的每一項數據應用一個函數,執行變換操做。對原始的Observable發射的每一項數據應用一個你選擇的函數,而後返回一個發射這些結果的Observable。git
flatMap
將一個發射數據的Observable變換爲多個Observables,而後將它們發射的數據合併後放進一個單獨的Observable。操做符使用一個指定的函數對原始Observable發射的每一項數據執行變換操做,這個函數返回一個自己也發射數據的Observable,而後FlatMap合併這些Observables發射的數據,最後將合併後的結果當作它本身的數據序列發射
經過代碼來看一下二者的使用用方法:github
Observable.just(new User("白瑞德"))
.map(new Function<User, String>() {
@Override
public String apply(User user) throws Throwable {
return user.getName();
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
System.out.println(s);
}
});
<<<白瑞德
複製代碼
這段代碼接受一個User對象,最後打印出User中的name。bash
假設存在一個需求,圖書館要打印每一個User借走每一本書的名字: User
結構以下:app
class User {
private String name;
private List<String> book;
}
複製代碼
咱們來看一下map
的實現方法:ide
Observable.fromIterable(userList)
.map(new Function<User, List<String>>() {
@Override
public List<String> apply(User user) throws Throwable {
return user.getBook();
}
})
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Throwable {
for (String s : strings) {
System.out.println(s);
}
}
});
複製代碼
能夠看到,map
的轉換老是一對一,只能單一轉換。咱們不得不借助循環進行打印。 下面咱們來看一下flatMap
的實現方式:函數
Observable.fromIterable(userList)
.flatMap(new Function<User, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(User user) throws Throwable {
return Observable.fromIterable(user.getBook());
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String o) throws Throwable {
System.out.println(o);
}
});
複製代碼
flatmap
既能夠單一轉換也能夠一對多/多對多轉換。flatMap
使用一個指定的函數對原始Observable
發射的每一項數據之行相應的變換操做,這個函數返回一個自己也發射數據的Observable
,所以能夠再內部再次進行事件的分發。而後flatMap
合併這些Observables
發射的數據,最後將合併後的結果當作它本身的數據序列發射。oop
下面咱們就結合源碼來分析一下這兩個操做符。爲了下降代碼閱讀難道,這裏只保留核心代碼:源碼分析
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//接受一個Function實例,並返回一個ObservableMap
return new ObservableMap<T, R>(this, mapper);
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//調用用父類構造方法,初始化父類中的downstream
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
v = mapper.apply(t);
downstream.onNext(v);
}
}
}
複製代碼
這段代碼是去掉map源碼中一些校驗和其它相關回調後的精簡代碼。接下來分析一下代碼流程:ui
map
時,map接受一個匿名內部類Function
的實例,並返回一個ObservableMap
對象。ObservableMap
本質上是一個Observable
,也是一個被觀察者,其構造方法接受最外層的那個被Observable
實例,和Function
實例。ObservableMap
重寫了subscribeActual
方法,在subscribeActual
中使用新建了一個MapObserver
實現了對原始Observable
的觀察。Observable
中的數據變會被髮送到MapObserver
的實例中。MapObserver
構造方法接收原始Observable
的觀察者actual
,和Function
的實例mapper
MapObserver
在其onNext
方法中調用mapper
的apply
方法,得到該方法的返回值v apply方法就是map實例中: public String apply(User user) throws Throwable { return user.getName(); }
downstream
的onNext方法,並傳入實參v。其中downstream
是MapObserver
父類中定義的變量,在MapObserver
構造方法中super(actual);
時初始化,其自己就是傳入的actual
,本質上就是最原始的Observable
整個流程能夠歸納以下: 存在一個原始的ObservableA
和一個觀察者ObserverA
,當原始的被觀察者ObservableA
調用map
,並傳入一個匿名內部類實例化的’function‘,map
新建並返回了一個被觀察者ObservableB
,經過subscribe
讓觀察者ObserverA
對其進行訂閱。並重寫subscribeActual
方法,在其被訂閱時建立一個新的觀察者ObserverB
其接受的,並用ObserverB
對原始的ObservableA
進行訂閱觀察。當原始的ObservableA
發出事件,調用ObserverB
的onNext
方法,subscribeActual
接受的觀察者即是最原始的觀察者ObserverA
。ObserverB
變執行經過匿名內部類實例化的’function‘的apply
方法獲得數據v
,緊接着調用原始的ObservableA
的onNext
方法,並傳入實參v
,ObserverA
觀察到事件。 一句話歸納:一個原始的被觀察者和觀察者,可是讓原始的觀察者去訂閱一個新的觀察者,當新的被觀察者被訂閱的時候,建立一個新的觀察者去訂閱原始的被觀察者,並在監聽的事件以後執行指定的操做後再通知原始觀察者。
faltMap
和map
的基本原理相似,其代碼以下:
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
...
this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}
@Override
public void onSubscribe(Disposable d) {
downstream.onSubscribe(this);
}
@Override
public void onNext(T t) {
ObservableSource<? extends U> p;
p = mapper.apply(t);
subscribeInner(p);
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
p.subscribe(inner);
}
void drain() {
drainLoop();
}
void drainLoop() {
final Observer<? super U> child = this.downstream;
child.onNext(o);
}
}
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
implements Observer<U> {
private static final long serialVersionUID = -4606175640614850599L;
final long id;
final MergeObserver<T, U> parent;
volatile boolean done;
volatile SimpleQueue<U> queue;
int fusionMode;
InnerObserver(MergeObserver<T, U> parent, long id) {
this.id = id;
this.parent = parent;
}
@Override
public void onNext(U t) {
parent.drain();
}
}
}
複製代碼
上述代碼便是faltMap
精簡後的源碼,其中大部分代碼的運做流程和前文中的map
源碼一致,咱們繼續延續上文中講解中的觀察者和被觀察者。重點關注其不一樣的地方: faltMap
返回一個新的被觀察者ObservableB
,重寫ObservableB
的subscribeActual
方法在原始的觀察者ObserverA
對其進行訂閱時新建一個觀察者ObserverB
對原始的ObservableA
進行訂閱。新的觀察者ObserverB
持有原始的ObserverA
和faltMap
接收的匿名對象實例function
。當ObserverB
監聽到原始的被觀察者ObservableA
的事件時,ObserverB
調用function
的apply
方法得到新新的被觀察者ObservableC
,再建立一個新的觀察者ObserverC
對ObservableC
進行訂閱,ObserverC
持有原始的觀察者ObserverA
,在ObserverC
觀察到被觀察者ObservableC
的時間時,調用原始的觀察者ObserverA
的方法。
至此,map和flatMap已基本分析完畢,其中map的代碼比較簡單易懂,flatMap中還涉及到大量輔助操做,文中並未涉及到其中的合併等操做,閱讀起來有些困難。若是僅僅是爲了瞭解兩者的原理,能夠閱讀Single<T>
中的代碼。其中的代碼量遠遠少於Observable
中的代碼量。若是對RxJava基本的模式還不瞭解,能夠閱讀大神博客手寫極簡版的Rxjava