擁抱RxJava(二):Observable究竟如何封裝數據?

這一系列文章原本我發表在簡書。最近開始轉移到掘金。之後也會在掘金髮表(慢慢拋棄簡書了應該,掘金的技術環境確實比簡書好些)。javascript

上篇簡單講到了一些關於Event/Rx bus的優缺點。而且提到了如何「正確」使用RxJava,而不是使用RxBus來本身從新發明輪子。html

放棄RxBus,擁抱RxJava(一):爲何避免使用EventBus/RxBusjava

其中也講到了一個簡單使用 create() 方法來進行封裝Observable。但也留下了許多坑,好比內存泄漏,不能Multicast(多個Subscriber訂閱同一個Observable) 等問題。因此這篇,咱們接着經過這個例子,來具體瞭解下,如何封裝Observable。react

1. Observable提供的靜態方法都作了什麼?

首先咱們來簡單看一下Observable的靜態方法,just/from/create都怎麼爲你提供Observable。
咱們先看just:編程

public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "The item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}複製代碼

咱們暫時不須要糾結 RxJavaPlugins.onAssembly() 這個方法。比較重要的是 just(T item) 方法會爲你提供一個 ObservableJust (item) 的實例,而這個 ObservableJust 類,就是一個RxJava內部的實現類。
在 RxJava 2.x 中 Observable 是一個抽象類,只有一個抽象方法,subscribeActual(Observer observer);(可是Observable的源碼足足有13518行!!!)
安全

public abstract class Observable<T> implements ObservableSource<T>{
  //implemented methods

  protected abstract void subscribeActual(Observer<? super T> observer);

  //other implements/operators
}複製代碼

那麼ObservableJust這個類究竟什麼樣呢?app

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {

    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}複製代碼

咱們首先看到構造方法裏,直接把value賦給了ObservableJust的成員。這也就是爲何Observable.just()裏的代碼會直接運行,而不是像create()方法,有Subscriber時候才能運行(Observable.create的初始化方法在subscribeAcutal裏執行)。
再來看看兩個item的just(T item1,T item2):ide

public static <T> Observable<T> just(T item1, T item2) {
    ObjectHelper.requireNonNull(item1, "The first item is null");
    ObjectHelper.requireNonNull(item2, "The second item is null");

    return fromArray(item1, item2);
}複製代碼

誒?怎麼畫風突變?不是ObservableJust了?其實除了只有一個item的just,其餘的just方法也都是調用了這個fromArray。那咱們來看看這個fromArray:源碼分析

public static <T> Observable<T> fromArray(T... items) {
    //NullCheck
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}複製代碼

前面一些check咱們忽略,這裏咱們發現一些熟悉的身影了ObservableFromArray (items)。又一個Observable的實現類。 post

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        s.onSubscribe(d);
        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
      //implements
    }
}複製代碼

是否是更熟悉?其實Observable幾乎全部的靜態方法和操做符都是這樣,甚至包括一些著名的RxJava庫好比RxBinding,也都是使用這種封裝方法。內部實現Observable的subscribeActual()方法。對外只提供靜態方法來爲你生成Observable。爲何這麼作,咱們來了解一下subscribeActual()方法。

2. subscribeActual() 到底是什麼?

subscribeActual()其實就是Observable和Observer溝通的橋樑。這個Observer(Subscriber)就是你在Observable.subscribe()方法裏寫的那個類,或者是Consumer(只處理onNext方法)。

public final void subscribe(Observer<? super T> observer) {
        //NullCheck&Apply plugin
        subscribeActual(observer);

}複製代碼

咱們看到其實這個方法除了Check和Apply就只有這一行subscribeActual(observer),鏈接了Observable和Observer。因此咱們知道了,subscribeActual()方法裏的代碼,只有在subscribe()調用後,纔回調用。

那麼他們是如何連接的呢?其實很簡單,根據你的邏輯一句一句的調用observer.onXX()方法就能夠了。好比剛纔咱們看到的ObservableJust:

@Override
public void run() {
    if (get() == START && compareAndSet(START, ON_NEXT)) {
        observer.onNext(value);
        if (get() == ON_NEXT) {
            lazySet(ON_COMPLETE);
            observer.onComplete();
        }
    }
}複製代碼

再好比咱們的ObservableFromArray:

void run() {
    T[] a = array;
    int n = a.length;

    for (int i = 0; i < n && !isDisposed(); i++) {
        T value = a[i];
        if (value == null) {
            actual.onError(new NullPointerException("The " + i + "th element is null"));
            return;
        }
        actual.onNext(value);
    }
    if (!isDisposed()) {
        actual.onComplete();
    }
}複製代碼

複雜點的例子,好比如何封裝button的OnClick事件:

@Override protected void subscribeActual(Observer<? super Object> observer) {
  if (!checkMainThread(observer)) {
    return;
  }
  Listener listener = new Listener(view, observer);
  observer.onSubscribe(listener);
  view.setOnClickListener(listener);
}

static final class Listener extends MainThreadDisposable implements OnClickListener {
  private final View view;
  private final Observer<? super Object> observer;

  Listener(View view, Observer<? super Object> observer) {
    this.view = view;
    this.observer = observer;
  }

  @Override public void onClick(View v) {
    if (!isDisposed()) {
      observer.onNext(Notification.INSTANCE);
    }
  }

