Rxjava入門指南(一):響應式編程與Rxjava入門

響應式編程介紹

先來介紹一下響應式編程:響應式編程是一種,基於事件驅動的方式,處理異步數據(事件)流的編程範式。java

實際上就是 觀察者模式+數據流 + 事件控制數據庫

 

1.什麼算是數據流?編程

舉個例子,在界面中點擊登陸按鈕發出登陸請求,這個事件就是一個數據流。網絡

2.什麼算是基於事件驅動?框架

打個比方,發出登陸請求至關於擰開水龍頭(水龍頭髮生變化),而執行登陸請求就至關於接收水的水池(水池發生變化),若是水龍頭有水流出時,水池能夠接收水管中的水。異步

 

 

水池能夠根據水龍頭髮出的請求而產生響應,即從對事件的發生和對事件的響應這個角度來編程。ide

3.怎樣算是響應?異步編程

舉個維基百科中的例子:若是咱們聲明變量a = b + c,那麼當b的值改變後,a的值是不會發生改變的,而響應式編程則是當b發生改變後,可讓a響應從而發生變化。spa

和觀察者模式同樣,當一個對象改變狀態,會通知其餘訂閱者。線程

4.那爲何是異步的?由於擰開水龍頭和水池接收水不是線性操做:

假若有水龍頭A和B,水池正在接收水龍頭A中的水,當水龍頭B中的水流向水池時,不會阻塞水池接收水龍頭A中的水,二者是不相關的。

一樣,假如咱們有水池A和水池B,當水龍頭的水流向水池A的同時,不會阻塞水龍頭的水流向水池B

因此說,事件的發出(擰開水龍頭)和事件的接收(水池接收水)這兩個動做是異步的,互不相擾的,不會像同步操做那樣產生阻塞。

 

Rxjava介紹(rxjava2)

rx全稱爲Reactive Extension,即響應式、可擴展,是一套基於事件的異步編程的API。

在Rxjava中,事件的發出對應:Observable(可觀測的物),事件的接收對應:Observer(觀察者),這個過程的鏈接對應着subscribe()方法(訂閱)。

咱們來經過代碼描述這個過程:

建立Observable對象(水龍頭)

        //建立可觀測的物Observable:水龍頭
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                //ObservableEmitter 事件發射器,能夠發出的事件類型有:onNext、onError、onComplete
                emitter.onNext("水流1"); //發出事件
                emitter.onNext("水流2");
                emitter.onNext("水流3");
                emitter.onComplete(); //與onError互斥,只能發出二者中的一個
                emitter.onNext("水流4"); // 當發出onComplete或onError後,雖然能夠繼續發出事件,但不會再被接收到
            }
        });

 

建立Observer對象(水池)

        //建立觀察者Observer:水池
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable disposable) {
          //這裏獲取到了Disposable對象,下面會介紹該對象 Log.d(TAG,
"subscribe:創建鏈接"); } @Override public void onNext(String water) { Log.d(TAG,"接收水流:" + water); } @Override public void onError(Throwable error) { Log.d(TAG,"報錯啦"); } @Override public void onComplete() { Log.d(TAG,"通知結束啦"); } };

 

創建訂閱關係(鏈接)

        //創建鏈接:水管
        observable.subscribe(observer);  

 

輸出結果以下:

subscribe:創建鏈接
接收水流:水流1
接收水流:水流2
接收水流:水流3
通知結束啦

能夠看到在創建鏈接後,依次接收到了水流一、二、3,而水流4沒有被接收到。 

咱們能夠把new Observer到Observable的subscirbe方法裏,把整個過程連起來:

//建立可觀測的物Observable:水龍頭
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                //ObservableEmitter 事件發射器,能夠發出的事件類型有:onNext、onError、onComplete
                emitter.onNext("水流1"); //發出事件
                emitter.onNext("水流2");
                emitter.onNext("水流3");
                emitter.onComplete(); //與onError互斥,只能發出二者中的一個
                emitter.onNext("水流4"); // 當發出onComplete或onError後,會用Disposable對象的dispose()方法
// 雖然水龍頭能夠繼續發出事件,但水池不會再被接收到
} }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable disposable) { //這裏能夠獲取到Disposable對象 Log.d(TAG,"subscribe:創建鏈接"); } @Override public void onNext(String water) { Log.d(TAG,"接收水流:" + water); } @Override public void onError(Throwable error) { Log.d(TAG,"報錯啦"); } @Override public void onComplete() { Log.d(TAG,"通知結束啦"); } });

Observable的subscirbe方法還提供了其餘幾種重載形式:

public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}

