本文適合使用過Rxjava2或者瞭解Rxjava2的基本用法的同窗閱讀php
Rxjava在GitHub 主頁上的自我介紹是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一個在 Java VM 上使用可觀測的序列來組成異步的、基於事件的程序的庫)。java
通俗來講,Rxjava是一個採用了觀察者模式設計處理異步的框架。鏈式調用設計讓代碼優雅易讀。
舉個例子:app
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
}
});
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
複製代碼
這是Rxjava2最簡單的用法:
1.建立一個Observable,重寫subscribe方法,這裏主要處理被觀察的事件。
2.訂閱這個Observable,事件會回調observer的方法,咱們能夠對事件作響應的處理框架
2.1. 建立Observable:
建立Observable用的是Observable.create(ObservableOnSubscribe<T> source)方法。這個方法的參數是ObservableOnSubscribe:異步
public interface ObservableOnSubscribe<T> {
/** * Called for each Observer that subscribes. * @param e the safe emitter instance, never null * @throws Exception on error */
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
複製代碼
ObservableOnSubscribe是一個接口,惟一的方法是subscribe,參數是ObservableEmitter<T> e。ObservableEmitter是一個繼承了Emitter的接口,接口Emitter裏定義了onNext、onError、onComplete等方法,和Observer(觀察者)的方法相對應。async
public interface Emitter<T> {
/** * Signal a normal value. * @param value the value to signal, not null */
void onNext(@NonNull T value);
/** * Signal a Throwable exception. * @param error the Throwable to signal, not null */
void onError(@NonNull Throwable error);
/** * Signal a completion. */
void onComplete();
}
複製代碼
ObservableEmitter對接口Emitter進行擴展,增長了setDisposable、setCancellable等方法
基本參數瞭解了,如今看看create方法裏面作了什麼,代碼以下:ide
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
複製代碼
調用了RxJavaPlugins的onAssembly方法。又有一個新參數ObservableCreate<T>(source),咱們看看它是什麼:函數
final class ObservableCreate<T> extends Observable<T> {
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
複製代碼
繼承了Observable,因此也是個被觀察對象,在構造函數中咱們看到咱們new的ObservableOnSubscribe對象,被存在了ObservableCreate的source裏面
那咱們繼續看看onAssembly方法作什麼:this
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
複製代碼
一個Hook方法。onObservableAssembly是一個靜態變量,咱們沒有設置,默認爲空,因此直接返回source對象。也就是說,Observable的create方法其實就是把咱們ObservableOnSubscribe對象,存儲在ObservableCreate對象的source裏面,而後返回ObservableCreate對象。
咱們知道ObservableCreate是繼承Observable的,因此建立了ObservableCreate對象,咱們的Observable也就建立完了。spa
2.2 訂閱事件(被觀察者)
訂閱被觀察者的操做是observable.subscribe(new Observer<String>())。這個操做符實際上是個「被動」,就是事件被觀察者觀察。由於subscribe方法裏的參數Observer纔是觀察者。咱們也會在Observer裏的各個會調方法裏接收到事件相關的返回值。
咱們看看subscribe方法的源碼:
public final void subscribe(Observer<? super T> observer) {
try {
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
RxJavaPlugins.onError(e);
}
}
複製代碼
看代碼咱們知道最主要調用的方法是:subscribeActual(observer);,這個方法是Observable裏的抽象方法,而此時咱們的Observable是一個ObservableCreate對象(前面create方法返回的對象)。因此咱們去看一下ObservableCreate裏面是如何重寫這個方法的。代碼以下:
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
咱們一看到這個方法主要作了三件事:
①建立一個CreateEmitter對象parent。
②把parent傳給source的subscribe方法。上面咱們知道source就是剛纔存的ObservableOnSubscribe對象,subscribe也就是咱們重寫的方法:
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("a");
}
複製代碼
因此咱們在這個方法裏就能收到一個CreateEmmiter,經過CreateEmitter能夠回調相應的方法。CreateEmitter是實現ObservableEmitter接口,咱們看看它內部實現,如:onNext源碼以下:
@Override
public void onNext(T t) {
observer.onNext(t);
}
複製代碼
也就是說,當咱們在ObservableOnSubscribe的subscribe方法裏調用ObservableEmitter的onNext方法的時候,它裏面會調用observer的onNext。因而經過這樣的傳遞,咱們就能在observer裏響應的回調方法裏收到事件的相關狀態。
至此一個簡單Rxjava流式傳遞原理已經講完了,總結流程以下:
喜歡個人文章的話能夠點個贊和關注的,後面有更多系列性文章更新