RxJava2 源碼分析

前言

不少項目使用流行的Rxjava2 + Retrofit搭建網絡框架,Rxjava如今已經發展到Rxjava2,以前一直都只是再用Rxjava,但歷來沒有了解下Rxjava的內部實現,接下來一步步來分析Rxjava2的源碼,Rxjava2分Observable和Flowable兩種(無被壓和有被壓),咱們今天先從簡單的無背壓的observable來分析。源碼基於rxjava:2.1.1。java

1、Rxjava如何建立事件源、發射事件、什麼時候發射事件、如何將觀察者和被觀察者關聯起來

簡單的例子

先來段最簡單的代碼,直觀的瞭解下整個Rxjava運行的完整流程。android

 1 private void doSomeWork() {
 2         Observable<String> observable =  Observable.create(new ObservableOnSubscribe<String>() {
 3             @Override
 4             public void subscribe(ObservableEmitter<String> e) throws Exception {
 5                 e.onNext("a");
 6                 e.onComplete();
 7             }
 8         });
 9         Observer observer = new Observer<String>() {
10 
11             @Override
12             public void onSubscribe(Disposable d) {
13                 Log.i("lx", " onSubscribe : " + d.isDisposed());
14             }
15 
16             @Override
17             public void onNext(String str) {
18                 Log.i("lx", " onNext : " + str);
19             }
20 
21             @Override
22             public void onError(Throwable e) {
23                 Log.i("lx", " onError : " + e.getMessage());
24             }
25 
26             @Override
27             public void onComplete() {
28                 Log.i("lx", " onComplete");
29             }
30         };
31         observable.subscribe(observer);
32     }

 

上面代碼之因此將observable和observer單獨聲明,最後再調用observable.subscribe(observer);
是爲了分步來分析:緩存

  1. 被觀察者 Observable 如何生產事件的
  2. 被觀察者 Observable 什麼時候生產事件的
  3. 觀察者Observer是什麼時候接收到上游事件的
  4. Observable 與Observer是如何關聯在一塊兒的

Observable

Observable是數據的上游,即事件生產者
首先來分析事件是如何生成的,直接看代碼 Observable.create()方法。網絡

1 @SchedulerSupport(SchedulerSupport.NONE)
2     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {    // ObservableOnSubscribe 是個接口,只包含subscribe方法,是事件生產的源頭。
3         ObjectHelper.requireNonNull(source, "source is null"); // 判空
4         return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
5     }

最重要的是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));這句代碼。繼續跟蹤進去app

 1 /**
 2      * Calls the associated hook function.
 3      * @param <T> the value type
 4      * @param source the hook's input value
 5      * @return the value returned by the hook
 6      */
 7     @SuppressWarnings({ "rawtypes", "unchecked" })
 8     @NonNull
 9     public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
10         Function<? super Observable, ? extends Observable> f = onObservableAssembly;
11         if (f != null) {
12             return apply(f, source);
13         }
14         return source;
15     }

 

看註釋,原來這個方法是個hook function。 經過調試得知靜態對象onObservableAssembly默認爲null, 因此此方法直接返回傳入的參數source。
onObservableAssembly能夠經過靜態方法RxJavaPlugins. setOnObservableAssembly ()設置全局的Hook函數, 有興趣的同窗能夠本身去試試。 這裏暫且不談,咱們繼續返回代碼。
如今咱們明白了:框架

1  Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
2   ...
3   ...
4 })

 

至關於:異步

1 Observable<String> observable=new ObservableCreate(new ObservableOnSubscribe<String>() {
2   ...
3   ...
4 }))

 

好了,至此咱們明白了,事件的源就是new ObservableCreate()對象,將ObservableOnSubscribe做爲參數傳遞給ObservableCreate的構造函數。
事件是由接口ObservableOnSubscribe的subscribe方法上產的,至於什麼時候生產事件,稍後再分析。ide

Observer

Observer 是數據的下游,即事件消費者
Observer是個interface,包含 :函數