  @Override protected void onDispose() {
    view.setOnClickListener(null);
  }
  }
}複製代碼

可是細心的同窗應該看到了,每一個subscribeActual()方法裏,都會有 observer.onSubscribe(disposable)這句。那麼這句又是作什麼的呢?根據Observable Contract,onSubscribe是告知已經準備好接收item。並且經過這個方法將Disposable傳回給Subscriber。
Disposable其實就是控制你取消訂閱的。他只有兩個方法 dispose() 取消訂閱,和 isDisposed() 來通知是否已經取消了訂閱。
取消訂閱時,要根據需求釋放資源。
在subscribeActual()裏邏輯要嚴謹,好比onComplete()以後不要有onNext()。須要注意的點不少,因此可能這也就是爲何RxJava推薦用戶使用靜態方法生成Observable吧。若是有興趣,能夠直接閱讀

Observable Contract

3 Observable.create()

create()方法是一個歷史遺留問題了。因爲這個命名,不少人都以爲Observable.create()不就應該是生成Obseravble最早想到的方法嗎? 在 RxJava 1.x 這是錯誤的,Observable.create()在 1.x 版本幾乎飽受詬病。不是他很差,而是他太難操控。 RxJava必定要遵循Observable Contract纔會按照預期執行,而使用create()你能夠徹底無視這個規則。你能夠在onComplete以後繼續發送onNext事件,下游仍會收到事件。若是在1.x想正確的使用Observable.create()你必須首先了解幾乎全部的規則。因此一直以來 RxJava 1.x 版本使用Observable.create是不推薦的。(在新版的RxJava 1.3中,create()方法已經標記@deprecated

在經歷了1.x的失敗後,RxJava 2.x 提供了安全的create()方法。他經過ObservableEmitter做爲中間人,代替處理。使得即使你在Emitter中沒有參照ObservableContract,下游仍會按照預期的進行。

4 關於操做符

咱們上文說到的just,from,create等等是生成Observable的操做符,那麼如map,filter等等的操做符會有什麼區別嗎?
咱們來看下源碼:
map:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}複製代碼

filter:

public final Observable<T> filter(Predicate<? super T> predicate) {
    ObjectHelper.requireNonNull(predicate, "predicate is null");
    return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
}複製代碼

咱們看到,這個區別就是在生成新Observable的時候,會須要兩個參數,一個是這個Observable自己,也就是代碼中的this,另外一個就是須要進行操做的接口實現(固然也有更多參數的好比Schduler等等,大同小異,再也不贅述)。而這個Observable自己,也就是咱們口中常說的上游。上游下游是根據操做符的來講,對於一個操做符,在這個操做符以前的就是上游,而這個操做符以後的就是下游。
好比咱們的map:

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
}複製代碼

source就是咱們的上游。而這個MapObserver就是咱們的中間人(其實也算是操做符自己),將數據根據需求,處理後發給下游。
操做符原理很是複雜,map能夠說是最簡單的了。若是有興趣我推薦能夠看一下publish(selector)等等複雜的操做符。更深刻理解操做符。固然,有毅力的同窗也能夠關注RxJava 主要負責人的系列博客(純英文,並且很難懂,不是英語難懂,是原理很難懂)。

Advanced Reactive Java

關於lift

讀過扔物線大神文章入門的同窗應該對lift有一個瞭解。RxJava 1.x 幾乎全部操做符都是基於lift完成的。可是RxJava 2.x 能夠說幾乎看不到lift。 目前lift僅僅做爲提供自定義操做符的一個接口(雖然更推薦使用簡單好用的compose,由於lift須要複寫七個抽象方法。)。
最後再說一下幾點:

  • Flowable:Floawble其實在實現上和Observable相似,區別是Observable同過 Disposable控制取消訂閱。而Flowable同過Subscription。其中還須要request()方法控制流量。具體關於這個問題,我推薦這篇文章

    給初學者的RxJava2.0教程

總結:

  • 咱們從源碼分析角度來講,RxJava 2.x 也是同過subscribeActual來連接Observable和Observer(Subscriber)。本質上和Listener沒什麼太大區別。可是,RxJava的確是諸多一線Java/Android開發者的結晶。豐富的操做符,線程調度等等諸多優點。並且保證類型安全。這裏再次感謝他們,畢竟咱們仍是站在他們肩膀上編程

小彩蛋:關於Reactive Streams 和 RxJava

其實 Reactive Programming在Java上的實現不止 RxJava 一個。比較出名的還有Project Reactor和 google 的 agera 等等。 可是綜合考慮,不管是性能,擴展性上RxJava在Android平臺上是最優秀的。 因爲都在JVM上,你們都決定統一接口因此推出 Reactive Streams定義了這一套的幾個基本接口:

包括了 :

//對應RxJava中的Flowable
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

//RxJava並無直接對應,而是各類形式的實現類。
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

//同上,RxJava在flowable中直接使用Subscription
public interface Subscription {
    public void request(long n);
    public void cancel();
}

//Flowable版本的Subject
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}複製代碼

正由於這四個接口的命名關係。本在RxJava 1.x 的Observable更名爲Flowable。而RxJava 2.x的 Observable是徹底沒有backpressure支持。由於起名衝突的緣由,將原本的Subscription改成Disposable,Subscriber改成Observer。

相關文章
相關標籤/搜索