RxJava 2.0已經於2016年10月29日正式發佈,本人也專門抽時間研究了一下其相關特性。趁熱打鐵,在這篇文章裏對RxJava2.0的使用進行一個簡單的總結。java
閱讀本文前須要掌握RxJava 1.0的基本概念,若是從未接觸過RxJava, 請點擊這裏react
1. RxJava 2.0 再也不支持 null 值,若是傳入一個null會拋出 NullPointerException;安全
Observable.just(null); Single.just(null); Flowable.just(null); Maybe.just(null); Observable.fromCallable(() -> null) .subscribe(System.out::println, Throwable::printStackTrace); Observable.just(1).map(v -> null) .subscribe(System.out::println, Throwable::printStackTrace);
2. RxJava 2.0 全部的函數接口(Function/Action/Consumer)均設計爲可拋出Exception,解決編譯異常須要轉換問題;架構
3. RxJava 1.0 中Observable不能很好支持背壓,在RxJava2.0 中將Oberservable完全實現成不支持背壓,而新增Flowable 來支持背壓。(關於背壓的概念請參考本人對ReativeX的英文原文的中文翻譯)app
RxJava 1.0有四個基本概念:Observable(可觀察者,即被觀察者)、Observer(觀察者)、subscribe(訂閱)、事件。Observable和 Observer經過 subscribe()方法實現訂閱關係,從而 Observable能夠在須要的時候發出事件來通知 Observer。ide
基於以上的概念, RxJava 1.0的基本實現主要有三點:函數
step1: 建立 Observerpost
Observer 即觀察者,它決定事件觸發的時候將有怎樣的行爲。 RxJava 中的 Observer 接口的實現方式:測試
Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
除了Observer接口以外,RxJava 還內置了一個實現了Observer的抽象類: Subscriber。Subscriber對 Observer接口進行了一些擴展,但他們的基本使用方式是徹底同樣的:ui
Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
step2:建立 Observable
Observable 即被觀察者,它決定何時觸發事件以及觸發怎樣的事件。 RxJava 使用 create() 方法來建立一個 Observable ,併爲它定義事件觸發規則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); } });
step3:Subscribe (訂閱)
建立了 Observable和 Observer以後,再用 subscrbe() 方法將它們聯結起來,整條鏈子就能夠工做了。代碼形式很簡單:
observable.subscribe(observer); // 或者: observable.subscribe(subscriber);
然而,在2.0中咱們熟悉的 Subscrber 竟然沒影了,取而代之的是 ObservableEmitter, 俗稱發射器。此外,因爲沒有了 Subscrber 的蹤跡,咱們建立觀察者時需使用 Observer。而 Observer 也不是咱們熟悉的那個 Observer,其回調的 Disposable 參數更是讓人摸不到頭腦。
step1:初始化一個Observable
Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onComplete(); } });
step2:初始化一個Observer
Observer<Integer> observer= new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }
step3:創建訂閱關係
observable.subscribe(observer); //創建訂閱關係
不難看出,與 RxJava1.0 仍是存在着一些區別的。首先,建立Observable時,回調的是ObservableEmitter,字面意思即發射器,用於發射數據(onNext())和通知(onError()/onComplete())。其次,建立的Observer中多了一個回調方法 onSubscribe(),傳遞參數爲Disposable。
ObservableEmitter: Emitter是發射器的意思,那就很好猜了,這個就是用來發出事件的,它能夠發出三種類型的事件,經過調用emitter的 onNext(T value) 、onComplete()和onError(Throwable e)就能夠分別發出next事件、complete事件和error事件
Disposable:這個單詞的字面意思是一次性用品,用完便可丟棄的。 那麼在RxJava中怎麼去理解它呢, 對應於上面的水管的例子, 咱們能夠把它理解成兩根管道之間的一個機關, 當調用它的 dispose() 方法時, 它就會將兩根管道切斷, 從而致使下游收不到事件,即至關於 Subsciption。
注意: 調用dispose()並不會致使上游再也不繼續發送事件, 上游會繼續發送剩餘的事件.
來看個例子, 咱們讓上游依次發送 1,2,complete,4,在下游收到第二個事件以後, 切斷水管, 看看運行結果:
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); Log.d(TAG, "emit 4"); emitter.onNext(4); } }).subscribe(new Observer<Integer>() { private Disposable mDisposable; private int i; @Override public void onSubscribe(Disposable d) { Log.d(TAG, "subscribe"); mDisposable = d; } @Override public void onNext(Integer value) { Log.d(TAG, "onNext: " + value); i++; if (i == 2) { Log.d(TAG, "dispose"); mDisposable.dispose(); Log.d(TAG, "isDisposed : " + mDisposable.isDisposed()); } } @Override public void onError(Throwable e) { Log.d(TAG, "error"); } @Override public void onComplete() { Log.d(TAG, "complete"); } });
運行結果爲:
12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: subscribe 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 1 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 1 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 2 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: onNext: 2 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: dispose 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: isDisposed : true 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 3 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit complete 12-02 06:54:07.728 7404-7404/zlc.season.rxjava2demo D/TAG: emit 4
從運行結果咱們看到, 在收到onNext 2這個事件後, 切斷了水管, 可是上游仍然發送了3, complete, 4這幾個事件, 並且上游並無由於發送了onComplete而中止。 同時能夠看到下游的 onSubscibe()方法是最早調用的.
Disposable的用處不止這些, 後面講解到了線程的調度以後, 咱們會發現它的重要性. 隨着後續深刻的講解, 咱們會在更多的地方發現它的身影.
此外,RxJava2.x中仍然保留了其餘簡化訂閱方法,咱們能夠根據需求,選擇相應的簡化訂閱。只不過傳入的對象改成了 Consumer。`
Disposable disposable = observable.subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { //這裏接收數據項 } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { //這裏接收onError } }, new Action() { @Override public void run() throws Exception { //這裏接收onComplete。 } });
不一樣於RxJava 1.0,RxJava 2.0中沒有了一系列的Action/Func接口,取而代之的是與Java8命名相似的函數式接口,以下圖:
其中Action相似於RxJava 1.0中的Action0,區別在於Action容許拋出異常。
public interface Action { /** * Runs the action and optionally throws a checked exception * @throws Exception if the implementation wishes to throw a checked exception */ void run() throws Exception; }
而Consumer即消費者,用於接收單個值, BigConsumer則是接收兩個值, Function用於變換對象, Predicate用於判斷。這些接口命名大多參照了Java8,熟悉Java8新特性的應該都知道意思,這裏也就再也不贅述了。
關於線程切換這點,RxJava1.x和RxJava2.x的實現思路是同樣的。這裏就簡單看下相關源碼。
同RxJava1.x同樣, subscribeOn 用於指定 subscribe() 時所發生的線程,從源碼角度能夠看出,內部線程調度是經過 ObservableSubscribeOn 來實現的。
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
ObservableSubscribeOn 的核心源碼在 subscribeActual方法中,經過代理的方式使用SubscribeOnObserver 包裝Observer後,設置 Disposable 來將 subscribe 切換到 Scheduler 線程中
@Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); //回調Disposable parent.setDisposable(scheduler.scheduleDirect(new Runnable() { //設置`Disposable` @Override public void run() { source.subscribe(parent); //使Observable的subscribe發生在Scheduler線程中 } })); }
observeOn 方法用於指定下游Observer回調發生的線程。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { //.. //驗證安全 return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
主要實如今 ObservableObserveOn 中的subscribeActual, 能夠看出,不一樣於subscribeOn, 沒有將subscribe 操做所有切換到Scheduler中,而是經過ObserveOnSubscriber 與 Scheduler配合,經過schedule()達到切換下游Observer回調發生的線程,這一點與RxJava 1.0實現幾乎相同。關於ObserveOnSubscriber 的源碼這裏再也不重複描述了。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnSubscriber<T>(observer, w, delayError, bufferSize)); } }
Flowable是RxJava 2.0中新增的類,專門用於應對背壓(Backpressure)問題,但這並非RxJava 2.0中新引入的概念。所謂背壓,即生產者的速度大於消費者的速度帶來的問題,好比在Android中常見的點擊事件,點擊過快則會形成點擊兩次的效果。
咱們知道,在RxJava 1.0中背壓控制是由Observable完成的,使用以下:
Observable.range(1,10000) .onBackpressureDrop() .subscribe(integer -> Log.d("JG",integer.toString()));
而在RxJava 2.0中將其獨立了出來,取名爲Flowable。所以,原先的Observable已經不具有背壓處理能力。
經過 Flowable, 咱們能夠自定義背壓處理策略。
/** * Represents the options for applying backpressure to a source sequence. */ public enum BackpressureStrategy { /** * OnNext events are written without any buffering or dropping. * Downstream has to deal with any overflow. * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators. */ MISSING, /** * Signals a MissingBackpressureException in case the downstream can't keep up. */ ERROR, /** * Buffers <em>all</em> onNext values until the downstream consumes it. */ BUFFER, /** * Drops the most recent onNext value if the downstream can't keep up. */ DROP, /** * Keeps only the latest onNext value, overwriting any previous value if the * downstream can't keep up. */ LATEST }
測試Flowable例子以下:
Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) throws Exception { for(int i=0;i<10000;i++){ e.onNext(i); } e.onComplete(); } }, FlowableEmitter.BackpressureStrategy.ERROR) //指定背壓處理策略,拋出異常 .subscribeOn(Schedulers.computation()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d("JG", integer.toString()); Thread.sleep(1000); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d("JG",throwable.toString()); } });
或者可使用相似RxJava 1.0的方式來控制。
Flowable.range(1,10000) .onBackpressureDrop() .subscribe(integer -> Log.d("JG",integer.toString()));
其中還須要注意的一點在於,Flowable並非訂閱就開始發送數據,而是需等到執行Subscription.request()才能開始發送數據。固然,使用簡化subscribe訂閱方法會默認指定Long.MAX_VALUE。手動指定的例子以下:
Flowable.range(1,10).subscribe(new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { s.request(Long.MAX_VALUE);//設置請求數 } @Override public void onNext(Integer integer) { } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });
不一樣於RxJava 1.0中的 SingleSubscriber,RxJava 2.0中的 SingleObserver多了一個回調方法 onSubscribe。
interface SingleObserver<T> { void onSubscribe(Disposable d); void onSuccess(T value); void onError(Throwable error); }
同Single,Completable也被從新設計爲Reactive-Streams架構,RxJava 1.0 的 CompletableSubscriber改成 CompletableObserver,源碼以下:
interface CompletableObserver<T> { void onSubscribe(Disposable d); void onComplete(); void onError(Throwable error); }
Processor 和 Subject 的做用是相同的。關於Subject部分,RxJava 1.0與RxJava 2.0在用法上沒有顯著區別,這裏就不介紹了。其中Processor是RxJava 2.0新增的,繼承自 Flowable, 因此支持背壓控制。而Subject則不支持背壓控制。使用以下:
//Subject AsyncSubject<String> subject = AsyncSubject.create(); subject.subscribe(o -> Log.d("JG",o));//three subject.onNext("one"); subject.onNext("two"); subject.onNext("three"); subject.onComplete(); //Processor AsyncProcessor<String> processor = AsyncProcessor.create(); processor.subscribe(o -> Log.d("JG",o)); //three processor.onNext("one"); processor.onNext("two"); processor.onNext("three"); processor.onComplete();
關於操做符,RxJava 1.0與RxJava 2.0在命名和行爲上大多數保持了一致,須要強調的是subscribeWith操做符和compose操做符。
RxJava 2.0中,subscribe 操做再也不返回 Subscription 也就是現在的 Disposable,爲了保持向後的兼容, Flowable 提供了subscribeWith方法返回當前的觀察者Subscriber對象, 而且同時提供了DefaultSubsriber, ResourceSubscriber, DisposableSubscriber接口,讓他們提供 Disposable對象, 從而能夠管理其生命週期。
RxJava 1.0用法:
private static <T> Observable.Transformer<T, T> createIOSchedulers() { return new Observable.Transformer<T, T>() { @Override public Observable<T> call(Observable<T> tObservable) { return tObservable.subscribeOn(Schedulers.io()) .unsubscribeOn(AndroidSchedulers.mainThread()) .observeOn(AndroidSchedulers.mainThread()); } }; } public static <T> Observable.Transformer<JsonResult<T>,T> applySchedulers() { return createIOSchedulers(); }
Action1<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Subscription subscription = Observable.from(items) .compose(RxUtil.<String>applySchedulers()) .map(new Func1<String, Integer>() { @Override public Integer call(String s) { return Integer.valueOf(s); } }) .subscribe(onNext);
RxJava 2.0用法:
public static <T> ObservableTransformer<T, T> io2MainObservable() { return new ObservableTransformer<T, T>() { @Override public ObservableSource<T> apply(Observable<T> upstream) { return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); } }; } public static <T> ObservableTransformer<T, T> applySchedulers() { return io2MainObservable(); }
Consumer<Integer> onNext = null; String[] items = { "item1", "item2", "item3" }; Disposable disposable = Observable.fromArray(items) .compose(RxUtil.<String>applySchedulers()) .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.valueOf(s); } }) .subscribe(onNext);
能夠注意到,RxJava 1.0中實現的是rx.Observable.Transformer接口, 該接口繼承自Func1<Observable<T>, Observable<R>>, 而2.0繼承自io.reactivex.ObservableTansformer<Upstream, Downstream>, 是一個獨立的接口。
除此以外,RxJava 2.0還提供了 FlowableTransformer接口,用於Flowable下的compose操做符,使用以下:
public static <T> FlowableTransformer<T, T> io2MainFlowable() { return new FlowableTransformer<T, T>() { @Override public Publisher<T> apply(Flowable<T> upstream) { return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); } }; } public static <T> FlowableTransformer<T, T> applySchedulers() { return io2MainFlowable(); }
Consumer<Integer> onNext = null; Disposable disposable = Flowable.create(new FlowableOnSubscribe<Integer>() { @Override public void subscribe(FlowableEmitter<Integer> e) throws Exception { for(int i=0;i<10000;i++){ e.onNext(i); } e.onComplete(); } }, FlowableEmitter.BackpressureStrategy.ERROR) //指定背壓處理策略,拋出異常 .compose(RxUtil.<String>applySchedulers()) .subscribe(onNext);