1     void onSubscribe(@NonNull Disposable d);
2     void onNext(@NonNull T t);
3     void onError(@NonNull Throwable e);
4     void onComplete();

上游發送的事件就是再這幾個方法中被消費的。上游什麼時候發送事件、如何發送,稍後再表。oop

subscribe

重點來了,接下來最重要的方法來了:observable.subscribe(observer);
從這個方法的名字就知道,subscribe是訂閱,是將觀察者(observer)與被觀察者(observable)鏈接起來的方法。只有subscribe方法執行後,上游產生的事件才能被下游接收並處理。其實天然的方式應該是observer訂閱(subscribe) observable, 但這樣會打斷rxjava的鏈式結構。因此採用相反的方式。
接下來看源碼,只列出關鍵代碼

 1 public final void subscribe(Observer<? super T> observer) {
 2         ObjectHelper.requireNonNull(observer, "observer is null");
 3         ......
 4        observer = RxJavaPlugins.onSubscribe(this, observer); // hook ,默認直接返回observer
 5        ......
 6        subscribeActual(observer);  // 這個纔是真正實現訂閱的方法。
 7        ......
 8     }
 9 
10 // subscribeActual 是抽象方法,因此須要到實現類中去看具體實現,也就是說實現是在上文中提到的ObservableCreate中
11 protected abstract void subscribeActual(Observer<? super T> observer);

 

接下來咱們來看ObservableCreate.java:

 1 public ObservableCreate(ObservableOnSubscribe<T> source) {
 2         this.source = source;  // 事件源,生產事件的接口,由咱們本身實現
 3     }
 4 
 5    @Override
 6     protected void subscribeActual(Observer<? super T> observer) {
 7         CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 發射器
 8         observer.onSubscribe(parent);  //直接回調了觀察者的onSubscribe
 9 
10         try {
11             // 調用了事件源subscribe方法生產事件,同時將發射器傳給事件源。 
12             // 如今咱們明白了,數據源生產事件的subscribe方法只有在observable.subscribe(observer)被執行
13               後才執行的。 換言之,事件流是在訂閱後才產生的。
14             //而observable被建立出來時並不生產事件,同時也不發射事件。
15           source.subscribe(parent);  
16         } catch (Throwable ex) {
17             Exceptions.throwIfFatal(ex);
18             parent.onError(ex);
19         }
20     }

 

如今咱們明白了,數據源生產事件的subscribe方法只有在observable.subscribe(observer)被執行後才執行的。 換言之,事件流是在訂閱後才產生的。而observable被建立出來時並不生產事件,同時也不發射事件。
接下來咱們再來看看事件是如何被髮射出去,同時observer是如何接收到發射的事件的
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
CreateEmitter 實現了ObservableEmitter接口,同時ObservableEmitter接口又繼承了Emitter接口。
CreateEmitter 還實現了Disposable接口,這個disposable接口是用來判斷是否中斷事件發射的。
從名稱上就能看出,這個是發射器,故名思議是用來發射事件的,正是它將上游產生的事件發射到下游的。
Emitter是事件源與下游的橋樑。
CreateEmitter 主要包括方法:

1 void onNext(@NonNull T value);
2     void onError(@NonNull Throwable error);
3     void onComplete();
4     public void dispose() ;
5     public boolean isDisposed();

 

是否是跟observer的方法很像?
咱們來看看CreateEmitter中這幾個方法的具體實現:
只列出關鍵代碼

 1 public void onNext(T t) {
 2          if (!isDisposed()) { // 判斷事件是否須要被丟棄
 3              observer.onNext(t); // 調用Emitter的onNext,它會直接調用observer的onNext
 4          }
 5       }
 6       public void onError(Throwable t) {
 7            if (!isDisposed()) {
 8                 try {
 9                     observer.onError(t); // 調用Emitter的onError,它會直接調用observer的onError
10                 } finally {
11                     dispose();  // 當onError被觸發時,執行dispose(), 後續onNext,onError, onComplete就不會繼
12                                     續發射事件了
13                 }
14             }
15         }
16 
17        @Override
18         public void onComplete() {
19             if (!isDisposed()) {
20                 try {
21                     observer.onComplete(); // 調用Emitter的onComplete,它會直接調用observer的onComplete
22                 } finally {
23                     dispose();  // 當onComplete被觸發時,也會執行dispose(), 後續onNext,onError, onComplete
24                                       一樣不會繼續發射事件了
25                 }
26             }
27         }

 