咱們能夠傳入Consumer對象,讓觀察者處理本身關心的事件類型

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                //ObservableEmitter 事件發射器,能夠發出的事件類型有:onNext、onError、onComplete
                emitter.onNext("水流1"); //發出事件
                emitter.onNext("水流2");
                emitter.onNext("水流3");
                emitter.onComplete(); //與onError互斥,只能發出二者中的一個
                emitter.onNext("水流4"); // 當發出onComplete或onError後,雖然能夠繼續發出事件,但不會再被接收到
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String str) {
                Log.d(TAG, "onNext: " + str);
            }
        });

如今這個方法的返回結果再也不是void而是返回了Disposable對象。

以前咱們在Observer的onSubscribe中就提到了該對象,在onSubscribe方法中能夠獲取到Disposable對象,因此不須要返回對象。

Disposable對象

  • dispose():解除訂閱
  • isDisposed():查詢是否解除了訂閱

通常在activity或者fragment銷燬時,咱們能夠調用dispose方法來取消訂閱,釋放數據流中剩餘的全部內部資源,防止內存泄漏。

Rxjava提供了CompositeDisposable容器管理多個Disposable對象,咱們能夠在獲取到Disposable對象後把它添加到容器中,而後在onDestroy方法中調用compositeDisposable.dispose()清除全部訂閱:

//添加Disposable對象
    protected void addDisposable(Disposable disposable) {
        if (null == compositeDisposable || compositeDisposable.isDisposed()) {
            compositeDisposable = new CompositeDisposable();
        }
        compositeDisposable.add(disposable);
    }

    //清除全部訂閱
    protected void clearDisposable() {
        compositeDisposable.clear();
    }

異步

咱們簡單的實現了事件的發出和響應,可是這個過程還發生在同一個線程內,並非異步的。

rxjava提供了方便的線程調度方法用於切換線程:

//事件的發出在新線程newThread()
observable.subscribeOn(Schedulers.newThread())
       //事件的響應在主線程 .observeOn(AndroidSchedulers.mainThread())
       //創建鏈接 .subscribe(consumer對象);

以前的例子能夠就成了這樣:

//建立可觀測的物Observable:水龍頭
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) {
                //ObservableEmitter 事件發射器,能夠發出的事件類型有:onNext、onError、onComplete
                emitter.onNext("水流1"); //發出事件
                emitter.onNext("水流2");
                emitter.onNext("水流3");
                emitter.onComplete(); //與onError互斥,只能發出二者中的一個
                emitter.onNext("水流4"); // 當發出onComplete或onError後,雖然能夠繼續發出事件,但不會再被接收到
            }
        }).subscribeOn(Schedulers.newThread())
                //事件的響應在主線程                                              
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() { //創建鏈接
                    @Override
                    public void accept(String str) {
                        Log.d(TAG, "onNext: " + str);
                    }
                });

這樣就實現了異步,事件發出時所在的線程爲RxNewThreadScheduler-1,而接收事件所在的線程爲main。

RxJava內置的線程包括(這些線程之間的具體區別暫且不提,主要是討論如何使用):

  • AndroidSchedulers.mainThread() 主線程
  • Schedulers.io() IO線程,線程名RxCachedThreadScheduler-1
  • Schedulers.newThread() 新線程
  • Schedulers.computation() 須要大量計算的線程

之後咱們就能夠在IO線程中執行讀寫文件、讀寫數據庫、網絡操做等操做,在computation線程中執行計算操做。

如今咱們已經經過rxjava實現了簡單的異步觀察者模式,模擬出了事件響應。

應用舉例:

 Retrofit網絡請求框架提供了對Rxjava的支持,在IO線程中進行網絡請求,在主線程中去處理結果。

 

Disposable disposable = retrofit.create(ILoginApi.class).login(loginDTO)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<ResultVO<LoginVO>>() {
                               @Override
                               public void accept(ResultVO<LoginVO> loginResultVO) throws Exception {
                                   //成功獲取
                               }
                           },
                        new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {
                                //打印異常信息
                            }
                        });

 

參考:通俗易懂的rxjava2教程:https://www.jianshu.com/p/464fa025229e

若有紕漏,請大佬及時批評指出

相關文章
相關標籤/搜索