不忘初心 砥礪前行, Tomorrow Is Another Day !java
本文概要:bash
對應源碼app
//Observable.java
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//參數檢查
ObjectHelper.requireNonNull(source, "source is null");
//裝配Observable,返回一個ObservableCreate對象
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
//鉤子方法
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
@NonNull
static <T, R> R apply(@NonNull Function<T, R> f, @NonNull T t) {
try {
return f.apply(t);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
複製代碼
當咱們經過Create方法建立一個Observable對象時,框架
當咱們建立完Observable對象時,會調用subscribe去綁定觀察者Observer.因此直接看該方法.ide
對應源碼學習
//Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);//此方法是重中之重.
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
複製代碼
一樣的方式,對咱們傳入的Observer觀察者對象進行檢查以及預處理,最終調用subscribeActual方法,該方法是一個抽象方法.因此咱們直接看它的實現類,也就是ObservableCreate對象下的subscribeActual方法.ui
對應源碼this
//ObservableCreate.java
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//1. 建立一個發射器對象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2. 回調onSubscribe,通知訂閱成功
observer.onSubscribe(parent);
try {
//3. 回調subscribe,開始發送事件.
//source對象就是建立被觀察者傳入的.
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
上面註釋已經寫得很詳細了這裏再重複一次.整個訂閱過程就是.spa
接着咱們來看發射器對象CreateEmitter是如何將事件發送給訂閱者的.線程
對應源碼
//ObservableCreate#CreateEmitter.java
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//回調onNext
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
//回調onError
observer.onError(t);
} finally {
//最後斷開訂閱
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
//回調onComplete.
observer.onComplete();
} finally {
//最後斷開訂閱
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
//DisposableHelper.java
//斷開訂閱方法
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
複製代碼
從上面註釋可知,發射器CreateEmitter直接回調了觀察者Observer的相關方法.當調用dispose斷開dispose訂閱時,此時和線程中斷處理同樣,僅僅只是做爲一個標識,標識當前發射器已經被中斷.
這裏最後給出一張關係圖對上面流程進行概括.
從上面可知,RxJava的總體流程框架仍是挺清晰的,但有時咱們須要多它進行一些附加的操做如切線程,map,filter進行轉換等.這裏以切線程爲例進行分析看如何工做的.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//新建一個ObservableSubscribeOn.
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
當咱們切換下游線程時,也返回了一個新建的Observable-ObservableSubscribeOn.接着咱們直接看ObservableSubscribeOn類.
對應源碼
//ObservableSubscribeOn.java
//中間的Observable.
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);//上游的源Observable
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//建立一箇中間的Observer
//建立一箇中間的Observer
//建立一箇中間的Observer
//重要的話說三遍.
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//回調onSubscribe,通知訂閱成功
s.onSubscribe(parent);
//切換線程
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//線程任務類SubscribeTask
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//回調subscribe,進行訂閱
//source是源Observable,parent則是中間Observer,不是最終目標的Observer.這裏必定要清楚.
source.subscribe(parent);
}
}
複製代碼
從上面註釋可知,當咱們調用subscribeOn切換上游線程時
這也就是解釋了最開始文章所說的subscribeOn不管調用幾回,爲何只有第一次是生效的.由於每次都建立新的Observable與Observer,線程調度器裏將源Observable與中間的Observer進行綁定訂閱時,源Observable僅僅是指上一個,已經不是第一個建立出來的.
經過上面流程分析能夠總結出RxJava操做符一些通用的流程.對於Map等操做符均可以參考.如圖
最後咱們根據上面總結出的通用流程,去分析下切換下游線程的過程.
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//返回一個新建的Observable
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
一樣當調用observeOn過程切換下游線程時,果不其然也返回了一箇中間的Observable-ObservableObserveOn.接着看ObservableObserveOn代碼.
對應源碼
//中間的Observable
//ObservableObserveOn.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//線程調度器
Scheduler.Worker w = scheduler.createWorker();
//回調subscribe進行訂閱(中間的Observer-ObserveOnObserver)
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
一樣,果不其然
以前咱們知道每次執行observeOn都會切換一次下游線程,從上面源碼可知,每次都會新建一箇中間的Observer綁定新指定的線程調度器,因此接收事件都是在新的線程中執行啦.
至此,RxJava基本原理就差很少分析完成,最重要的是記住兩張流程圖,都遵循這個規律.
因爲本人技術有限,若有錯誤的地方,麻煩你們給我提出來,本人不勝感激,你們一塊兒學習進步.