CreateEmitter 的onError和onComplete方法任何一個執行完都會執行dispose()中斷事件發射,因此observer中的onError和onComplete也只能有一個被執行。
如今終於明白了,事件是如何被髮射給下游的。當訂閱成功後,數據源ObservableOnSubscribe開始生產事件,調用Emitter的onNext,onComplete向下遊發射事件,

Emitter包含了observer的引用,又調用了observer onNext,onComplete,這樣下游observer就接收到了上游發射的數據。

總結

Rxjava的流程大概是:

  1. Observable.create 建立事件源,但並不生產也不發射事件。
  2. 實現observer接口,但此時沒有也沒法接受到任何發射來的事件。
  3. 訂閱 observable.subscribe(observer), 此時會調用具體Observable的實現類中的subscribeActual方法,
    此時會纔會真正觸發事件源生產事件,事件源生產出來的事件經過Emitter的onNext,onError,onComplete發射給observer對應的方法由下游observer消費掉。從而完成整個事件流的處理。

     observer中的onSubscribe在訂閱時即被調用,並傳回了Disposable, observer中能夠利用Disposable來隨時中斷事件流的發射。

今天所列舉的例子是最簡單的一個事件處理流程,沒有使用線程調度,Rxjava最強大的就是異步時對線程的調度和隨時切換觀察者線程,未完待續。

 

上面分析了Rxjava是如何建立事件源,如何發射事件,什麼時候發射事件,也清楚了上游和下游是如何關聯起來的。
下面着重來分析下Rxjava強大的線程調度是如何實現的。

2、RxJava的線程調度機制

簡單的例子

 1 private void doSomeWork() {
 2         Observable.create(new ObservableOnSubscribe<String>() {
 3             @Override
 4             public void subscribe(ObservableEmitter<String> e) throws Exception {
 5                 Log.i("lx", " subscribe: " + Thread.currentThread().getName());
 6                 Thread.sleep(2000);
 7                 e.onNext("a");
 8                 e.onComplete();
 9             }
10         }).subscribe(new Observer<String>() {
11             @Override
12             public void onSubscribe(Disposable d) {
13                 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
14             }
15             @Override
16             public void onNext(String str) {
17                 Log.i("lx", " onNext: " + Thread.currentThread().getName());
18             }
19             @Override
20             public void onError(Throwable e) {
21                 Log.i("lx", " onError: " + Thread.currentThread().getName());
22             }
23             @Override
24             public void onComplete() {
25                 Log.i("lx", " onComplete: " + Thread.currentThread().getName());
26             }
27         });
28     }

運行結果:

1 com.rxjava2.android.samples I/lx:  onSubscribe: main
2 com.rxjava2.android.samples I/lx:  subscribe: main
3 com.rxjava2.android.samples I/lx:  onNext: main
4 com.rxjava2.android.samples I/lx:  onComplete: main

 

