Android的三方庫 - RxJava:RxJava的使用和基本訂閱流程

一:什麼是RxJava?

GitHub關於RxJava的介紹:編程

a library for composing asynchronous and event-based programs by using observable sequences安全

他的意思就是 一個經過可觀測的序列來組成異步和基於事件的庫。bash

RxJava的出現消除同步問題、線程安全等問題app

總的來講就是方便咱們異步編程。異步

二:RxJava的優勢和缺點

優勢

異步async

鏈式調用結構ide

使用複雜的異步調用方式的時候依舊能夠保持簡潔異步編程

缺點

學習成本比較高,入門的門檻比較高學習

難以理解的API,須要查看源碼才能理解API的具體效果ui

三:RxJava的基礎使用

首先明白他的基礎使用步驟:

  1. 建立被觀察者(Observable)
  2. 建立觀察者(Observer)
  3. 訂閱(subscribe)

1.建立被觀察者

正常建立被觀察者:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("ONE");
                emitter.onNext("TWO");
                emitter.onNext("THREE");
                emitter.onComplete();
            }
        });
複製代碼

在這裏面一共產生了四個事件:One、Two、Three、結束。

PS:

非正常建立第一彈:

Observable observable = Observable.just("ONE","TWO","THREE");

非正常建立第二彈:

String[] values = {"ONE", "TWO", "THREE"};
Observable observable = Observable.fromArray(values);
複製代碼

其實這樣的非正常建立是內部將這些信息包裝成onNext()這樣的事件發送給觀察者。

2.建立觀察者

正常建立:

Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i("z", "onSubscribe: ");
            }

            @Override
            public void onNext(String s) {
                Log.i("z", "onNext: s = " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("z", "onError: ");
            }

            @Override
            public void onComplete() {
                Log.i("z", "onComplete: ");
            }
        };
複製代碼

非正常建立:

Consumer<String> observer = new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.i("z", "accept: s = " + s);
            }
        };
        
複製代碼

3.訂閱

observable.subscribe(observer);

你已經注意到不同的地方,爲何被觀察者訂閱了觀察者?

之因此會這樣,是由於RxJava爲了保持鏈式調用的流暢性。

4. 異步調用

RxJava既然是異步庫,固然對於異步的處理會更好

在咱們看RxJava的異步調用以前,咱們先來學習下其中比較重要的兩個點

  • subscribeOn()
  • observeOn()
subscribeOn

這個表示Observable在一個指定的環境下建立,只能使用一次,屢次建立的話會以第一次爲準。

observeOn

表示 事件傳遞和 最終處理髮生在哪一個環境下,能夠屢次調用,每次指定以後,下一步就生效。

好比:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("ONE");
                emitter.onNext("TWO");
                emitter.onNext("THREE");
                emitter.onComplete();
            }
        }) // 被觀察者在一個新的線程中建立
        .subscribeOn(Schedulers.newThread()) 
                // 下面這個操做是在io線程中
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s.toLowerCase();
                    }
                })
                  // 切換,觀察者是在主線程中
                .observeOn(AndroidSchedulers.mainThread()) 
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        
                    }
                });

複製代碼

四:RxJava的基礎訂閱流程

先看一下基礎的調用方式:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.i(TAG, "subscribe: ");
                emitter.onNext("ONE");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.i(TAG, "onSubscribe: ");
            }


            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: s = " + s);
            }


            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: e = " + e.getMessage());
            }


            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
            }
        });
    }
});
複製代碼

結果:

onSubscribe:
subscribe:
onNext: s = ONE
複製代碼

咱們先從訂閱開始看,也就是subscribe方法

public final void subscribe(Observer<? super T> observer) {
        ... // 忽略部分源碼
        subscribeActual(observer);
        ... // 忽略部分源碼
}
複製代碼

直接找到主要的方法subscribeActual(observer),這個是抽象的方法,會被實如今子類中。

因此咱們接着看看Observable的子類實現:

咱們進入到create方法中:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼

其實 返回的就是 ObservableCreate 的對象,

須要注意的是: ObservableCreateObservable的一個子類 ObservableCreate 被建立都會傳入一個source的字段,這個source就是 ObservableOnSubscribe

ObservableCreate 具體實現了 subscribeActual 方法

@Override
protected void subscribeActual(Observer<? super T> observer) {
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 在這裏觸發 observer#onSubscribe()
    observer.onSubscribe(parent);

    try {
        // 在這裏回調
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
複製代碼

在這裏方法能夠看到 觀察者observeronSubscribe 會先於回調發生。

而後調用 ObservableOnSubscribe 的方法 subscribe

具體的事件後由開發者去作, 能夠看到在案例中調用了 CreateEmitter,能夠進入到 CreateEmitter 看看onNext() 的實現

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}
複製代碼

能夠看到 在CreateEmitteronNext() 中調用了 觀察者observeronNext() 方法.

而後能夠看到案例中的調用:

@Override
public void onNext(String s) {
   Log.i(TAG, "onNext: s = " + s);
 }
複製代碼
相關文章
相關標籤/搜索