Rxjava 2.x 源碼系列 - 線程切換 (下)bash
Rxjava 2.x 源碼系列 - 變換操做符 Map(上)併發
在前幾篇博客中,咱們介紹了 Rxjava Observable 與 Observer 之間是如何訂閱與取消訂閱的,以及 Rxjava 是如何控制 subsribe 線程和 observer 的回調線程的。app
今天,讓咱們一塊兒來看一下 Rxjava 中另一個比較重要的功能,操做符變化功能框架
操做符 | 做用 |
---|---|
map | 映射,將一種類型的數據流/Observable映射爲另一種類型的數據流/Observable |
cast | 強轉 傳入一個class,對Observable的類型進行強轉. |
flatMap | 平鋪映射,從數據流的每一個數據元素中映射出多個數據,並將這些數據依次發射。(注意是無序的) |
concatMap | concatMap 與 flatMap 的功能很是相似,只不過發送的數據是有序的 |
buffer | 緩存/打包 按照必定規則從Observable收集一些數據到一個集合,而後把這些數據做爲集合打包發射。 |
groupby | 分組,將原來的Observable分拆爲Observable集合,將原始Observable發射的數據按Key分組,每個Observable發射一組不一樣的數據 |
to... | 將數據流中的對象轉換爲List/SortedList/Map/MultiMap集合對象,並打包發射 |
timeInterval | 將每一個數據都換爲包含本次數據和離上次發射數據時間間隔的對象併發射 |
timestamp | 將每一個數據都轉換爲包含本次數據和發射數據時的時間戳的對象併發射 |
接下來,咱們一塊兒來看一下一個 demo,咱們經過 map 操做符將 Integer 轉化爲 String。ide
// 採用RxJava基於事件流的鏈式操做
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 被觀察者發送事件 = 參數爲整型 = 一、二、3
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
// 2. 使用Map變換操做符中的Function函數對被觀察者發送的事件進行統一變換:整型變換成字符串類型
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "使用 Map變換操做符 將事件" + integer +"的參數從 整型"+integer + " 變換成 字符串類型" + integer ;
}
// 3. 觀察者接收事件時,是接收到變換後的事件 = 字符串類型
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
複製代碼
輸出結果函數
使用 Map變換操做符 將事件1的參數從 整型1 變換成 字符串類型1
使用 Map變換操做符 將事件2的參數從 整型2 變換成 字符串類型2
使用 Map變換操做符 將事件3的參數從 整型3 變換成 字符串類型3
複製代碼
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
複製代碼
接下來,咱們一塊兒來看一下 ObservableMap。源碼分析
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
}
複製代碼
在前面博客中,咱們已經說到,當咱們調用 observable.subscribe(observer) 的時候,代碼調用邏輯是這樣的。ui
在 observable 的 subscribeActual 方法中
在 ObservableMap 的 subscribeActual 方法裏面,MapObserver 類對 Observer 進行包裝,又是這樣的套路,裝飾者模式。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
// 1 判斷是否 done,若是已經 done ,直接返回
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
// 2 調用 mapper.apply(t) ,進行相應的轉化
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 3 調用下游的 onNext 方法,並將 V 暴露出去
actual.onNext(v);
}
----
}
複製代碼
首先看他的構造方法,有兩個參數, actual,mapper。 actual 表明下游的 Observer,mapper 爲傳入的 Function。
接着咱們來看下 onNext 方法
這樣就完成了操做符的操做功能
OK,咱們在回到上面的 demo,來整理一下他的流程
當咱們調用 observable.subscribe(observer) 的時候
掃一掃,歡迎關注個人公衆號。若是你有好的文章,也歡迎你的投稿。