由於此方法筆者是在main線程中調用的,因此沒有進行線程調度的狀況下,全部方法都運行在main線程中。但咱們知道Android的UI線程是不能作網絡操做,也不能作耗時操做,因此通常咱們把網絡或耗時操做都放在非UI線程中執行。接下來咱們就來感覺下Rxjava強大的線程調度能力。

 1 private void doSomeWork() {
 2         Observable.create(new ObservableOnSubscribe<String>() {
 3             @Override
 4             public void subscribe(ObservableEmitter<String> e) throws Exception {
 5                 Log.i("lx", " subscribe: " + Thread.currentThread().getName());
 6                 Thread.sleep(2000);
 7                 e.onNext("a");
 8                 e.onComplete();
 9             }
10         }).subscribeOn(Schedulers.io()) //增長了這一句
11           .subscribe(new Observer<String>() {
12             @Override
13             public void onSubscribe(Disposable d) {
14                 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
15             }
16             @Override
17             public void onNext(String str) {
18                 Log.i("lx", " onNext: " + Thread.currentThread().getName());
19             }
20             @Override
21             public void onError(Throwable e) {
22                 Log.i("lx", " onError: " + Thread.currentThread().getName());
23             }
24             @Override
25             public void onComplete() {
26                 Log.i("lx", " onComplete: " + Thread.currentThread().getName());
27             }
28         });
29     }

運行結果:

1 com.rxjava2.android.samples I/lx:  onSubscribe: main
2 com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
3 com.rxjava2.android.samples I/lx:  onNext: RxCachedThreadScheduler-1
4 com.rxjava2.android.samples I/lx:  onComplete: RxCachedThreadScheduler-1

只增長了subscribeOn這一句代碼, 就發生如此神奇的現象,除了onSubscribe方法還運行在main線程(訂閱發生的線程)其它方法所有都運行在一個名爲RxCachedThreadScheduler-1的線程中。咱們來看看rxjava是怎麼完成這個線程調度的。

線程調度subscribeOn

首先咱們先分析下Schedulers.io()這個東東。

1  @NonNull
2     public static Scheduler io() {
3         return RxJavaPlugins.onIoScheduler(IO); // hook function
4         // 等價於
5         return IO;
6     }

再看看IO是什麼, IO是個static變量,初始化的地方是

1 IO = RxJavaPlugins.initIoScheduler(new IOTask()); // 又是hook function
2 // 等價於
3 IO = callRequireNonNull(new IOTask());
4 // 等價於
5 IO = new IOTask().call();

繼續看看IOTask

1 static final class IOTask implements Callable<Scheduler> {
2         @Override
3         public Scheduler call() throws Exception {
4             return IoHolder.DEFAULT;
5             // 等價於
6             return new IoScheduler();
7         }
8     }

代碼層次很深,爲了便於記憶,咱們再回顧一下:

1 Schedulers.io()等價於 new IoScheduler()
2 
3     // Schedulers.io()等價於
4     @NonNull
5     public static Scheduler io() {
6         return new IoScheduler();
7     }

好了,排除了其餘干擾代碼,接下來看看IoScheduler()是什麼東東了

IoScheduler看名稱就知道是個IO線程調度器,根據代碼註釋得知,它就是一個用來建立和緩存線程的線程池。看到這個豁然開朗了,原來Rxjava就是經過這個調度器來調度線程的,至於具體怎麼實現咱們接着往下看

 1 public IoScheduler() {
 2         this(WORKER_THREAD_FACTORY);
 3     }
 4     
 5     public IoScheduler(ThreadFactory threadFactory) {
 6         this.threadFactory = threadFactory;
 7         this.pool = new AtomicReference<CachedWorkerPool>(NONE);
 8         start();
 9     }
10 
11     @Override
12     public void start() {
13         CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
14         if (!pool.compareAndSet(NONE, update)) {
15             update.shutdown();
16         }
17     }
18     
19     CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
20             this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
21             this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
22             this.allWorkers = new CompositeDisposable();
23             this.threadFactory = threadFactory;
24 
25             ScheduledExecutorService evictor = null;
26             Future<?> task = null;
27             if (unit != null) {
28                 evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
29                 task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
30             }
31             evictorService = evictor;
32             evictorTask = task;
33         }

從上面的代碼能夠看出,new IoScheduler()後Rxjava會建立CachedWorkerPool的線程池,同時也建立並運行了一個名爲RxCachedWorkerPoolEvictor的清除線程,主要做用是清除再也不使用的一些線程。

但目前只建立了線程池並無實際的thread,因此Schedulers.io()至關於只作了線程調度的前期準備。

