RxJava是Reactive Extensions的Java VM實現:一個庫,用於經過使用可觀察序列來編寫異步和基於事件的程序。它擴展了觀察者模式,以支持數據/事件序列,並添加了容許您以聲明方式組合序列的運算符,同時抽象出對低級線程,同步,線程安全和併發數據結構等問題的關注。編程
簡單理解就是,ReactiveX用更好的方式爲異步編程提供一套API接口。爲何說更好的方式呢?這裏就得說說現有的Android實現異步的方式AsyncTask和Handler了。這兩種方式均可以處理異步操做,而不阻塞線程。可是它們抽象不夠,實現比較複雜,並且難以處理比較複雜的業務邏輯。好比頁面有兩個請求,當兩個請求都成功的時候才能展現頁面。像這種須要處理多個異步線程的時候,AsyncTask和Handler就比較難以實現。這時RxJava就登場了,它能夠很好的解決線程切換,處理多個線程的關係,同時可使業務邏輯扁平化,邏輯更清晰。安全
RxJava是基於觀察者模式的,因此有Observable和Observer,經過subscribe來實現訂閱。bash
public abstract class Observable<T> implements ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}複製代碼
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();}複製代碼
Observable經過subscribe拿到Observer的引用,當Observable有數據的時候通知Observer,數據從Observable(被觀察者)流向Observer(觀察者)數據結構
簡單的代碼實現併發
//1.建立Observable
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
})
//2.訂閱觀察者
.subscribe(
//3.建立觀察者
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.i(TAG, "onNext:" + integer);
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError:" + e.getMessage());
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}});複製代碼
源碼分析流程:異步
步驟1. Observable.create()建立了一個被觀察者,經過subscribe方法訂閱了一個觀察者ide
Observable.create():
異步編程
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}複製代碼
返回的是ObservableCreate這個對象就是返回的Observable對象,走到這一步只是建立了Observable,並把建立的ObservableOnSubscribe對象傳遞進來。類中subscribeActual()這個方法和CreateEmitter這個類如今尚未調用,後面分析。源碼分析
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//在Observable調用subscribe(Observer)方法時候調用,這裏先不看 @Override
protected void subscribeActual(Observer<? super T> observer) {
}
}複製代碼
步驟2.如今建立了ObservableCreate這個Observable,而後它調用了subscribe()方法訂閱觀察者ui
public final void subscribe(Observer<? super T> observer) {
//檢查是否爲空
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//用來調試的,,真實環境返回的仍是observer,忽略此步驟
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
}
}複製代碼
能夠看到這裏就是調用了subscribeActual(observer);這個抽象方法。其實調用的是步驟1中建立的ObservableCreate這個對象的subscribeActual()方法,並把第3步建立的Observer傳入
如今後頭看看步驟1中建立的ObservableCreate這個類subscribeActual()
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//這是一個封裝了Observer的類,是步驟1中onSubscribe方法中傳入的參數
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//這裏調用了步驟3中的onSubscribe回調。
observer.onSubscribe(parent);
try {
//這裏調用了步驟1中建立的ObservableOnSubscribe的subscribe這個方法。
//方法中調用的emitter.onNext(1);實際上是這裏的parent這個類的onNext()方法
//最終調用了步驟3中建立的Observer的onNext方法,這樣就走完了流程
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
return;
}
if (!isDisposed()) {
//調用真正的步驟3中建立的observer
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
}複製代碼
這裏有點繞,其實ObservableCreate是一個被觀察者,有個內部類CreateEmitter,內部類持有觀察者,被觀察者要發送數據就是經過這個內部類調用onNext(T t),繼而調用內部的觀察者。