Rxjava 2.x 源碼系列 - 變換操做符 Map(上)

Rxjava 2.x 源碼系列 - 基礎框架分析java

Rxjava 2.x 源碼系列 - 線程切換 (上)緩存

Rxjava 2.x 源碼系列 - 線程切換 (下)bash

Rxjava 2.x 源碼系列 - 變換操做符 Map(上)併發

前言

在前幾篇博客中,咱們介紹了 Rxjava Observable 與 Observer 之間是如何訂閱與取消訂閱的,以及 Rxjava 是如何控制 subsribe 線程和 observer 的回調線程的。app

今天,讓咱們一塊兒來看一下 Rxjava 中另一個比較重要的功能,操做符變化功能框架


基礎知識

經常使用的變換操做符

操做符 做用
map 映射,將一種類型的數據流/Observable映射爲另一種類型的數據流/Observable
cast 強轉 傳入一個class,對Observable的類型進行強轉.
flatMap 平鋪映射,從數據流的每一個數據元素中映射出多個數據,並將這些數據依次發射。(注意是無序的)
concatMap concatMap 與 flatMap 的功能很是相似,只不過發送的數據是有序的
buffer 緩存/打包 按照必定規則從Observable收集一些數據到一個集合,而後把這些數據做爲集合打包發射。
groupby 分組,將原來的Observable分拆爲Observable集合,將原始Observable發射的數據按Key分組,每個Observable發射一組不一樣的數據
to... 將數據流中的對象轉換爲List/SortedList/Map/MultiMap集合對象,並打包發射
timeInterval 將每一個數據都換爲包含本次數據和離上次發射數據時間間隔的對象併發射
timestamp 將每一個數據都轉換爲包含本次數據和發射數據時的時間戳的對象併發射

從 Demo 提及

接下來,咱們一塊兒來看一下一個 demo,咱們經過 map 操做符將 Integer 轉化爲 String。ide

// 採用RxJava基於事件流的鏈式操做
Observable.create(new ObservableOnSubscribe<Integer>() {

    // 1. 被觀察者發送事件 = 參數爲整型 = 一、二、3
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);

    }
    // 2. 使用Map變換操做符中的Function函數對被觀察者發送的事件進行統一變換:整型變換成字符串類型
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return "使用 Map變換操做符 將事件" + integer +"的參數從 整型"+integer + " 變換成 字符串類型" + integer ;
    }
    // 3. 觀察者接收事件時,是接收到變換後的事件 = 字符串類型
}).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        
    }

    @Override
    public void onNext(String s) {
        Log.d(TAG, s);
    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onComplete() {

    }
});


複製代碼

輸出結果函數

使用 Map變換操做符 將事件1的參數從 整型1 變換成 字符串類型1
使用 Map變換操做符 將事件2的參數從 整型2 變換成 字符串類型2
使用 Map變換操做符 將事件3的參數從 整型3 變換成 字符串類型3

複製代碼

map 源碼分析

  • 借鑑前面幾篇博客的分析,咱們先來看一下 Observable 的 map 方法,它的套路跟 create 方法的套路也是類似的,判空是否爲 null,爲 null 拋出異常。
  • 接着,用一個包裝類包裝當前的 Observable 實例,只不過這個包裝類是 ObservableMap。在 ObsevableMap 裏面持有上游 observable 實例的引用,這個是典型的裝飾者模式. 關於裝飾者模式,能夠參考個人這一篇博客。裝飾者模式及其應用
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}


複製代碼

接下來,咱們一塊兒來看一下 ObservableMap。源碼分析

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
}
複製代碼

在前面博客中,咱們已經說到,當咱們調用 observable.subscribe(observer) 的時候,代碼調用邏輯是這樣的。ui

在 observable 的 subscribeActual 方法中

  • 若是有上游的話,會調用上游的 subscribe 方法(即 source.subscribe() 方法),而在 subscribe 方法中,又會調用當前 observable 的 subcribeActual 方法
  • 若是沒有上游的話,會直接調用當前 Observable 的 subscirbe 方法,並調用 observable 的 onSuscribe 方法

observable 的 subscribe 流程圖

在 ObservableMap 的 subscribeActual 方法裏面,MapObserver 類對 Observer 進行包裝,又是這樣的套路,裝飾者模式。

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
    final Function<? super T, ? extends U> mapper;

    MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
        super(actual);
        this.mapper = mapper;
    }

    @Override
    public void onNext(T t) {
       // 1 判斷是否 done,若是已經 done ,直接返回
        if (done) {
            return;
        }

        if (sourceMode != NONE) {
            actual.onNext(null);
            return;
        }

        U v;
      
        try {
            // 2 調用 mapper.apply(t) ,進行相應的轉化
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        // 3 調用下游的 onNext 方法,並將 V 暴露出去
        actual.onNext(v);
    }
    
    
    ----

}
複製代碼

首先看他的構造方法,有兩個參數, actual,mapper。 actual 表明下游的 Observer,mapper 爲傳入的 Function。

接着咱們來看下 onNext 方法

  1. 判斷是否 done,若是已經 done ,直接返回
  2. 調用 mapper.apply(t) ,進行相應的轉化
  3. 調用下游的 onNext 方法,並將 V 暴露出去

這樣就完成了操做符的操做功能

總結

OK,咱們在回到上面的 demo,來整理一下他的流程

當咱們調用 observable.subscribe(observer) 的時候

  • 會促發第二個 Observable 的 subscribeAtActual 方法,在該方法中,又會調用上游 Observable 的 subscribe 方法,即第一個 Observable 的 subscribe 方法
  • 在第一個 Observable 的 subscribe 方法裏面,又會調用當前 Observable 的 subscribeAtActual 方法,會調用 observer.onSubscribe(parent) 方法,並調用 source.subscribe(parent) 將咱們的 observer 的包裝類 parent 暴露出去
  • 當咱們在咱們建立的 ObservableOnSubscribe 的 subscribe 方法中,調用 emitter 的 onNext 方法的時候,這個時候會調用到咱們的 MapObserver 的 onNext 方法
  • 在 MapObserver 的 onNext 方法,有會調用到下游 Observer 的 onNext 方法,進而調用咱們外部的 observer 的 onNext 方法

小結

  • map 的操做過程跟以前的線程切換的實現原理基本同樣,經過在中間使用裝飾者模式插入一箇中間的 Observable 和 Observer,你能夠想象爲代理。
  • 代理 Observable 作的事就是接收下游 Obsever 的訂閱事件,而後經過代理 Obsever 訂閱上游 Observer,而後在上游 Observer 下發數據給代理 Observer 時,經過先調用 mapper.apply 轉換回調函數得到轉換後的數據,而後下發給下游 Obsever。

Android 技術人

掃一掃,歡迎關注個人公衆號。若是你有好的文章,也歡迎你的投稿。

相關文章
相關標籤/搜索