OK,終於能夠開始分析Rxjava是如何實現線程調度的。回到Demo來看subscribeOn()方法的內部實現:

1 public final Observable<T> subscribeOn(Scheduler scheduler) {
2         ObjectHelper.requireNonNull(scheduler, "scheduler is null");
3         return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
4     }

很熟悉的代碼RxJavaPlugins.onAssembly,上一篇已經分析過這個方法,就是個hook function, 等價於直接return new ObservableSubscribeOn<T>(this, scheduler);, 如今知道了這裏的scheduler其實就是IoScheduler。

跟蹤代碼進入ObservableSubscribeOn
能夠看到這個ObservableSubscribeOn 繼承自Observable,而且擴展了一些屬性,增長了scheduler。 各位看官,這不就是典型的裝飾模式嘛,Rxjava中大量用到了裝飾模式,後面還會常常看到這種wrap類。

上篇文章咱們已經知道了Observable.subscribe()方法最終都是調用了對應的實現類的subscribeActual方法。咱們重點分析下subscribeActual:

 1 @Override
 2     public void subscribeActual(final Observer<? super T> s) {
 3         final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
 4 
 5         // 沒有任何線程調度,直接調用的,因此下游的onSubscribe方法沒有切換線程, 
 6         //本文demo中下游就是觀察者,因此咱們明白了爲何只有onSubscribe還運行在main線程
 7         s.onSubscribe(parent);
 8 
 9         parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
10     }

SubscribeOnObserver也是裝飾模式的體現, 是對下游observer的一個wrap,只是添加了Disposable的管理。

接下來分析最重要的scheduler.scheduleDirect(new SubscribeTask(parent))

 1 // 這個類很簡單,就是一個Runnable,最終運行上游的subscribe方法
 2     final class SubscribeTask implements Runnable {
 3         private final SubscribeOnObserver<T> parent;
 4 
 5         SubscribeTask(SubscribeOnObserver<T> parent) {
 6             this.parent = parent;
 7         }
 8 
 9         @Override
10         public void run() {
11             source.subscribe(parent);
12         }
13     }
14     @NonNull
15     public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
16         // IoSchedular 中的createWorker()
17         final Worker w = createWorker();
18         // hook decoratedRun=run;
19         final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
20         // decoratedRun的wrap,增長了Dispose的管理
21         DisposeTask task = new DisposeTask(decoratedRun, w);
22         // 線程調度
23         w.schedule(task, delay, unit);
24 
25         return task;
26     }

回到IoSchedular

 1 public Worker createWorker() {
 2         // 工做線程是在此時建立的
 3         return new EventLoopWorker(pool.get());
 4     }
 5     
 6     public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
 7             if (tasks.isDisposed()) {
 8                 // don't schedule, we are unsubscribed
 9                 return EmptyDisposable.INSTANCE;
10             }
11             // action 中就包含上游subscribe的runnable
12             return threadWorker.scheduleActual(action, delayTime, unit, tasks);
13         }

最終線程是在這個方法內調度並執行的。

 1 public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
 2         // decoratedRun = run, 包含上游subscribe方法的runnable
 3         Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
 4 
 5         // decoratedRun的wrap,增長了dispose的管理
 6         ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
 7 
 8         if (parent != null) {
 9             if (!parent.add(sr)) {
10                 return sr;
11             }
12         }
13 
14         // 最終decoratedRun被調度到以前建立或從線程池中取出的線程,
15         // 也就是說在RxCachedThreadScheduler-x運行
16         Future<?> f;
17         try {
18             if (delayTime <= 0) {
19                 f = executor.submit((Callable<Object>)sr);
20             } else {
21                 f = executor.schedule((Callable<Object>)sr, delayTime, unit);
22             }
23             sr.setFuture(f);
24         } catch (RejectedExecutionException ex) {
25             if (parent != null) {
26                 parent.remove(sr);
27             }
28             RxJavaPlugins.onError(ex);
29         }
30 
31         return sr;
32     }

