不少項目使用流行的Rxjava2 + Retrofit搭建網絡框架,Rxjava如今已經發展到Rxjava2,以前一直都只是再用Rxjava,但歷來沒有了解下Rxjava的內部實現,接下來一步步來分析Rxjava2的源碼,Rxjava2分Observable和Flowable兩種(無被壓和有被壓),咱們今天先從簡單的無背壓的observable來分析。源碼基於rxjava:2.1.1。java
先來段最簡單的代碼,直觀的瞭解下整個Rxjava運行的完整流程。android
1 private void doSomeWork() { 2 Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { 3 @Override 4 public void subscribe(ObservableEmitter<String> e) throws Exception { 5 e.onNext("a"); 6 e.onComplete(); 7 } 8 }); 9 Observer observer = new Observer<String>() { 10 11 @Override 12 public void onSubscribe(Disposable d) { 13 Log.i("lx", " onSubscribe : " + d.isDisposed()); 14 } 15 16 @Override 17 public void onNext(String str) { 18 Log.i("lx", " onNext : " + str); 19 } 20 21 @Override 22 public void onError(Throwable e) { 23 Log.i("lx", " onError : " + e.getMessage()); 24 } 25 26 @Override 27 public void onComplete() { 28 Log.i("lx", " onComplete"); 29 } 30 }; 31 observable.subscribe(observer); 32 }
上面代碼之因此將observable和observer單獨聲明,最後再調用observable.subscribe(observer);
是爲了分步來分析:緩存
Observable是數據的上游,即事件生產者
首先來分析事件是如何生成的,直接看代碼 Observable.create()
方法。網絡
1 @SchedulerSupport(SchedulerSupport.NONE) 2 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { // ObservableOnSubscribe 是個接口,只包含subscribe方法,是事件生產的源頭。 3 ObjectHelper.requireNonNull(source, "source is null"); // 判空 4 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); 5 }
最重要的是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));這句代碼。繼續跟蹤進去app
1 /** 2 * Calls the associated hook function. 3 * @param <T> the value type 4 * @param source the hook's input value 5 * @return the value returned by the hook 6 */ 7 @SuppressWarnings({ "rawtypes", "unchecked" }) 8 @NonNull 9 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { 10 Function<? super Observable, ? extends Observable> f = onObservableAssembly; 11 if (f != null) { 12 return apply(f, source); 13 } 14 return source; 15 }
看註釋,原來這個方法是個hook function。 經過調試得知靜態對象onObservableAssembly默認爲null, 因此此方法直接返回傳入的參數source。
onObservableAssembly能夠經過靜態方法RxJavaPlugins. setOnObservableAssembly ()設置全局的Hook函數, 有興趣的同窗能夠本身去試試。 這裏暫且不談,咱們繼續返回代碼。
如今咱們明白了:框架
1 Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() { 2 ... 3 ... 4 })
至關於:異步
1 Observable<String> observable=new ObservableCreate(new ObservableOnSubscribe<String>() { 2 ... 3 ... 4 }))
好了,至此咱們明白了,事件的源就是new ObservableCreate()
對象,將ObservableOnSubscribe
做爲參數傳遞給ObservableCreate
的構造函數。
事件是由接口ObservableOnSubscribe
的subscribe方法上產的,至於什麼時候生產事件,稍後再分析。ide
Observer 是數據的下游,即事件消費者
Observer是個interface,包含 :函數
1 void onSubscribe(@NonNull Disposable d); 2 void onNext(@NonNull T t); 3 void onError(@NonNull Throwable e); 4 void onComplete();
上游發送的事件就是再這幾個方法中被消費的。上游什麼時候發送事件、如何發送,稍後再表。oop
重點來了,接下來最重要的方法來了:observable.subscribe(observer);
從這個方法的名字就知道,subscribe是訂閱,是將觀察者(observer)與被觀察者(observable)鏈接起來的方法。只有subscribe方法執行後,上游產生的事件才能被下游接收並處理。其實天然的方式應該是observer訂閱(subscribe) observable, 但這樣會打斷rxjava的鏈式結構。因此採用相反的方式。
接下來看源碼,只列出關鍵代碼
1 public final void subscribe(Observer<? super T> observer) { 2 ObjectHelper.requireNonNull(observer, "observer is null"); 3 ...... 4 observer = RxJavaPlugins.onSubscribe(this, observer); // hook ,默認直接返回observer 5 ...... 6 subscribeActual(observer); // 這個纔是真正實現訂閱的方法。 7 ...... 8 } 9 10 // subscribeActual 是抽象方法,因此須要到實現類中去看具體實現,也就是說實現是在上文中提到的ObservableCreate中 11 protected abstract void subscribeActual(Observer<? super T> observer);
接下來咱們來看ObservableCreate.java:
1 public ObservableCreate(ObservableOnSubscribe<T> source) { 2 this.source = source; // 事件源,生產事件的接口,由咱們本身實現 3 } 4 5 @Override 6 protected void subscribeActual(Observer<? super T> observer) { 7 CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 發射器 8 observer.onSubscribe(parent); //直接回調了觀察者的onSubscribe 9 10 try { 11 // 調用了事件源subscribe方法生產事件,同時將發射器傳給事件源。 12 // 如今咱們明白了,數據源生產事件的subscribe方法只有在observable.subscribe(observer)被執行 13 後才執行的。 換言之,事件流是在訂閱後才產生的。 14 //而observable被建立出來時並不生產事件,同時也不發射事件。 15 source.subscribe(parent); 16 } catch (Throwable ex) { 17 Exceptions.throwIfFatal(ex); 18 parent.onError(ex); 19 } 20 }
如今咱們明白了,數據源生產事件的subscribe方法只有在observable.subscribe(observer)被執行後才執行的。 換言之,事件流是在訂閱後才產生的。而observable被建立出來時並不生產事件,同時也不發射事件。
接下來咱們再來看看事件是如何被髮射出去,同時observer是如何接收到發射的事件的
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
CreateEmitter 實現了ObservableEmitter接口,同時ObservableEmitter接口又繼承了Emitter接口。
CreateEmitter 還實現了Disposable接口,這個disposable接口是用來判斷是否中斷事件發射的。
從名稱上就能看出,這個是發射器,故名思議是用來發射事件的,正是它將上游產生的事件發射到下游的。
Emitter是事件源與下游的橋樑。
CreateEmitter 主要包括方法:
1 void onNext(@NonNull T value); 2 void onError(@NonNull Throwable error); 3 void onComplete(); 4 public void dispose() ; 5 public boolean isDisposed();
是否是跟observer的方法很像?
咱們來看看CreateEmitter中這幾個方法的具體實現:
只列出關鍵代碼
1 public void onNext(T t) { 2 if (!isDisposed()) { // 判斷事件是否須要被丟棄 3 observer.onNext(t); // 調用Emitter的onNext,它會直接調用observer的onNext 4 } 5 } 6 public void onError(Throwable t) { 7 if (!isDisposed()) { 8 try { 9 observer.onError(t); // 調用Emitter的onError,它會直接調用observer的onError 10 } finally { 11 dispose(); // 當onError被觸發時,執行dispose(), 後續onNext,onError, onComplete就不會繼 12 續發射事件了 13 } 14 } 15 } 16 17 @Override 18 public void onComplete() { 19 if (!isDisposed()) { 20 try { 21 observer.onComplete(); // 調用Emitter的onComplete,它會直接調用observer的onComplete 22 } finally { 23 dispose(); // 當onComplete被觸發時,也會執行dispose(), 後續onNext,onError, onComplete 24 一樣不會繼續發射事件了 25 } 26 } 27 }
CreateEmitter 的onError和onComplete方法任何一個執行完都會執行dispose()中斷事件發射,因此observer中的onError和onComplete也只能有一個被執行。
如今終於明白了,事件是如何被髮射給下游的。當訂閱成功後,數據源ObservableOnSubscribe開始生產事件,調用Emitter的onNext,onComplete向下遊發射事件,
Emitter包含了observer的引用,又調用了observer onNext,onComplete,這樣下游observer就接收到了上游發射的數據。
Rxjava的流程大概是:
observer中的onSubscribe在訂閱時即被調用,並傳回了Disposable, observer中能夠利用Disposable來隨時中斷事件流的發射。
今天所列舉的例子是最簡單的一個事件處理流程,沒有使用線程調度,Rxjava最強大的就是異步時對線程的調度和隨時切換觀察者線程,未完待續。
上面分析了Rxjava是如何建立事件源,如何發射事件,什麼時候發射事件,也清楚了上游和下游是如何關聯起來的。
下面着重來分析下Rxjava強大的線程調度是如何實現的。
1 private void doSomeWork() { 2 Observable.create(new ObservableOnSubscribe<String>() { 3 @Override 4 public void subscribe(ObservableEmitter<String> e) throws Exception { 5 Log.i("lx", " subscribe: " + Thread.currentThread().getName()); 6 Thread.sleep(2000); 7 e.onNext("a"); 8 e.onComplete(); 9 } 10 }).subscribe(new Observer<String>() { 11 @Override 12 public void onSubscribe(Disposable d) { 13 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName()); 14 } 15 @Override 16 public void onNext(String str) { 17 Log.i("lx", " onNext: " + Thread.currentThread().getName()); 18 } 19 @Override 20 public void onError(Throwable e) { 21 Log.i("lx", " onError: " + Thread.currentThread().getName()); 22 } 23 @Override 24 public void onComplete() { 25 Log.i("lx", " onComplete: " + Thread.currentThread().getName()); 26 } 27 }); 28 }
運行結果:
1 com.rxjava2.android.samples I/lx: onSubscribe: main 2 com.rxjava2.android.samples I/lx: subscribe: main 3 com.rxjava2.android.samples I/lx: onNext: main 4 com.rxjava2.android.samples I/lx: onComplete: main
由於此方法筆者是在main線程中調用的,因此沒有進行線程調度的狀況下,全部方法都運行在main線程中。但咱們知道Android的UI線程是不能作網絡操做,也不能作耗時操做,因此通常咱們把網絡或耗時操做都放在非UI線程中執行。接下來咱們就來感覺下Rxjava強大的線程調度能力。
1 private void doSomeWork() { 2 Observable.create(new ObservableOnSubscribe<String>() { 3 @Override 4 public void subscribe(ObservableEmitter<String> e) throws Exception { 5 Log.i("lx", " subscribe: " + Thread.currentThread().getName()); 6 Thread.sleep(2000); 7 e.onNext("a"); 8 e.onComplete(); 9 } 10 }).subscribeOn(Schedulers.io()) //增長了這一句 11 .subscribe(new Observer<String>() { 12 @Override 13 public void onSubscribe(Disposable d) { 14 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName()); 15 } 16 @Override 17 public void onNext(String str) { 18 Log.i("lx", " onNext: " + Thread.currentThread().getName()); 19 } 20 @Override 21 public void onError(Throwable e) { 22 Log.i("lx", " onError: " + Thread.currentThread().getName()); 23 } 24 @Override 25 public void onComplete() { 26 Log.i("lx", " onComplete: " + Thread.currentThread().getName()); 27 } 28 }); 29 }
運行結果:
1 com.rxjava2.android.samples I/lx: onSubscribe: main 2 com.rxjava2.android.samples I/lx: subscribe: RxCachedThreadScheduler-1 3 com.rxjava2.android.samples I/lx: onNext: RxCachedThreadScheduler-1 4 com.rxjava2.android.samples I/lx: onComplete: RxCachedThreadScheduler-1
只增長了subscribeOn
這一句代碼, 就發生如此神奇的現象,除了onSubscribe方法還運行在main線程(訂閱發生的線程)其它方法所有都運行在一個名爲RxCachedThreadScheduler-1的線程中。咱們來看看rxjava是怎麼完成這個線程調度的。
首先咱們先分析下Schedulers.io()
這個東東。
1 @NonNull 2 public static Scheduler io() { 3 return RxJavaPlugins.onIoScheduler(IO); // hook function 4 // 等價於 5 return IO; 6 }
再看看IO是什麼, IO是個static變量,初始化的地方是
1 IO = RxJavaPlugins.initIoScheduler(new IOTask()); // 又是hook function 2 // 等價於 3 IO = callRequireNonNull(new IOTask()); 4 // 等價於 5 IO = new IOTask().call();
繼續看看IOTask
1 static final class IOTask implements Callable<Scheduler> { 2 @Override 3 public Scheduler call() throws Exception { 4 return IoHolder.DEFAULT; 5 // 等價於 6 return new IoScheduler(); 7 } 8 }
代碼層次很深,爲了便於記憶,咱們再回顧一下:
1 Schedulers.io()等價於 new IoScheduler() 2 3 // Schedulers.io()等價於 4 @NonNull 5 public static Scheduler io() { 6 return new IoScheduler(); 7 }
好了,排除了其餘干擾代碼,接下來看看IoScheduler()是什麼東東了
IoScheduler看名稱就知道是個IO線程調度器,根據代碼註釋得知,它就是一個用來建立和緩存線程的線程池。看到這個豁然開朗了,原來Rxjava就是經過這個調度器來調度線程的,至於具體怎麼實現咱們接着往下看
1 public IoScheduler() { 2 this(WORKER_THREAD_FACTORY); 3 } 4 5 public IoScheduler(ThreadFactory threadFactory) { 6 this.threadFactory = threadFactory; 7 this.pool = new AtomicReference<CachedWorkerPool>(NONE); 8 start(); 9 } 10 11 @Override 12 public void start() { 13 CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory); 14 if (!pool.compareAndSet(NONE, update)) { 15 update.shutdown(); 16 } 17 } 18 19 CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { 20 this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L; 21 this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>(); 22 this.allWorkers = new CompositeDisposable(); 23 this.threadFactory = threadFactory; 24 25 ScheduledExecutorService evictor = null; 26 Future<?> task = null; 27 if (unit != null) { 28 evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); 29 task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS); 30 } 31 evictorService = evictor; 32 evictorTask = task; 33 }
從上面的代碼能夠看出,new IoScheduler()
後Rxjava會建立CachedWorkerPool
的線程池,同時也建立並運行了一個名爲RxCachedWorkerPoolEvictor
的清除線程,主要做用是清除再也不使用的一些線程。
但目前只建立了線程池並無實際的thread,因此Schedulers.io()
至關於只作了線程調度的前期準備。
OK,終於能夠開始分析Rxjava是如何實現線程調度的。回到Demo來看subscribeOn()
方法的內部實現:
1 public final Observable<T> subscribeOn(Scheduler scheduler) { 2 ObjectHelper.requireNonNull(scheduler, "scheduler is null"); 3 return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); 4 }
很熟悉的代碼RxJavaPlugins.onAssembly
,上一篇已經分析過這個方法,就是個hook function, 等價於直接return new ObservableSubscribeOn<T>(this, scheduler);
, 如今知道了這裏的scheduler其實就是IoScheduler。
跟蹤代碼進入ObservableSubscribeOn
,
能夠看到這個ObservableSubscribeOn 繼承自Observable,而且擴展了一些屬性,增長了scheduler。 各位看官,這不就是典型的裝飾模式嘛,Rxjava中大量用到了裝飾模式,後面還會常常看到這種wrap類。
上篇文章咱們已經知道了Observable.subscribe()
方法最終都是調用了對應的實現類的subscribeActual
方法。咱們重點分析下subscribeActual
:
1 @Override 2 public void subscribeActual(final Observer<? super T> s) { 3 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); 4 5 // 沒有任何線程調度,直接調用的,因此下游的onSubscribe方法沒有切換線程, 6 //本文demo中下游就是觀察者,因此咱們明白了爲何只有onSubscribe還運行在main線程 7 s.onSubscribe(parent); 8 9 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); 10 }
SubscribeOnObserver
也是裝飾模式的體現, 是對下游observer
的一個wrap
,只是添加了Disposable
的管理。
接下來分析最重要的scheduler.scheduleDirect(new SubscribeTask(parent))
1 // 這個類很簡單,就是一個Runnable,最終運行上游的subscribe方法 2 final class SubscribeTask implements Runnable { 3 private final SubscribeOnObserver<T> parent; 4 5 SubscribeTask(SubscribeOnObserver<T> parent) { 6 this.parent = parent; 7 } 8 9 @Override 10 public void run() { 11 source.subscribe(parent); 12 } 13 } 14 @NonNull 15 public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { 16 // IoSchedular 中的createWorker() 17 final Worker w = createWorker(); 18 // hook decoratedRun=run; 19 final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); 20 // decoratedRun的wrap,增長了Dispose的管理 21 DisposeTask task = new DisposeTask(decoratedRun, w); 22 // 線程調度 23 w.schedule(task, delay, unit); 24 25 return task; 26 }
回到IoSchedular
1 public Worker createWorker() { 2 // 工做線程是在此時建立的 3 return new EventLoopWorker(pool.get()); 4 } 5 6 public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { 7 if (tasks.isDisposed()) { 8 // don't schedule, we are unsubscribed 9 return EmptyDisposable.INSTANCE; 10 } 11 // action 中就包含上游subscribe的runnable 12 return threadWorker.scheduleActual(action, delayTime, unit, tasks); 13 }
最終線程是在這個方法內調度並執行的。
1 public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { 2 // decoratedRun = run, 包含上游subscribe方法的runnable 3 Runnable decoratedRun = RxJavaPlugins.onSchedule(run); 4 5 // decoratedRun的wrap,增長了dispose的管理 6 ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); 7 8 if (parent != null) { 9 if (!parent.add(sr)) { 10 return sr; 11 } 12 } 13 14 // 最終decoratedRun被調度到以前建立或從線程池中取出的線程, 15 // 也就是說在RxCachedThreadScheduler-x運行 16 Future<?> f; 17 try { 18 if (delayTime <= 0) { 19 f = executor.submit((Callable<Object>)sr); 20 } else { 21 f = executor.schedule((Callable<Object>)sr, delayTime, unit); 22 } 23 sr.setFuture(f); 24 } catch (RejectedExecutionException ex) { 25 if (parent != null) { 26 parent.remove(sr); 27 } 28 RxJavaPlugins.onError(ex); 29 } 30 31 return sr; 32 }
至此咱們終於明白了Rxjava是如何調度線程並執行的,經過subscribeOn方法將上游生產事件的方法運行在指定的調度線程中。
1 com.rxjava2.android.samples I/lx: onSubscribe: main 2 com.rxjava2.android.samples I/lx: subscribe: RxCachedThreadScheduler-1 3 com.rxjava2.android.samples I/lx: onNext: RxCachedThreadScheduler-1 4 com.rxjava2.android.samples I/lx: onComplete: RxCachedThreadScheduler-1
從上面的運行結果來看,由於上游生產者已被調度到RxCachedThreadScheduler-1
線程中,同時發射事件並無切換線程,因此發射後消費事件的onNext onErro onComplete
也在RxCachedThreadScheduler-1
線程中。
Schedulers.io()
等價於 new IoScheduler()。
new IoScheduler()
Rxjava
建立了線程池,爲後續建立線程作準備,同時建立並運行了一個清理線程RxCachedWorkerPoolEvictor
,按期執行清理任務。subscribeOn()
返回一個ObservableSubscribeOn
對象,它是Observable
的一個裝飾類,增長了scheduler
。subscribe()
方法,在這個方法調用後,subscribeActual()
被調用,才真正執行了IoSchduler
中的createWorker()
建立線程並運行,最終將上游Observable
的subscribe()
方法調度到新建立的線程中運行。如今瞭解了被觀察者執行線程是如何被調度到指定線程中執行的,但不少狀況下,咱們但願觀察者(事件下游)處理事件最好在UI線程執行,好比更新UI操做等。下面分析下游什麼時候調度,如何調度因爲篇幅問題。
1 private void doSomeWork() { 2 Observable.create(new ObservableOnSubscribe<String>() { 3 @Override 4 public void subscribe(ObservableEmitter<String> e) throws Exception { 5 Log.i("lx", " subscribe: " + Thread.currentThread().getName()); 6 e.onNext("a"); 7 e.onComplete(); 8 } 9 }).subscribeOn(Schedulers.io()) 10 .observeOn(AndroidSchedulers.mainThread()) 11 .subscribe(new Observer<String>() { 12 @Override 13 public void onSubscribe(Disposable d) { 14 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName()); 15 } 16 @Override 17 public void onNext(String str) { 18 Log.i("lx", " onNext: " + Thread.currentThread().getName()); 19 } 20 @Override 21 public void onError(Throwable e) { 22 Log.i("lx", " onError: " + Thread.currentThread().getName()); 23 } 24 @Override 25 public void onComplete() { 26 Log.i("lx", " onComplete: " + Thread.currentThread().getName()); 27 } 28 }); 29 }
看看運行結果:
1 com.rxjava2.android.samples I/lx: onSubscribe: main 2 com.rxjava2.android.samples I/lx: subscribe: RxCachedThreadScheduler-1 3 com.rxjava2.android.samples I/lx: onNext: main 4 com.rxjava2.android.samples I/lx: onComplete: main
從結果能夠看出,事件的生產線程運行在RxCachedThreadScheduler-1中,而事件的消費線程則被調度到了main線程中。關鍵代碼是由於這句.observeOn(AndroidSchedulers.mainThread())
。 下面咱們着重分析下這句代碼都作了哪些事情。
先來看看AndroidSchedulers.mainThread()
是什麼?貼代碼
1 /** A {@link Scheduler} which executes actions on the Android main thread. */ 2 public static Scheduler mainThread() { 3 return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); 4 }
註釋已經說的很明白了,是一個在主線程執行任務的scheduler,接着看
1 private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( 2 new Callable<Scheduler>() { 3 @Override public Scheduler call() throws Exception { 4 return MainHolder.DEFAULT; 5 } 6 }); 7 8 public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) { 9 if (scheduler == null) { 10 throw new NullPointerException("scheduler == null"); 11 } 12 Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler; 13 if (f == null) { 14 return callRequireNonNull(scheduler); 15 } 16 return applyRequireNonNull(f, scheduler); 17 }
代碼很簡單,這個AndroidSchedulers.mainThread()
想當於new HandlerScheduler(new Handler(Looper.getMainLooper()))
,原來是利用Android
的Handler
來調度到main
線程的。
咱們再看看HandlerScheduler
,它與咱們上節分析的IOScheduler
相似,都是繼承自Scheduler
,因此AndroidSchedulers.mainThread()
其實就是是建立了一個運行在main thread
上的scheduler。
好了,咱們再回過頭來看observeOn
方法。
1 public final Observable<T> observeOn(Scheduler scheduler) { 2 return observeOn(scheduler, false, bufferSize()); 3 } 4 5 public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { 6 ObjectHelper.requireNonNull(scheduler, "scheduler is null"); 7 ObjectHelper.verifyPositive(bufferSize, "bufferSize"); 8 return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); 9 } 10
重點是這個new ObservableObserveOn
,看名字是否是有種似成相識的感受,還記得上篇的ObservableSubscribeOn
嗎? 它倆就是親兄弟,是繼承自同一個父類。
重點仍是這個方法,咱們前文已經提到了,Observable的subscribe方法最終都是調用subscribeActual
方法。下面看看這個方法的實現:
1 @Override 2 protected void subscribeActual(Observer<? super T> observer) { 3 // scheduler 就是前面提到的 HandlerScheduler,因此進入else分支 4 if (scheduler instanceof TrampolineScheduler) { 5 source.subscribe(observer); 6 } else { 7 // 建立 HandlerWorker 8 Scheduler.Worker w = scheduler.createWorker(); 9 // 調用上游Observable的subscribe,將訂閱向上傳遞 10 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); 11 } 12 }
從上面代碼能夠看到使用了ObserveOnObserver
類對observer
進行裝飾,好了,咱們再來看看ObserveOnObserver
。
咱們已經知道了,事件源發射的事件,是經過observer的onNext
,onError
,onComplete
發射到下游的。因此看看ObserveOnObserver
的這三個方法是如何實現的。
因爲篇幅問題,咱們只分析onNext
方法,onError
和onComplete
方法有興趣的同窗能夠本身分析下。
1 @Override 2 public void onNext(T t) { 3 if (done) { 4 return; 5 } 6 7 // 若是是非異步方式,將上游發射的時間加入到隊列 8 if (sourceMode != QueueDisposable.ASYNC) { 9 queue.offer(t); 10 } 11 schedule(); 12 } 13 14 void schedule() { 15 // 保證只有惟一任務在運行 16 if (getAndIncrement() == 0) { 17 // 調用的就是HandlerWorker的schedule方法 18 worker.schedule(this); 19 } 20 } 21 22 @Override 23 public Disposable schedule(Runnable run, long delay, TimeUnit unit) { 24 if (run == null) throw new NullPointerException("run == null"); 25 if (unit == null) throw new NullPointerException("unit == null"); 26 27 if (disposed) { 28 return Disposables.disposed(); 29 } 30 31 run = RxJavaPlugins.onSchedule(run); 32 33 ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); 34 35 Message message = Message.obtain(handler, scheduled); 36 message.obj = this; // Used as token for batch disposal of this worker's runnables. 37 38 handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay))); 39 40 // Re-check disposed state for removing in case we were racing a call to dispose(). 41 if (disposed) { 42 handler.removeCallbacks(scheduled); 43 return Disposables.disposed(); 44 } 45 46 return scheduled; 47 }
schedule
方法將傳入的run
調度到對應的handle
所在的線程來執行,這個例子裏就是有main
線程來完成。 再回去看看前面傳入的run
吧。
回到ObserveOnObserver
中的run
方法:
1 @Override 2 public void run() { 3 // 此例子中代碼不會進入這個分支,至於這個drainFused是什麼,後面章節再討論。 4 if (outputFused) { 5 drainFused(); 6 } else { 7 drainNormal(); 8 } 9 } 10 11 void drainNormal() { 12 int missed = 1; 13 14 final SimpleQueue<T> q = queue; 15 final Observer<? super T> a = actual; 16 17 for (;;) { 18 if (checkTerminated(done, q.isEmpty(), a)) { 19 return; 20 } 21 22 for (;;) { 23 boolean d = done; 24 T v; 25 26 try { 27 // 從隊列中queue中取出事件 28 v = q.poll(); 29 } catch (Throwable ex) { 30 Exceptions.throwIfFatal(ex); 31 s.dispose(); 32 q.clear(); 33 a.onError(ex); 34 worker.dispose(); 35 return; 36 } 37 boolean empty = v == null; 38 39 if (checkTerminated(d, empty, a)) { 40 return; 41 } 42 43 if (empty) { 44 break; 45 } 46 //調用下游observer的onNext將事件v發射出去 47 a.onNext(v); 48 } 49 50 missed = addAndGet(-missed); 51 if (missed == 0) { 52 break; 53 } 54 } 55 }
至此咱們明白了RXjava是如何調度消費者線程了。
Rxjava調度消費者如今的流程,以observeOn(AndroidSchedulers.mainThread())
爲例。
AndroidSchedulers.mainThread()
先建立一個包含handler
的Scheduler
, 這個handler
是主線程的handler
。observeOn
方法建立ObservableObserveOn
,它是上游Observable
的一個裝飾類,其中包含前面建立的Scheduler
和bufferSize
等.subscribe
被調用後,ObservableObserveOn
的subscribeActual
方法建立Scheduler.Worker
並調用上游的subscribe
方法,同時將自身接收的參數'observer'用裝飾類ObserveOnObserver
裝飾後傳遞給上游。ObserveOnObserver
的onNext
、onError
和onComplete
方法時,ObserveOnObserver
將上游發送的事件統統加入到隊列queue
中,而後再調用scheduler
將處理事件的方法調度到對應的線程中(本例會調度到main thread)。 處理事件的方法將queue
中保存的事件取出來,調用下游原始的observer再發射出去。observeOn
調度後的thread中。通過前面兩節的分析,咱們已經明白了Rxjava是如何對線程進行調度的。
subscribe
方法是由下游一步步向上游進行傳遞的。會調用上游的subscribe
,直到調用到事件源。而上游的source
每每是通過裝飾後的Observable
, Rxjava就是利用ObservableSubscribeOn
將subscribe
方法調度到了指定線程運行,生產者線程最終會運行在被調度後的線程中。但屢次調用subscribeOn
方法會怎麼樣呢? 咱們知道由於subscribe
方法是由下而上傳遞的,因此事件源的生產者線程最終都只會運行在第一次執行subscribeOn
所調度的線程中,換句話就是屢次調用subscribeOn
方法,只有第一次有效。
onNext
、onError
、onComplete
方法會調用下游傳入的observer的對應方法。每每下游傳遞的observer對象也是通過裝飾後的observer對象。Rxjava就是利用ObserveOnObserver
將執行線程調度後,再調用下游對應的onNext
、onError
、onComplete
方法,這樣下游消費者就運行再了指定的線程內。 那麼屢次調用observeOn
調度不一樣的線程會怎麼樣呢? 由於事件是由上而下發射的,因此每次用observeOn
切換完線程後,對下游的事件消費都有效,好比下游的map操做符。最終的事件消費線程運行在最後一個observeOn
切換後線程中。onSubscribe
運行在subscribe
的調用線程中,這個就不具體分析了。