RxJava2與RxJava1的簡單對比

前言:

RxJava 2.0已經於2016年10月29日正式發佈,本人也專門抽時間研究了一下其相關特性。趁熱打鐵,在這篇文章裏對RxJava2.0的使用進行一個簡單的總結。java

閱讀本文前須要掌握RxJava 1.0的基本概念,若是從未接觸過RxJava, 請點擊這裏react

RxJava 2.0 VS RxJava 1.0:

1. RxJava 2.0 再也不支持 null 值,若是傳入一個null會拋出 NullPointerException;安全

Observable.just(null);
    Single.just(null);
    Flowable.just(null);
    Maybe.just(null);
    Observable.fromCallable(() -> null)
            .subscribe(System.out::println, Throwable::printStackTrace);
    Observable.just(1).map(v -> null)
            .subscribe(System.out::println, Throwable::printStackTrace);

2. RxJava 2.0 全部的函數接口(Function/Action/Consumer)均設計爲可拋出Exception,解決編譯異常須要轉換問題;架構

3. RxJava 1.0 中Observable不能很好支持背壓,在RxJava2.0 中將Oberservable完全實現成不支持背壓,而新增Flowable 來支持背壓。(關於背壓的概念請參考本人對ReativeX的英文原文的中文翻譯app

 

一. Observable

RxJava 1.0有四個基本概念:Observable(可觀察者,即被觀察者)、Observer(觀察者)、subscribe(訂閱)、事件。ObservableObserver經過 subscribe()方法實現訂閱關係,從而 Observable能夠在須要的時候發出事件來通知 Observer。ide

            

基於以上的概念, RxJava 1.0的基本實現主要有三點:函數

step1: 建立 Observerpost

Observer 即觀察者,它決定事件觸發的時候將有怎樣的行爲。 RxJava 中的 Observer 接口的實現方式:測試

Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了Observer接口以外,RxJava 還內置了一個實現了Observer的抽象類: SubscriberSubscriberObserver接口進行了一些擴展,但他們的基本使用方式是徹底同樣的:ui

Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

step2:建立 Observable

Observable 即被觀察者,它決定何時觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,併爲它定義事件觸發規則:

Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

step3:Subscribe (訂閱)

建立了 ObservableObserver以後,再用 subscrbe() 方法將它們聯結起來,整條鏈子就能夠工做了。代碼形式很簡單:

observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);

 

然而,在2.0中咱們熟悉的 Subscrber 竟然沒影了,取而代之的是 ObservableEmitter, 俗稱發射器。此外,因爲沒有了 Subscrber 的蹤跡,咱們建立觀察者時需使用 Observer。而 Observer 也不是咱們熟悉的那個 Observer,其回調的 Disposable 參數更是讓人摸不到頭腦。


step1:初始化一個Observable

Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });

 

step2:初始化一個Observer

Observer<Integer> observer= new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {


            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
            }
        }

 

step3:創建訂閱關係

observable.subscribe(observer); //創建訂閱關係

 

不難看出,與 RxJava1.0 仍是存在着一些區別的。首先,建立Observable時,回調的是ObservableEmitter,字面意思即發射器,用於發射數據(onNext())和通知(onError()/onComplete())。其次,建立的Observer中多了一個回調方法 onSubscribe(),傳遞參數爲Disposable

ObservableEmitter: Emitter是發射器的意思,那就很好猜了,這個就是用來發出事件的,它能夠發出三種類型的事件,經過調用emitter的 onNext(T value) 、onComplete()和onError(Throwable e)就能夠分別發出next事件、complete事件和error事件

Disposable:這個單詞的字面意思是一次性用品,用完便可丟棄的。 那麼在RxJava中怎麼去理解它呢, 對應於上面的水管的例子, 咱們能夠把它理解成兩根管道之間的一個機關, 當調用它的 dispose() 方法時, 它就會將兩根管道切斷, 從而致使下游收不到事件,即至關於 Subsciption

注意: 調用dispose()並不會致使上游再也不繼續發送事件, 上游會繼續發送剩餘的事件.

來看個例子, 咱們讓上游依次發送 1,2,complete,4,在下游收到第二個事件以後, 切斷水管, 看看運行結果:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
                Log.d(TAG, "emit 4");
                emitter.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private Disposable mDisposable;
            private int i;

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "subscribe");
                mDisposable = d;
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "onNext: " + value);
                i++;
                if (i == 2) {
                    Log.d(TAG, "dispose");
                    mDisposable.dispose();
                    Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "complete");
            }
        });

運行結果爲:

12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: subscribe
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 1
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 2
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: dispose
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: isDisposed : true
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 3
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit complete
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 4