至此咱們終於明白了Rxjava是如何調度線程並執行的,經過subscribeOn方法將上游生產事件的方法運行在指定的調度線程中。

1 com.rxjava2.android.samples I/lx:  onSubscribe: main
2 com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
3 com.rxjava2.android.samples I/lx:  onNext: RxCachedThreadScheduler-1
4 com.rxjava2.android.samples I/lx:  onComplete: RxCachedThreadScheduler-1

從上面的運行結果來看,由於上游生產者已被調度到RxCachedThreadScheduler-1線程中,同時發射事件並無切換線程,因此發射後消費事件的onNext onErro onComplete也在RxCachedThreadScheduler-1線程中。

總結

  1. Schedulers.io()等價於 new IoScheduler()。
  2. new IoScheduler() Rxjava建立了線程池,爲後續建立線程作準備,同時建立並運行了一個清理線程RxCachedWorkerPoolEvictor,按期執行清理任務。
  3. subscribeOn()返回一個ObservableSubscribeOn對象,它是Observable的一個裝飾類,增長了scheduler
  4. 調用subscribe()方法,在這個方法調用後,subscribeActual()被調用,才真正執行了IoSchduler中的createWorker()建立線程並運行,最終將上游Observablesubscribe()方法調度到新建立的線程中運行。

如今瞭解了被觀察者執行線程是如何被調度到指定線程中執行的,但不少狀況下,咱們但願觀察者(事件下游)處理事件最好在UI線程執行,好比更新UI操做等。下面分析下游什麼時候調度,如何調度因爲篇幅問題。

3、Rxjava如何對觀察者線程進行調度

簡單的例子

 1 private void doSomeWork() {
 2         Observable.create(new ObservableOnSubscribe<String>() {
 3             @Override
 4             public void subscribe(ObservableEmitter<String> e) throws Exception {
 5                 Log.i("lx", " subscribe: " + Thread.currentThread().getName());
 6                 e.onNext("a");
 7                 e.onComplete();
 8             }
 9         }).subscribeOn(Schedulers.io())
10           .observeOn(AndroidSchedulers.mainThread())
11           .subscribe(new Observer<String>() {
12             @Override
13             public void onSubscribe(Disposable d) {
14                 Log.i("lx", " onSubscribe: " + Thread.currentThread().getName());
15             }
16             @Override
17             public void onNext(String str) {
18                 Log.i("lx", " onNext: " + Thread.currentThread().getName());
19             }
20             @Override
21             public void onError(Throwable e) {
22                 Log.i("lx", " onError: " + Thread.currentThread().getName());
23             }
24             @Override
25             public void onComplete() {
26                 Log.i("lx", " onComplete: " + Thread.currentThread().getName());
27             }
28         });
29     }

看看運行結果:

1 com.rxjava2.android.samples I/lx:  onSubscribe: main
2 com.rxjava2.android.samples I/lx:  subscribe: RxCachedThreadScheduler-1
3 com.rxjava2.android.samples I/lx:  onNext: main
4 com.rxjava2.android.samples I/lx:  onComplete: main

從結果能夠看出,事件的生產線程運行在RxCachedThreadScheduler-1中,而事件的消費線程則被調度到了main線程中。關鍵代碼是由於這句.observeOn(AndroidSchedulers.mainThread())。 下面咱們着重分析下這句代碼都作了哪些事情。

AndroidSchedulers.mainThread()

先來看看AndroidSchedulers.mainThread()是什麼?貼代碼

1  /** A {@link Scheduler} which executes actions on the Android main thread. */
2     public static Scheduler mainThread() {
3         return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
4     }

註釋已經說的很明白了,是一個在主線程執行任務的scheduler,接着看

 1 private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
 2             new Callable<Scheduler>() {
 3                 @Override public Scheduler call() throws Exception {
 4                     return MainHolder.DEFAULT;
 5                 }
 6             });
 7             
 8     public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
 9     if (scheduler == null) {
10         throw new NullPointerException("scheduler == null");
11     }
12     Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
13     if (f == null) {
14         return callRequireNonNull(scheduler);
15     }
16     return applyRequireNonNull(f, scheduler);
17     }

