rxjava介紹

Observable

在RxJava1.x中,最熟悉的莫過於Observable這個類了,筆者剛使用RxJava2.x時,建立一個Observable後,頓時是懵逼的。由於咱們熟悉的Subscriber竟然沒影了,取而代之的是ObservableEmitter,俗稱發射器。此外,因爲沒有了Subscriber的蹤跡,咱們建立觀察者時需使用Observer。而Observer也不是咱們熟悉的那個Observer,其回調的Disposable參數更是讓人摸不到頭腦。安全

廢話很少說,從會用開始,還記得使用RxJava的三部曲嗎? 
第一步:初始化一個Observable架構

1        Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() {
2  
3             @Override
4             public void subscribe(ObservableEmitter<Integer> e) throws Exception {
5                 e.onNext(1);
6                 e.onNext(2);
7                 e.onComplete();
8             }
9         });

 

第二步:初始化一個Observeride

Observer<Integer> observer= new Observer<Integer>() {
 
            @Override
            public void onSubscribe(Disposable d) {
 
            }
 
            @Override
            public void onNext(Integer value) {
 
 
            }
 
            @Override
            public void onError(Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
            }
        }

 

第三部:創建訂閱關係函數

observable.subscribe(observer); //創建訂閱關係

不難看出,與RxJava1.x仍是存在着一些區別的。首先,建立Observable時,回調的是ObservableEmitter,字面意思即發射器,用於發射數據(onNext)和通知(onError/onComplete)。其次,建立的Observer中多了一個回調方法onSubscribe,傳遞參數爲Disposable ,Disposable至關於RxJava1.x中的Subscription,用於解除訂閱。你可能納悶爲何不像RxJava1.x中訂閱時返回Disposable,而是選擇回調出來呢。官方說是爲了設計成Reactive-Streams架構。不過仔細想一想這麼一個場景仍是頗有用的,假設Observer須要在接收到異常數據項時解除訂閱,在RxJava2.x中則很是簡便,以下操做便可。測試

  Observer<Integer> observer = new Observer<Integer>() {
            private Disposable disposable;
 
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }
 
            @Override
            public void onNext(Integer value) {
                Log.d("JG", value.toString());
                if (value > 3) {   // >3 時爲異常數據,解除訂閱
                    disposable.dispose();
                }
            }
 
            @Override
            public void onError(Throwable e) {
 
            }
 
            @Override
            public void onComplete() {
 
            }
        }

 

此外,RxJava2.x中仍然保留了其餘簡化訂閱方法,咱們能夠根據需求,選擇相應的簡化訂閱。只不過傳入的對象改成了Consumer。ui

Disposable disposable = observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                  //這裏接收數據項
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
              //這裏接收onError
            }
        }, new Action() {
            @Override
            public void run() throws Exception {
              //這裏接收onComplete。
            }
        });

 

不一樣於RxJava1.x,RxJava2.x中沒有了一系列的Action/Func接口,取而代之的是與Java8命名相似的函數式接口,以下圖: this

image_1arse84b03rfoo89mr1m53151a9.png

其中Action相似於RxJava1.x中的Action0,區別在於Action容許拋出異常。spa

public interface Action {
    /**
     * Runs the action and optionally throws a checked exception
     * @throws Exception if the implementation wishes to throw a checked exception
     */
    void run() throws Exception;
}

 

而Consumer即消費者,用於接收單個值,BiConsumer則是接收兩個值,Function用於變換對象,Predicate用於判斷。這些接口命名大多參照了Java8,熟悉Java8新特性的應該都知道意思,這裏也就再也不贅述了。.net

線程調度

關於線程切換這點,RxJava1.x和RxJava2.x的實現思路是同樣的。這裏就簡單看下相關源碼。線程

subscribeOn

同RxJava1.x同樣,subscribeOn用於指定subscribe()時所發生的線程,從源碼角度能夠看出,內部線程調度是經過ObservableSubscribeOn來實現的。

 public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

 

ObservableSubscribeOn的核心源碼在subscribeActual方法中,經過代理的方式使用SubscribeOnObserver包裝Observer後,設置Disposable來將subscribe切換到Scheduler線程中

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
 
        s.onSubscribe(parent); //回調Disposable
 
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() { //設置`Disposable`
            @Override
            public void run() {
                source.subscribe(parent); //使Observable的subscribe發生在Scheduler線程中
            }
        }));
    }

 