從運行結果咱們看到, 在收到onNext 2這個事件後, 切斷了水管, 可是上游仍然發送了3, complete, 4這幾個事件, 並且上游並無由於發送了onComplete而中止。 同時能夠看到下游的 onSubscibe()方法是最早調用的.

Disposable的用處不止這些, 後面講解到了線程的調度以後, 咱們會發現它的重要性. 隨着後續深刻的講解, 咱們會在更多的地方發現它的身影.

此外,RxJava2.x中仍然保留了其餘簡化訂閱方法,咱們能夠根據需求,選擇相應的簡化訂閱。只不過傳入的對象改成了 Consumer。`

Disposable disposable = observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                  //這裏接收數據項
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
              //這裏接收onError
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
              //這裏接收onComplete。
            }
        });

 

不一樣於RxJava 1.0,RxJava 2.0中沒有了一系列的Action/Func接口,取而代之的是與Java8命名相似的函數式接口,以下圖: 
image_1arse84b03rfoo89mr1m53151a9.png-30.7kB 
其中Action相似於RxJava 1.0中的Action0,區別在於Action容許拋出異常。

public interface Action {
    /**
     * Runs the action and optionally throws a checked exception
     * @throws Exception if the implementation wishes to throw a checked exception
     */
    void run() throws Exception;
}

 

Consumer即消費者,用於接收單個值, BigConsumer則是接收兩個值, Function用於變換對象, Predicate用於判斷。這些接口命名大多參照了Java8,熟悉Java8新特性的應該都知道意思,這裏也就再也不贅述了。

線程調度

關於線程切換這點,RxJava1.x和RxJava2.x的實現思路是同樣的。這裏就簡單看下相關源碼。

1. subscribeOn

同RxJava1.x同樣, subscribeOn 用於指定 subscribe() 時所發生的線程,從源碼角度能夠看出,內部線程調度是經過 ObservableSubscribeOn 來實現的。

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

 

ObservableSubscribeOn 的核心源碼在 subscribeActual方法中,經過代理的方式使用SubscribeOnObserver 包裝Observer後,設置 Disposable 來將 subscribe 切換到 Scheduler 線程中

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent); //回調Disposable

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() { //設置`Disposable`
            @Override
            public void run() {
                source.subscribe(parent); //使Observable的subscribe發生在Scheduler線程中
            }
        }));
    }

 

2. observeOn

observeOn 方法用於指定下游Observer回調發生的線程。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
         //..
         //驗證安全
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

 

主要實如今 ObservableObserveOn 中的subscribeActual, 能夠看出,不一樣於subscribeOn, 沒有將subscribe 操做所有切換到Scheduler中,而是經過ObserveOnSubscriberScheduler配合,經過schedule()達到切換下游Observer回調發生的線程,這一點與RxJava 1.0實現幾乎相同。關於ObserveOnSubscriber 的源碼這裏再也不重複描述了。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnSubscriber<T>(observer, w, delayError, bufferSize));
        }
    }

 

二. Flowable

Flowable是RxJava 2.0中新增的類,專門用於應對背壓(Backpressure)問題,但這並非RxJava 2.0中新引入的概念。所謂背壓,即生產者的速度大於消費者的速度帶來的問題,好比在Android中常見的點擊事件,點擊過快則會形成點擊兩次的效果。 

咱們知道,在RxJava 1.0中背壓控制是由Observable完成的,使用以下:

Observable.range(1,10000)
            .onBackpressureDrop()
            .subscribe(integer -> Log.d("JG",integer.toString()));

而在RxJava 2.0中將其獨立了出來,取名爲Flowable。所以,原先的Observable已經不具有背壓處理能力。 
經過 Flowable, 咱們能夠自定義背壓處理策略。 

/**
 * Represents the options for applying backpressure to a source sequence.
 */
public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

測試Flowable例子以下:

Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {

                for(int i=0;i<10000;i++){
                    e.onNext(i);
                }
                e.onComplete();
            }
        }, FlowableEmitter.BackpressureStrategy.ERROR) //指定背壓處理策略,拋出異常
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("JG", integer.toString());
                        Thread.sleep(1000);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d("JG",throwable.toString());
                    }
                });

 

或者可使用相似RxJava 1.0的方式來控制。

Flowable.range(1,10000)
                .onBackpressureDrop()
                .subscribe(integer -> Log.d("JG",integer.toString()));

 

其中還須要注意的一點在於,Flowable並非訂閱就開始發送數據,而是需等到執行Subscription.request()才能開始發送數據。固然,使用簡化subscribe訂閱方法會默認指定Long.MAX_VALUE。手動指定的例子以下:

Flowable.range(1,10).subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);//設置請求數
            }

            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });

 

三. Single

不一樣於RxJava 1.0中的 SingleSubscriber,RxJava 2.0中的 SingleObserver多了一個回調方法 onSubscribe

interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
}

 

四. Completable

同Single,Completable也被從新設計爲Reactive-Streams架構,RxJava 1.0 的 CompletableSubscriber改成 CompletableObserver,源碼以下:

interface CompletableObserver<T> {
    void onSubscribe(Disposable d);
    void onComplete();
    void onError(Throwable error);
}

 

五. Subject/Processor

Processor 和 Subject 的做用是相同的。關於Subject部分,RxJava 1.0與RxJava 2.0在用法上沒有顯著區別,這裏就不介紹了。其中Processor是RxJava 2.0新增的,繼承自 Flowable, 因此支持背壓控制。而Subject則不支持背壓控制。使用以下:

//Subject
    AsyncSubject<String> subject = AsyncSubject.create();
    subject.subscribe(o -> Log.d("JG",o));//three
    subject.onNext("one");
    subject.onNext("two");
    subject.onNext("three");
    subject.onComplete();

    //Processor
    AsyncProcessor<String> processor = AsyncProcessor.create();
    processor.subscribe(o -> Log.d("JG",o)); //three
    processor.onNext("one");
    processor.onNext("two");
    processor.onNext("three");
    processor.onComplete();

 

六. 操做符

關於操做符,RxJava 1.0與RxJava 2.0在命名和行爲上大多數保持了一致,須要強調的是subscribeWith操做符和compose操做符。

1. subscribeWith

RxJava 2.0中,subscribe 操做再也不返回 Subscription 也就是現在的 Disposable,爲了保持向後的兼容, Flowable 提供了subscribeWith方法返回當前的觀察者Subscriber對象, 而且同時提供了DefaultSubsriber, ResourceSubscriber, DisposableSubscriber接口,讓他們提供 Disposable對象, 從而能夠管理其生命週期。

2. compose

RxJava 1.0用法:

private static <T> Observable.Transformer<T, T> createIOSchedulers() {
		return new Observable.Transformer<T, T>() {
			@Override
			public Observable<T> call(Observable<T> tObservable) {
				return tObservable.subscribeOn(Schedulers.io())
						.unsubscribeOn(AndroidSchedulers.mainThread())
						.observeOn(AndroidSchedulers.mainThread());
			}
		};
	}

	public static <T> Observable.Transformer<JsonResult<T>,T> applySchedulers() {
		return createIOSchedulers();
	}
Action1<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Subscription subscription = Observable.from(items)
                                      .compose(RxUtil.<String>applySchedulers())
                                      .map(new Func1<String, Integer>() {
                                                  @Override public Integer call(String s) {
                                                      return Integer.valueOf(s);
                                                  }
                                              })
                                      .subscribe(onNext);

RxJava 2.0用法:

public static <T> ObservableTransformer<T, T> io2MainObservable() {
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

	public static <T> ObservableTransformer<T, T> applySchedulers() {
		return io2MainObservable();
	}
Consumer<Integer> onNext = null;
String[] items = { "item1", "item2", "item3" };
Disposable disposable = Observable.fromArray(items)
                                  .compose(RxUtil.<String>applySchedulers())
                                  .map(new Function<String, Integer>() {
                                              @Override public Integer apply(String s) throws Exception {
                                                  return Integer.valueOf(s);
                                              }
                                          })
                                  .subscribe(onNext);

能夠注意到,RxJava 1.0中實現的是rx.Observable.Transformer接口, 該接口繼承自Func1<Observable<T>, Observable<R>>,  而2.0繼承自io.reactivex.ObservableTansformer<Upstream, Downstream>, 是一個獨立的接口。

除此以外,RxJava 2.0還提供了 FlowableTransformer接口,用於Flowable下的compose操做符,使用以下:

public static <T> FlowableTransformer<T, T> io2MainFlowable() {
        return new FlowableTransformer<T, T>() {
            @Override
            public Publisher<T> apply(Flowable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

	public static <T> FlowableTransformer<T, T> applySchedulers() {
		return io2MainFlowable();
	}
Consumer<Integer> onNext = null;
Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {

                for(int i=0;i<10000;i++){
                    e.onNext(i);
                }
                e.onComplete();
            }
        }, FlowableEmitter.BackpressureStrategy.ERROR) //指定背壓處理策略,拋出異常
        .compose(RxUtil.<String>applySchedulers())
        .subscribe(onNext);
相關文章
相關標籤/搜索