代碼很簡單,這個AndroidSchedulers.mainThread()想當於new HandlerScheduler(new Handler(Looper.getMainLooper())),原來是利用AndroidHandler來調度到main線程的。

咱們再看看HandlerScheduler,它與咱們上節分析的IOScheduler相似,都是繼承自Scheduler,因此AndroidSchedulers.mainThread()其實就是是建立了一個運行在main thread上的scheduler。
好了,咱們再回過頭來看observeOn方法。

observeOn

 1 public final Observable<T> observeOn(Scheduler scheduler) {
 2         return observeOn(scheduler, false, bufferSize());
 3     }
 4     
 5     public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
 6         ObjectHelper.requireNonNull(scheduler, "scheduler is null");
 7         ObjectHelper.verifyPositive(bufferSize, "bufferSize");
 8         return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
 9     }
10     

重點是這個new ObservableObserveOn,看名字是否是有種似成相識的感受,還記得上篇的ObservableSubscribeOn嗎? 它倆就是親兄弟,是繼承自同一個父類。

重點仍是這個方法,咱們前文已經提到了,Observable的subscribe方法最終都是調用subscribeActual方法。下面看看這個方法的實現:

 1   @Override
 2     protected void subscribeActual(Observer<? super T> observer) {
 3         // scheduler 就是前面提到的 HandlerScheduler,因此進入else分支
 4         if (scheduler instanceof TrampolineScheduler) {
 5             source.subscribe(observer);
 6         } else {
 7             // 建立 HandlerWorker
 8             Scheduler.Worker w = scheduler.createWorker();
 9             // 調用上游Observable的subscribe,將訂閱向上傳遞
10             source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
11         }
12     }

從上面代碼能夠看到使用了ObserveOnObserver類對observer進行裝飾,好了,咱們再來看看ObserveOnObserver

咱們已經知道了,事件源發射的事件,是經過observer的onNext,onError,onComplete發射到下游的。因此看看ObserveOnObserver的這三個方法是如何實現的。
因爲篇幅問題,咱們只分析onNext方法,onErroronComplete方法有興趣的同窗能夠本身分析下。

 1 @Override
 2     public void onNext(T t) {
 3         if (done) {
 4             return;
 5         }
 6         
 7         // 若是是非異步方式,將上游發射的時間加入到隊列
 8         if (sourceMode != QueueDisposable.ASYNC) {
 9             queue.offer(t);
10         }
11         schedule();
12     }
13     
14     void schedule() {
15         // 保證只有惟一任務在運行
16         if (getAndIncrement() == 0) {
17             // 調用的就是HandlerWorker的schedule方法
18             worker.schedule(this);
19         }
20     }
21     
22         @Override
23         public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
24             if (run == null) throw new NullPointerException("run == null");
25             if (unit == null) throw new NullPointerException("unit == null");
26 
27             if (disposed) {
28                 return Disposables.disposed();
29             }
30 
31             run = RxJavaPlugins.onSchedule(run);
32 
33             ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
34 
35             Message message = Message.obtain(handler, scheduled);
36             message.obj = this; // Used as token for batch disposal of this worker's runnables.
37 
38             handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
39 
40             // Re-check disposed state for removing in case we were racing a call to dispose().
41             if (disposed) {
42                 handler.removeCallbacks(scheduled);
43                 return Disposables.disposed();
44             }
45 
46             return scheduled;
47         }

schedule方法將傳入的run調度到對應的handle所在的線程來執行,這個例子裏就是有main線程來完成。 再回去看看前面傳入的run吧。

回到ObserveOnObserver中的run方法:

 1 @Override
 2     public void run() {
 3         // 此例子中代碼不會進入這個分支,至於這個drainFused是什麼,後面章節再討論。
 4         if (outputFused) {
 5             drainFused();
 6         } else {
 7             drainNormal();
 8         }
 9     }
