RxJava的鼎鼎大名相信Android開發的同窗都很是熟悉了,其實不只僅有RxJava,還有RxJs,RxKotlin等等一系列。能夠說Rx並非一種侷限於Android的框架,Rx是一種思想,咱們深刻了解了RxJava,一樣會加深咱們對其餘Rx系列的認知。java
咱們來看一個常見的例子:併發
Observable.create(ObservableOnSubscribe<Int> { e -> e.onNext(1) e.onComplete() }).map { Log.d("map-thread : ", Thread.currentThread().name) Log.d("--", "------------------------------") "result : $it" }.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(object : Observer<String> { override fun onSubscribe(d: Disposable) { Log.d("onSubscribe-thread : ", Thread.currentThread().name) Log.d("--", "------------------------------") } override fun onNext(s: String) { Log.d("onNext-thread : ", Thread.currentThread().name) Log.d("onNext-result", s) Log.d("--", "------------------------------") } override fun onError(e: Throwable) { Log.d("onError-thread : ", Thread.currentThread().name) Log.d("onError-message : ", e.message) Log.d("--", "------------------------------") Log.d("--", "------------------------------") Log.d("--", "------------------------------") } override fun onComplete() { Log.d("onComplete-thread : ", Thread.currentThread().name) Log.d("--", "------------------------------") Log.d("--", "------------------------------") Log.d("--", "------------------------------") } })
這是一個使用Kotlin寫的例子,對Kotlin不熟悉的同窗無需關注代碼細節,大體能看懂什麼意思就行。首先發送一個數字1,而後經過 map 操做符把數字1變成 result : 1 ,將以前的操做切換到IO線程,將以後的操做切換到主線程。app
整個RxJava的使用能夠分爲三個部分:框架
接下來我會對這三個部分作詳細的說明。異步
建立發送數據的原始Observable採用的是 Observable.create() ,固然使用 Observable.just(1) 、 Observable.from(list) 等等這些方法也能夠建立發送數據的Observable,其實這些操做符的本質都是建立了一個新的Observable,在訂閱的時候發送了一些數據。咱們着重看一下 Observable.create() :ide
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { //確保source非空 ObjectHelper.requireNonNull(source, "source is null"); //返回一個ObservableCreate對象 return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
第一句很好理解,確保傳入的 ObservableOnSubscribe 對象非空,第二句本質是直接返回了 new ObservableCreate<T>(source)
執行的結果,咱們之後在看到 RxJavaPlugins.onAssembly(obj)
相似的代碼時,直接能夠理解爲返回了一個 obj 。因此這個方法實際上作了一件事:建立了一個 ObservableCreate 類型的對象並返回。函數
這裏咱們先了解一下在訂閱時,也就是調用 subscribeWith(observer) 時具體是作了什麼:源碼分析
public final <E extends Observer<? super T>> E subscribeWith(E observer) { subscribe(observer); return observer; } public final void subscribe(Observer<? super T> observer) { //確保observer非空 ObjectHelper.requireNonNull(observer, "observer is null"); try { //這裏通常都是直接返回傳入的observer observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //不一樣類型的Observable的具體訂閱的方法,全部的數據發送操做都在這個方法中 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
先看第一個方法 subscribeWith(E observer) ,在這個方法內部調用了 subscribe(observer) 這個方法。 subscribe(observer) 這個方法的核心是 subscribeActual(observer)
這一句。subscribeActual(observer) 這個方法是Observable內的一個抽象方法,Observable通過create操做符後會變成一個 ObservableCreate,通過map操做符會變成一個 ObservableMap 其餘的操做符都是一個套路。通過操做符轉化而來的Observable都是Observable的子類,在這些子類的內部都會實現 subscribeActual(observer) 這個抽象方法,並在這個方法內作發送事件的相關操做。post
通過上面的分析,咱們知道了 subscribeActual(observer) 這個方法是不一樣Observable的關鍵,有了這個前置知識,咱們接着上面的進度看一下 ObservableCreate 這個Observable子類中的 subscribeActual(observer) 方法:ui
@Override protected void subscribeActual(Observer<? super T> observer) { //對observer的包裝 CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { //這是關鍵 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
關鍵是 source.subscribe(parent)
這一句。這個source就是 ObservableOnSubscribe 的實例,是在 Observable.create(source) 中傳入的參數。 ObservableOnSubscribe 是一個接口,在這個接口的 subscribe 方法中咱們定義了數據的發送方式。這個 ObservableOnSubscribe 能夠理解爲一個 數據發送的劇本 ,這個劇本具體的實現細節寫在了 subscribe方法內。在最上面的例子中,咱們經過 e.onNext(1);e.onComplete()
發送了一個數字1,緊接着通知事件已經發送完畢。
因爲RxJava的事件操做符太多,咱們這裏只講map操做符的源碼分析:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { //確保mapper非空 ObjectHelper.requireNonNull(mapper, "mapper is null"); //建立並返回一個ObservableMap對象 return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
是否是很熟悉,map操做符和create操做符是一個套路,事實上全部的操做符都是一個套路: 建立並返回了一個對原有Observable的包裝類 ,那咱們來看一下這個 ObservableMap :
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } //對原始observer的包裝類 static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { U v; try { //在這裏將T類型的原始數據t變換爲類型U的新數據v v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } //調用原始observer的onNext actual.onNext(v); } //忽略無關代碼...... }
ObservableMap 內部有有兩個值得咱們注意的點:一個是咱們上面提過的 subscribeActual 方法;另外一個是它的內部類 MapObserver。
subscribeActual 中依然是 source.subscribe(new MapObserver<T, U>(t, function))
這句熟悉的代碼,值得注意的是這句代碼並無傳入原始的observer,而是傳入了 MapObserver 對象。簡而言之, MapObserver 是一個包裝類,它的內部包含咱們原始的observer: actual 和咱們變換的具體操做: mapper 。新的 MapObserver 包裝類在它的 onNext 方法中作了兩步操做:
其實在不瞭解RxJava的源碼以前,咱們經常會驚歎於RxJava的神奇:明明我發送的是數字1,我要接收的倒是字符串類型的數據,這中間僅僅是經過了一個map操做符,簡直太神奇了!在咱們瞭解了map操做符的源碼以後,就會知道:咱們在訂閱的時候map操做符將原始的observer包裝成了一個 MapObserver 對象,在這個 MapObserver 內部的 onNext 方法中首先會將原始數據 1 經過咱們傳入的變換規則變換爲 result : 1,在變換以後纔會調用咱們編寫的原始observer來處理新的 String 類型的數據。
在分析線程切換以前,咱們先明確一些前置知識:從主線程切換到子線程的操做很簡單,新開一條線程,在新的線程執行便可。從子線程切換到主線程,在Android開發中,全部的第三方框架使用的都是Handler機制。因此不要把這些第三方框架想的特別難,他們的本質都是Android中的基礎知識。
咱們以 subscribeOn(Schedulers.io())
這個例子來分析,首先傳入的是 Schedulers.io() ,直接說結論吧,它就是一個 IoScheduler 類型的單例對象。繼續看代碼:
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
咱們都很熟悉了,本質是建立並返回了一個 ObservableSubscribeOn 類型的對象。咱們繼續看 ObservableSubscribeOn 的內部:
public void subscribeActual(final Observer<? super T> s) { //建立一個對原始observer的包裝對象 final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); //這裏是關鍵 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer<? super T> actual; final AtomicReference<Disposable> s; SubscribeOnObserver(Observer<? super T> actual) { this.actual = actual; this.s = new AtomicReference<Disposable>(); } @Override public void onNext(T t) { actual.onNext(t); } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } //忽略無關代碼...... } final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { //這一句咱們很是熟悉了,在這裏實現了事件的訂閱 source.subscribe(parent); } }
這裏跟 ObservableMap 中的 subscribeActual 也是一個套路:
經過看 SubscribeOnObserver 這個包裝類中的 onNext 方法也能夠明白:它的內部直接調用的就是 actual.onNext(t) ,沒有像map同樣作數據的變換,這很好理解,由於 subscribeOn(schedule) 自己就只是爲了切換線程,並不作其餘多餘的操做,因此這個包裝類中的 onNext 纔會直接調用下一級observer的onNext。
可能有的小夥伴要有問題了:你說經過 source.subscribe(parent) 實現事件的訂閱,但 subscribeActual 中並無你說的代碼啊?其實在這裏 scheduler.scheduleDirect(new SubscribeTask(parent))
這裏傳入了一個 SubscribeTask 對象,這個對象實際上是個 Runnable ,在它的 run() 方法中調用了 source.subscribe(parent)
。到目前爲止,咱們惟一陌生的就是 scheduleDirect 這個方法了,這個方法定義在 Scheduler 這個線程調度類中,它實際調用的是:
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { //createWorker是一個抽象方法,不一樣的線程調度器有不一樣的實現 final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); //事件調度處理 w.schedule(task, delay, unit); return task; }
scheduleDirect 這個方法作了兩件事:建立一個 Worker 工做類,調用工做類的 schedule 來進行事件調度。不一樣的線程調度器會有不一樣的處理,對於 IoSchedule 這個調度器來講,它最終是執行這個方法:
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { //忽略無關代碼...... Future<?> f; try { if (delayTime <= 0) { //經過線程池來開啓一個子線程執行 f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
經過上面的分析咱們能夠知道, subscribeOn(Schedulers.io()) 本質上是經過線程池開啓了一個線程,在這個新的線程中仍是經過調用 source.subscribe(observer) 來訂閱事件。
subscribeOn(Schedulers.io()) 分析過了,咱們再來看一下 subscribeOn(AndroidSchedulers.mainThread()) 這個線程切換。
AndroidSchedulers.mainThread() 返回的是一個 HandlerScheduler 類型的線程調度器,它的 scheduleDirect 方法定義以下:
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { if (run == null) throw new NullPointerException("run == null"); if (unit == null) throw new NullPointerException("unit == null"); run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); //本質是經過handler機制實現線程切換 handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay))); return scheduled; }
從它的方法內部能夠看到,它是經過 handler.postDelayed()
來實現切換到主線程的功能的,這個handler是定義在主線程的handler。其餘類型的線程調度器就再也不分析了,本質都是大同小異。
接下來咱們來看一下 observeOn(AndroidSchedulers.mainThread()) ,額,其實它的套路也是同樣的,建立並返回了一個 ObservableObserveOn ,咱們主要關注它的 subscribeActual 方法:
protected void subscribeActual(Observer<? super T> observer) { //忽略無關代碼...... Scheduler.Worker w = scheduler.createWorker(); //訂閱包裝類ObserveOnObserver source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { //下一級觀察者 final Observer<? super T> actual; //對應Scheduler裏的Worker final Scheduler.Worker worker; //上一級Observable發送的數據隊列 SimpleQueue<T> queue; Disposable s; //若是onError了,保存對應的異常 Throwable error; //是否完成 volatile boolean done; //是否取消 volatile boolean cancelled; // 表明同步發送 異步發送 int sourceMode; .... @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; //忽略無關代碼...... queue = new SpscLinkedArrayQueue<T>(bufferSize); actual.onSubscribe(this); } } @Override public void onNext(T t) { //執行過error / complete ,直接返回 if (done) { return; } //若是數據源類型不是異步的, 默認不是 if (sourceMode != QueueDisposable.ASYNC) { //將上一級Observable發送的數據加入隊列中 queue.offer(t); } //這句代碼是線程調度的核心,進入相應Worker線程,在相應線程發送數據隊列中的數據 schedule(); } @Override public void onError(Throwable t) { if (done) { RxJavaPlugins.onError(t); return; } error = t; done = true; //這句代碼是線程調度的核心,進入相應Worker線程,在相應線程發送數據隊列中的數據 schedule(); } @Override public void onComplete() { if (done) { return; } done = true; //開始調度 schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } @Override public void run() { //默認是false if (outputFused) { drainFused(); } else { //取出隊列中的數據併發送 drainNormal(); } } void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; for (;;) { //若是已經結束或隊列爲空,跳出循環 if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { //從隊列裏取出一個值 v = q.poll(); } catch (Throwable ex) { //異常處理並跳出函數 Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } //調用下一級的observer的onNext方法發送事件 a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } } boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) { //若是已經disposed if (cancelled) { queue.clear(); return true; } // 若是已經結束 if (d) { Throwable e = error; //若是是延遲發送錯誤 if (delayError) { //若是空 if (empty) { if (e != null) { a.onError(e); } else { a.onComplete(); } //中止worker(線程) worker.dispose(); return true; } } else { //發送錯誤 if (e != null) { queue.clear(); a.onError(e); worker.dispose(); return true; } else //發送complete if (empty) { a.onComplete(); worker.dispose(); return true; } } } return false; } }
能夠看到 subscribeOn 與 observeOn 不一樣的是兩者線程切換的地方。 subscribeOn 是將 source.subscribe(observer) 整個部分切換到了相應的線程,而 observeOn 則是在 ObserveOnObserver 這個包裝類中的 onNext 、 onError 方法中作了線程的切換,具體是在 onNext 等方法中建立了相關的 Worker 工做類,並調用了這個類的 schedule 方法,這個跟咱們講 subscribeOn 是同樣的。
到此爲止,RxJava2的源碼就基本分析完了。最後根據最初的例子,用一幅圖來展現RxJava的流程: