咱們看下RxJava最簡單的寫法java
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onComplete(); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { System.out.println("onError"+e.getLocalizedMessage()); } @Override public void onComplete() { System.out.println("onComplete"); } })
很簡單的3個步驟:segmentfault
一個個來看微信
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); //建立了一個ObservableCreate類,裏面包裝了咱們傳入的source參數 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
這裏很簡單,只是經過new方法生成了一個簡單的Observer對象。ide
訂閱是經過subscribe方法來執行的,咱們來跟蹤一下,這個方法是屬於Observable類的函數
public final void subscribe(Observer<? super T> observer) { //校驗觀察者不爲空 ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //調用subscribeActual方法,而後入參是observer(被觀察者)。這個方法是抽象方法,具體的實現是交給子類的 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } /** * Operator implementations (both source and intermediate) should implement this method that * performs the necessary business logic. * <p>There is no need to call any of the plugin hooks on the current Observable instance or * the Subscriber. * @param observer the incoming Observer, never null */ protected abstract void subscribeActual(Observer<? super T> observer);
最終經過 subscribeActual(observer) 來實現功能,而這個方法是有具體的子類去實現的。從第一步中咱們經過Observable.create()來生成的被觀察者。裏面最終的生成的是 ObservableCreate 這個類。也就是說,這個subscribeActual(observer) 方法是由 ObservableCreate 這個類去實現的,咱們去裏面找一下。oop
@Override protected void subscribeActual(Observer<? super T> observer) { //這裏將咱們傳入的被觀察者進行了一層封裝,裏面實現了ObservableEmitter<T>, Disposable等接口->裝飾者模式 CreateEmitter<T> parent = new CreateEmitter<T>(observer); //調用被觀察者的onSubscribe方法(這裏很神奇,調起者是observer,而不是被訂閱者,是爲了兼容Rxajva1麼?) observer.onSubscribe(parent); try { //這裏的source就是咱們本身寫的那個ObservableOnSubscribe了,調用了裏面的subscriber方法,而後參數是封裝後的觀察者。 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } Observable.create(new ObservableOnSubscribe<String>() { //看到了哈,實際是執行的這個方法,這裏面的emitter是咱們封裝以後的CreateEmitter,那麼這裏面的onNext(),onComplete()又是誰呢? @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onComplete(); } })
咱們如今回到咱們封裝生成的 CreateEmitter 這個類測試
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; //定義的觀察者 CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } //調用的是觀察者的onNext()方法 if (!isDisposed()) { observer.onNext(t); } } @Override public void onComplete() { if (!isDisposed()) { try { //調用的是觀察者的onComplete()方法 observer.onComplete(); } finally { //執行完onComplete()方法後要取消訂閱 dispose(); } } } ..... }
到這裏爲知,最簡單的一個流程基本已經走通了。。ui
RxJava中咱們使用的最多的應該就是進行線程切換了吧?經過 observeOn() 方法來進行線程的隨意切換,舒舒服服,不再用進行噁心的線程處理了。this
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onComplete(); } }).observeOn(Schedulers.io())
observeOn() 方法是屬於Observable這個類的。咱們跟蹤進去這個方法去看看。spa
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { //進行空校驗 ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
這裏建立了一個 ObservableObserveOn 對象,因此和以前基礎裏面將的同樣,當調用 subscribe() 方法的時候,會先調用觀察者的 onSubscribe() 方法,而後經過subscribe的層層處理,調用這個被觀察者裏面的 subscribeActual() 方法。
@Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) {//若是傳入的scheduler是TrampolineScheduler,那麼線程不須要切換,直接調用subscribe方法便可 source.subscribe(observer); } else { //根據傳入的scheduler,建立Worker Scheduler.Worker w = scheduler.createWorker(); //將傳入的observer進行包裝,包裝爲ObserveOnObserver類。 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
這裏能夠依據基礎篇的進行整理一下,這裏將觀察者進行了一層包裝,也就是咱們的觀察者由原來的observaer變爲了ObserveOnObserver對象。而被觀察者仍是以前的ObservableCreate(注意,這裏只是依據基礎中.create()建立的類,因此是ObservableCreate,若是是其餘方式建立的被觀察者,那麼這裏可能就是另外一個具體的實現類了),並未改變。以前咱們講過,當調用subscribe方法的onNext(),onComplete()方法,實際上是調用的觀察者的方法。咱們如今看一下ObserveOnObserver的onNext和onComplete方法又是作了什麼神奇的操做。
@Override public void onNext(T t) { if (done) {//若是已經完成,直接返回 return; } if (sourceMode != QueueDisposable.ASYNC) { //將onNext的數據放入隊列queue queue.offer(t); } //進行線程切換 schedule(); } void schedule() { if (getAndIncrement() == 0) { //調用了worker的方法,這裏經過調用線程池,調用了自身的run方法 worker.schedule(this); } }
這裏咱們使用的是IO線程,那麼在 scheduler.createWorker() 中的生成worker時
@NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); }
那麼跟到這個類裏面的 schedule 方法
@Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } //這裏調用了線程worker的scheduleActual方法,並把Runable對象傳進去 return threadWorker.scheduleActual(action, delayTime, unit, tasks); } public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { //留下鉤子 Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); .... Future<?> f; try { if (delayTime <= 0) { //在線程池中調用封裝以後的Runnable f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
能夠看到,其實最終是經過線程池調用了 ObserveOnObserver 自己,這個類實現了 Runnable 接口,咱們看一下run方法裏面作了什麼。
@Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } //具體的操做 void drainNormal() { int missed = 1; //被觀察者onNext發送的數據隊列 final SimpleQueue<T> q = queue; //實際的觀察者 final Observer<? super T> a = downstream; for (;;) { //檢測是否有異常信息 if (checkTerminated(done, q.isEmpty(), a)) { return; } //遍歷 for (;;) { boolean d = done; T v; //取出隊列中的數據 try { v = q.poll(); } catch (Throwable ex) { //發生異常,則直接調用dispose()和onError()方法 Exceptions.throwIfFatal(ex); disposed = true; upstream.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } .... //調用實際的觀察者的onNext()方法 a.onNext(v); } ... } }
由於這個操做最終是在scheduler.createWorker()建立的地方進行了處理,才實現了對於以後代碼處理都在io線程中進行了調用。從而實現線程的切換功能。這裏咱們對以前的測試代碼流程作一個總結。
先看一下對於觀察者的onSubscribe()方法的調用流程:
這裏面咱們本身定義的觀察者經過subscribe()方法層層往上調用,最後調用了咱們定義的被觀察者裏面的onSubscribe方法,再一層層的往下調用,最後到咱們本身定義的onSubscribe()方法,裏面不多有線程的切換處理,因此這段代碼在哪兒執行,那麼這段代碼在那裏執行,這個onSubscribe()方法就是在哪一個線程執行。
繼續,咱們看一下onNext()方法
除了 observeOn 方法來處理咱們操做流的下層線程處理以外,咱們也能夠經過 subscribeOn 方法來進行對上層流的線程處理。
測試用代碼:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onComplete(); } }).subscribeOn(Schedulers.io())
如今咱們跟蹤進 subscribeOn 方法
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); // return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
這裏看到,跟咱們基礎篇裏面的 create() 方法有殊途同歸之妙,這裏面生成了一個ObservableSubscribeOn類,這個類也是繼承Observable類的,咱們跟蹤進去看一下。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); //調用訂閱者的onSubscribe方法,這裏的線程還未進行切換 observer.onSubscribe(parent); //進行線程的切換處理 //1.創造一個SubscribeTask的Runable方法 //2.經過scheduler的scheduleDirect進行線程的切換 //3.經過parent.setDisposable來進行Disposable的切換 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
看起來是否是很像?在基礎篇咱們知道了,這個 subscribeActual 方法裏面的參數就是咱們的觀察者。
咱們看一下里面和以前分析所不一樣的地方,也就是線程的切換
final class SubscribeTask implements Runnable { ... @Override public void run() { //source是咱們上一層的被觀察者,parent是包裝以後的觀察者. //因此會在相關的worker裏面調用source的subscribe方法, //即上層的數據調用已經在woker裏面了(若是是IoScheduler,那麼這裏就是在RxCachedThreadScheduler線程池調用了這個方法 ) source.subscribe(parent); } }
而後看一下這裏面最重要的 scheduler.scheduleDirect 這個方法
@NonNull public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); } @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { //建立一個Worker,這個是有具體的實現類來實現的,好比咱們的IOScheduler,ImmediateThinScheduler等,具體要看咱們切換傳參 final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
這裏咱們對上層切換的流程作一個總結:當調用 subscribeOn 方法的時候,會在建立的調度器中來執行被觀察者的執行代碼,從而實現了對上層的線程切換功能。
先看一下測試代碼中的onNext()方法的調用流程:
其實對於線程的切換,主要是根據裏面傳遞的線程切換函數,將上游或者下游的代碼在指定的線程裏面去執行來實現。
本文由 開了肯 發佈!