10     
11     void drainNormal() {
12         int missed = 1;
13 
14         final SimpleQueue<T> q = queue;
15         final Observer<? super T> a = actual;
16 
17         for (;;) {
18             if (checkTerminated(done, q.isEmpty(), a)) {
19                 return;
20             }
21 
22             for (;;) {
23                 boolean d = done;
24                 T v;
25 
26                 try {
27                     // 從隊列中queue中取出事件
28                     v = q.poll();
29                 } catch (Throwable ex) {
30                     Exceptions.throwIfFatal(ex);
31                     s.dispose();
32                     q.clear();
33                     a.onError(ex);
34                     worker.dispose();
35                     return;
36                 }
37                 boolean empty = v == null;
38 
39                 if (checkTerminated(d, empty, a)) {
40                     return;
41                 }
42 
43                 if (empty) {
44                     break;
45                 }
46                 //調用下游observer的onNext將事件v發射出去
47                 a.onNext(v);
48             }
49 
50             missed = addAndGet(-missed);
51             if (missed == 0) {
52                 break;
53             }
54         }
55     }

至此咱們明白了RXjava是如何調度消費者線程了。

消費者線程調度流程歸納

Rxjava調度消費者如今的流程,以observeOn(AndroidSchedulers.mainThread())爲例。

  1. AndroidSchedulers.mainThread()先建立一個包含handlerScheduler, 這個handler是主線程的handler
  2. observeOn方法建立ObservableObserveOn,它是上游Observable的一個裝飾類,其中包含前面建立的SchedulerbufferSize等.
  3. 當訂閱方法subscribe被調用後,ObservableObserveOnsubscribeActual方法建立Scheduler.Worker並調用上游的subscribe方法,同時將自身接收的參數'observer'用裝飾類ObserveOnObserver裝飾後傳遞給上游。
  4. 當上遊調用被ObserveOnObserveronNextonErroronComplete方法時,ObserveOnObserver將上游發送的事件統統加入到隊列queue中,而後再調用scheduler將處理事件的方法調度到對應的線程中(本例會調度到main thread)。 處理事件的方法將queue中保存的事件取出來,調用下游原始的observer再發射出去。
  5. 通過以上流程,下游處理事件的消費者線程就運行在了observeOn調度後的thread中。

總結

通過前面兩節的分析,咱們已經明白了Rxjava是如何對線程進行調度的。

  • Rxjava的subscribe方法是由下游一步步向上游進行傳遞的。會調用上游的subscribe,直到調用到事件源。
    如: source.subscribe(xxx);

而上游的source每每是通過裝飾後的Observable, Rxjava就是利用ObservableSubscribeOnsubscribe方法調度到了指定線程運行,生產者線程最終會運行在被調度後的線程中。但屢次調用subscribeOn方法會怎麼樣呢? 咱們知道由於subscribe方法是由下而上傳遞的,因此事件源的生產者線程最終都只會運行在第一次執行subscribeOn所調度的線程中,換句話就是屢次調用subscribeOn方法,只有第一次有效。

  • Rxjava發射事件是由上而下發射的,上游的onNextonErroronComplete方法會調用下游傳入的observer的對應方法。每每下游傳遞的observer對象也是通過裝飾後的observer對象。Rxjava就是利用ObserveOnObserver將執行線程調度後,再調用下游對應的onNextonErroronComplete方法,這樣下游消費者就運行再了指定的線程內。 那麼屢次調用observeOn調度不一樣的線程會怎麼樣呢? 由於事件是由上而下發射的,因此每次用observeOn切換完線程後,對下游的事件消費都有效,好比下游的map操做符。最終的事件消費線程運行在最後一個observeOn切換後線程中。
  • 另外經過源碼能夠看到onSubscribe運行在subscribe的調用線程中,這個就不具體分析了。
相關文章
相關標籤/搜索