observeOn

observeOn方法用於指定下游Observer回調發生的線程。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
         //..
         //驗證安全
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

 

主要實如今ObservableObserveOn中的subscribeActual,能夠看出,不一樣於subscribeOn,沒有將suncribe操做所有切換到Scheduler中,而是經過ObserveOnSubscriber與Scheduler配合,經過schedule()達到切換下游Observer回調發生的線程,這一點與RxJava1.x實現幾乎相同。關於ObserveOnSubscriber的源碼這裏再也不重複描述了,有興趣的能夠查看本人RxJava源碼解讀這篇文章

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
 
            source.subscribe(new ObserveOnSubscriber<T>(observer, w, delayError, bufferSize));
        }
    }

 

Flowable

Flowable是RxJava2.x中新增的類,專門用於應對背壓(Backpressure)問題,但這並非RxJava2.x中新引入的概念。所謂背壓,即生產者的速度大於消費者的速度帶來的問題,好比在Android中常見的點擊事件,點擊過快則會形成點擊兩次的效果。 
咱們知道,在RxJava1.x中背壓控制是由Observable完成的,使用以下:

Observable.range(1,10000)
            .onBackpressureDrop()
            .subscribe(integer -> Log.d("JG",integer.toString()));

 

而在RxJava2.x中將其獨立了出來,取名爲Flowable。所以,原先的Observable已經不具有背壓處理能力。 

經過Flowable咱們能夠自定義背壓處理策略。 

image_1arsktnsv1uk810abcki15ljp91m.png

測試Flowable例子以下:

Flowable.create(new FlowableOnSubscribe<Integer>() {
 
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
 
                for(int i=0;i<10000;i++){
                    e.onNext(i);
                }
                e.onComplete();
            }
        }, FlowableEmitter.BackpressureMode.ERROR) //指定背壓處理策略,拋出異常
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d("JG", integer.toString());
                        Thread.sleep(1000);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d("JG",throwable.toString());
                    }
                });

其中還須要注意的一點在於,Flowable並非訂閱就開始發送數據,而是需等到執行Subscription#request才能開始發送數據。固然,使用簡化subscribe訂閱方法會默認指定Long.MAX_VALUE。手動指定的例子以下:

Flowable.range(1,10).subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(Long.MAX_VALUE);//設置請求數
            }
 
            @Override
            public void onNext(Integer integer) {
 
            }
 
            @Override
            public void onError(Throwable t) {
 
            }
 
            @Override
            public void onComplete() {
 
            }
        });

 

Single

不一樣於RxJava1.x中的SingleSubscriber,RxJava2中的SingleObserver多了一個回調方法onSubscribe。

interface SingleObserver<T> {
    void onSubscribe(Disposable d);
    void onSuccess(T value);
    void onError(Throwable error);
}

 

Completable

同Single,Completable也被從新設計爲Reactive-Streams架構,RxJava1.x的CompletableSubscriber改成CompletableObserver,源碼以下:

interface CompletableObserver<T> {
    void onSubscribe(Disposable d);
    void onComplete();
    void onError(Throwable error);
}

 

Subject/Processor

Processor和Subject的做用是相同的。關於Subject部分,RxJava1.x與RxJava2.x在用法上沒有顯著區別,這裏就不介紹了。其中Processor是RxJava2.x新增的,繼承自Flowable,因此支持背壓控制。而Subject則不支持背壓控制。使用以下:

    //Subject
        AsyncSubject<String> subject = AsyncSubject.create();
        subject.subscribe(o -> Log.d("JG",o));//three
        subject.onNext("one");
        subject.onNext("two");
        subject.onNext("three");
        subject.onComplete();
 
       //Processor
        AsyncProcessor<String> processor = AsyncProcessor.create();
        processor.subscribe(o -> Log.d("JG",o)); //three
        processor.onNext("one");
        processor.onNext("two");
        processor.onNext("three");
        processor.onComplete();

 

關於操做符,RxJava1.x與RxJava2.x在命名和行爲上大多數保持了一致,部分操做符請查閱文檔。

相關文章
相關標籤/搜索