轉載請註明出處:https://zhuanlan.zhihu.com/p/22338235java
RxJava系列2(基本概念及使用介紹)github
RxJava系列3(轉換操做符)設計模式
經過前面五個篇幅的介紹,相信你們對RxJava的基本使用以及操做符應該有了必定的認識。可是知其然還要知其因此然;因此從這一章開始咱們聊聊源碼,分析RxJava的實現原理。本文咱們主要從三個方面來分析RxJava的實現:函數
RxJava基本流程分析oop
操做符原理分析
線程調度原理分析
本章節基於RxJava1.1.9版本的源碼
在RxJava系列2(基本概念及使用介紹)中咱們介紹過,一個最基本的RxJava調用是這樣的:
示例A
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava!"); subscriber.onCompleted(); } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("completed!"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println(s); } });
首先調用Observable.create()
建立一個被觀察者Observable
,同時建立一個OnSubscribe
做爲create()
方法的入參;接着建立一個觀察者Subscriber
,而後經過subseribe()
實現兩者的訂閱關係。這裏涉及到三個關鍵對象和一個核心的方法:
Observable(被觀察者)
OnSubscribe (從純設計模式的角度來理解,OnSubscribe.call()
能夠看作是觀察者模式中被觀察者用來通知觀察者的notifyObservers()
方法)
Subscriber (觀察者)
subscribe() (實現觀察者與被觀察者訂閱關係的方法)
首先咱們來看看Observable.create()
的實現:
public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }
這裏建立了一個被觀察者Observable
,同時將RxJavaHooks.onCreate(f)
做爲構造函數的參數,源碼以下:
protected Observable(OnSubscribe<T> f) { this.onSubscribe = f; }
咱們看到源碼中直接將參數RxJavaHooks.onCreate(f)
賦值給了當前咱們構造的被觀察者Observable
的成員變量onSubscribe
。那麼RxJavaHooks.onCreate(f)
返回的又是什麼呢?咱們接着往下看:
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) { Func1<OnSubscribe, OnSubscribe> f = onObservableCreate; if (f != null) { return f.call(onSubscribe); } return onSubscribe; }
因爲咱們並沒調用RxJavaHooks.initCreate()
,因此上面代碼中的onObservableCreate
爲null;所以RxJavaHooks.onCreate(f)
最終返回的就是f
,也就是咱們在Observable.create()
的時候new出來的OnSubscribe
。(因爲對RxJavaHooks的理解並不影響咱們對RxJava執行流程的分析,所以在這裏咱們不作進一步的探討。爲了方便理解咱們只須要知道RxJavaHooks一系列方法的返回值就是入參自己就OK了,例如這裏的RxJavaHooks.onCreate(f)
返回的就是f
)。
至此咱們作下邏輯梳理:Observable.create()
方法構造了一個被觀察者Observable
對象,同時將new出來的OnSubscribe
賦值給了該Observable
的成員變量onSubscribe
。
接着咱們看下觀察者Subscriber
的源碼,爲了增長可讀性,我去掉了源碼中的註釋和部分代碼。
public abstract class Subscriber<T> implements Observer<T>, Subscription { private final SubscriptionList subscriptions;//訂閱事件集,全部發送給當前Subscriber的事件都會保存在這裏 ... protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) { this.subscriber = subscriber; this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); } ... @Override public final void unsubscribe() { subscriptions.unsubscribe(); } @Override public final boolean isUnsubscribed() { return subscriptions.isUnsubscribed(); } public void onStart() { } ... }
public interface Subscription { void unsubscribe(); boolean isUnsubscribed(); }
Subscriber
實現了Subscription
接口,從而對外提供isUnsubscribed()
和unsubscribe()
方法。前者用於判斷是否已經取消訂閱;後者用於將訂閱事件列表(也就是當前觀察者的成員變量subscriptions
)中的全部Subscription
取消訂閱,而且再也不接受觀察者Observable
發送的後續事件。
前面咱們分析了觀察者和被觀察者相關的源碼,那麼接下來即是整個訂閱流程中最最關鍵的環節了。
public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) { ... subscriber.onStart(); if (!(subscriber instanceof SafeSubscriber)) { subscriber = new SafeSubscriber<T>(subscriber); } try { RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { ... return Subscriptions.unsubscribed(); } }
subscribe()
方法中將傳進來的subscriber
包裝成了SafeSubscriber
,SafeSubscriber
實際上是subscriber
的一個代理,對subscriber
的一系列方法作了更加嚴格的安全校驗。保證了onCompleted()
和onError()
只會有一個被執行且只執行一次,一旦它們其中方法被執行事後onNext()
就不在執行了。
上述代碼中最關鍵的就是RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)
。這裏的RxJavaHooks和以前提到的同樣,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)
返回的正是他的第二個入參observable.onSubscribe
,也就是當前observable
的成員變量onSubscribe
。而這個成員變量咱們前面提到過,它是咱們在Observable.create()
的時候new出來的。因此這段代碼能夠簡化爲onSubscribe.call(subscriber)
。這也印證了我在RxJava系列2(基本概念及使用介紹)中說的,onSubscribe.call(subscriber)
中的subscriber
正是咱們在subscribe()
方法中new出來的觀察者。
到這裏,咱們對RxJava的執行流程作個總結:首先咱們調用crate()
建立一個觀察者,同時建立一個OnSubscribe
做爲該方法的入參;接着調用subscribe()
來訂閱咱們本身建立的觀察者Subscriber
。
一旦調用subscribe()
方法後就會觸發執行OnSubscribe.call()
。而後咱們就能夠在call方法調用觀察者subscriber
的onNext()
,onCompleted()
,onError()
。
最後我用張圖來總結下以前的分析結果:
以前咱們介紹過幾十個操做符,要一一分析它們的源碼顯然不太現實。在這裏我拋磚引玉,選取一個相對簡單且經常使用的map
操做符來分析。
咱們先來看一個map
操做符的簡單應用:
示例B
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }).map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return "This is " + integer; } }).subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted!"); } @Override public void onError(Throwable e) { System.out.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } });
爲了便於表述,我將上面的代碼作了以下拆解:
Observable<Integer> observableA = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onCompleted(); } }); Subscriber<String> subscriberOne = new Subscriber<String>() { @Override public void onCompleted() { System.out.println("onCompleted!"); } @Override public void onError(Throwable e) { System.out.println(e.getMessage()); } @Override public void onNext(String s) { System.out.println(s); } }; Observable<String> observableB = observableA.map(new Func1<Integer, String>() { @Override public String call(Integer integer) { return "This is " + integer;; } }); observableB.subscribe(subscriberOne);
map()
的源碼和上一小節介紹的create()
同樣位於Observable
這個類中。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) { return create(new OnSubscribeMap<T, R>(this, func)); }
經過查看源碼咱們發現調用map()
的時候其實是建立了一個新的被觀察者Observable
,咱們姑且稱它爲ObservableB
;一開始經過Observable.create()
建立的Observable
咱們稱之爲ObservableA
。在建立ObservableB
的時候同時建立了一個OnSubscribeMap
,而ObservableA
和變換函數Func1
則做爲構造OnSubscribeMap
的參數。
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> { final Observable<T> source;//ObservableA final Func1<? super T, ? extends R> transformer;//map操做符中的轉換函數Func1。T爲轉換前的數據類型,在上面的例子中爲Integer;R爲轉換後的數據類型,在該例中爲String。 public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) { this.source = source; this.transformer = transformer; } @Override public void call(final Subscriber<? super R> o) {//結合第一小節的分析結果,咱們知道這裏的入參o其實就是咱們本身new的觀察者subscriberOne。 MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer); o.add(parent); source.unsafeSubscribe(parent); } static final class MapSubscriber<T, R> extends Subscriber<T> { final Subscriber<? super R> actual;//這裏的actual就是咱們在調用subscribe()時建立的觀察者mSubscriber final Func1<? super T, ? extends R> mapper;//變換函數 boolean done; public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) { this.actual = actual; this.mapper = mapper; } @Override public void onNext(T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); unsubscribe(); onError(OnErrorThrowable.addValueAsLastCause(ex, t)); return; } actual.onNext(result); } @Override public void onError(Throwable e) { ... actual.onError(e); } @Override public void onCompleted() { ... actual.onCompleted(); } @Override public void setProducer(Producer p) { actual.setProducer(p); } } }
OnSubscribeMap
實現了OnSubscribe
接口,所以OnSubscribeMap
就是一個OnSubscribe
。在調用map()
的時候建立了一個新的被觀察者ObservableB
,而後咱們用ObservableB.subscribe(subscriberOne)
訂閱了觀察者subscriberOne
。結合咱們在第一小節的分析結果,因此OnSubscribeMap.call(o)
中的o
就是subscribe(subscriberOne)
中的subscriberOne
;一旦調用了ObservableB.subscribe(subscriberOne)
就會執行OnSubscribeMap.call()
。
在call()
方法中,首先經過咱們的觀察者o
和轉換函數transformer
構造了一個MapSubscriber
,最後調用了source
也就是observableA
的unsafeSubscribe()
方法。即observableA
訂閱了一個觀察者MapSubscriber
。
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) { try { ... RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber); return RxJavaHooks.onObservableReturn(subscriber); } catch (Throwable e) { ... return Subscriptions.unsubscribed(); } }
上面這段代碼最終執行了onSubscribe
也就是OnSubscribeMap
的call()
方法,call()
方法中的參數就是以前在OnSubscribeMap.call()
中new出來的MapSubscriber
。最後在call()
方法中執行了咱們本身的業務代碼:
subscriber.onNext(1); subscriber.onCompleted();
其實也就是執行了MapSubscriber
的onNext()
和onCompleted()
。
@Override public void onNext(T t) { R result; try { result = mapper.call(t); } catch (Throwable ex) { ... return; } actual.onNext(result); }
onNext(T t)
方法中的的mapper
就是變換函數,actual
就是咱們在調用subscribe()
時建立的觀察者subscriberOne
。這個T
就是咱們例子中的Integer
,R
就是String
。在onNext()
中首先調用變換函數mapper.call()
將T
轉換成R
(在咱們的例子中就是將Integer
類型的1轉換成了String
類型的「This is 1」);接着調用subscriberOne.onNext(String result)
。一樣在調用MapSubscriber.onCompleted()
時會執行subscriberOne.onCompleted()
。這樣就完成了一直完成的調用流程。
我認可太囉嗦了,花費了這麼大的篇幅纔將map()
的轉換原理解釋清楚。我也是但願儘可能的將每一個細節都呈現出來方便你們理解,若是看我囉嗦了這麼久仍是沒能理解,請看下面我畫的這張執行流程圖。
在前面的文章中我介紹過RxJava能夠很方便的經過subscribeOn()
和observeOn()
來指定數據流的每一部分運行在哪一個線程。其中subscribeOn()
指定了處理Observable
的所有的過程(包括髮射數據和通知)的線程;observeOn()
指定了觀察者的onNext()
, onError()
和onCompleted()
執行的線程。接下來咱們就分析分析源碼,看看線程調度是如何實現的。
在分析源碼前咱們先看看一段常見的經過RxJava實現的線程調度代碼:
示例C
Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello RxJava!"); subscriber.onCompleted(); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<String>() { @Override public void onCompleted() { System.out.println("completed!"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { System.out.println(s); } });
public final Observable<T> subscribeOn(Scheduler scheduler) { ... return create(new OperatorSubscribeOn<T>(this, scheduler)); }
經過上面的代碼咱們能夠看到,subscribeOn()
和map()
同樣是建立了一個新的被觀察者Observable
。所以我大體就能猜到subscribeOn()
的執行流程應該和map()
差很少,OperatorSubscribeOn
確定也是一個OnSubscribe
。那咱們接下來就看看OperatorSubscribeOn
的源碼:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> { final Scheduler scheduler;//線程調度器,用來指定訂閱事件發送、處理等所在的線程 final Observable<T> source; public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) { this.scheduler = scheduler; this.source = source; } @Override public void call(final Subscriber<? super T> subscriber) { final Worker inner = scheduler.createWorker(); subscriber.add(inner); inner.schedule(new Action0() { @Override public void call() { final Thread t = Thread.currentThread(); Subscriber<T> s = new Subscriber<T>(subscriber) { @Override public void onNext(T t) { subscriber.onNext(t); } @Override public void onError(Throwable e) { try { subscriber.onError(e); } finally { inner.unsubscribe(); } } @Override public void onCompleted() { try { subscriber.onCompleted(); } finally { inner.unsubscribe(); } } @Override public void setProducer(final Producer p) { subscriber.setProducer(new Producer() { @Override public void request(final long n) { if (t == Thread.currentThread()) { p.request(n); } else { inner.schedule(new Action0() { @Override public void call() { p.request(n); } }); } } }); } }; source.unsafeSubscribe(s); } }); } }
OperatorSubscribeOn
實現了OnSubscribe
接口,call()
中對Subscriber
的處理也和OperatorMap
對Subscriber
的處理相似。首先經過scheduler
構建了一個Worker
;而後用傳進來的subscriber
構造了一個新的Subscriber s
,並將s
丟到Worker.schedule()
中來處理;最後用原Observable
去訂閱觀察者s
。而這個Worker
就是線程調度的關鍵!前面的例子中咱們經過subscribeOn(Schedulers.io())
指定了Observable
發射處理事件以及通知觀察者的一系列操做的執行線程,正是經過這個Schedulers.io()
建立了咱們前面提到的Worker
。因此咱們來看看Schedulers.io()
的實現。
首先經過Schedulers.io()
得到了ioScheduler
並返回,上面的OperatorSubscribeOn
經過這個的Scheduler
的createWorker()
方法建立了咱們前面提到的Worker
。
public static Scheduler io() { return RxJavaHooks.onIOScheduler(getInstance().ioScheduler); }
接着咱們看看這個ioScheduler
是怎麼來的,下面的代碼向咱們展示了是如何在Schedulers
的構造函數中經過RxJavaSchedulersHook.createIoScheduler()
來初始化ioScheduler
的。
private Schedulers() { ... Scheduler io = hook.getIOScheduler(); if (io != null) { ioScheduler = io; } else { ioScheduler = RxJavaSchedulersHook.createIoScheduler(); } ... }
最終RxJavaSchedulersHook.createIoScheduler()
返回了一個CachedThreadScheduler
,並賦值給了ioScheduler
。
public static Scheduler createIoScheduler() { return createIoScheduler(new RxThreadFactory("RxIoScheduler-")); }
public static Scheduler createIoScheduler(ThreadFactory threadFactory) { ... return new CachedThreadScheduler(threadFactory); }
到這一步既然咱們知道了ioScheduler
就是一個CachedThreadScheduler
,那咱們就來看看它的createWorker()
的實現。
public Worker createWorker() { return new EventLoopWorker(pool.get()); }
上面的代碼向咱們赤裸裸的呈現了前面OperatorSubscribeOn
中的Worker
其實就是EventLoopWorker
。咱們重點要關注的是他的scheduleActual()
。
static final class EventLoopWorker extends Scheduler.Worker implements Action0 { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final CachedWorkerPool pool; private final ThreadWorker threadWorker; final AtomicBoolean once; EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.once = new AtomicBoolean(); this.threadWorker = pool.get(); } ... @Override public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) { ... ScheduledAction s = threadWorker.scheduleActual(new Action0() { @Override public void call() { if (isUnsubscribed()) { return; } action.call(); } }, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; } }
經過對源碼的一步步追蹤,咱們知道了前面OperatorSubscribeOn.call()
中的inner.schedule()
最終會執行到ThreadWorker
的scheduleActual()
方法。
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) { Action0 decoratedAction = RxJavaHooks.onScheduledAction(action); ScheduledAction run = new ScheduledAction(decoratedAction); Future<?> f; if (delayTime <= 0) { f = executor.submit(run); } else { f = executor.schedule(run, delayTime, unit); } run.add(f); return run; }
scheduleActual()
中的ScheduledAction
實現了Runnable
接口,經過線程池executor
最終實現了線程切換。上面即是subscribeOn(Schedulers.io())
實現線程切換的所有過程。
observeOn()
切換線程是經過lift
來實現的,相比subscribeOn()
在實現原理上相對複雜些。不過本質上最終仍是建立了一個新的Observable
。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ... return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); } public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) { return create(new OnSubscribeLift<T, R>(onSubscribe, operator)); }
OperatorObserveOn
做爲OnSubscribeLift
構造函數的參數用來建立了一個新的OnSubscribeLift
對象,接下來咱們看看OnSubscribeLift
的實現:
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> { final OnSubscribe<T> parent; final Operator<? extends R, ? super T> operator; public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) { this.parent = parent; this.operator = operator; } @Override public void call(Subscriber<? super R> o) { try { Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o); try { st.onStart(); parent.call(st); } catch (Throwable e) { Exceptions.throwIfFatal(e); st.onError(e); } } catch (Throwable e) { Exceptions.throwIfFatal(e); o.onError(e); } } }
OnSubscribeLift
繼承自OnSubscribe
,經過前面的分析咱們知道一旦調用了subscribe()
將觀察者與被觀察綁定後就會觸發被觀察者所對應的OnSubscribe
的call()
方法,因此這裏會觸發OnSubscribeLift.call()
。在call()
中調用了OperatorObserveOn.call()
並返回了一個新的觀察者Subscriber st
,接着調用了前一級Observable
對應OnSubscriber.call(st)
。
咱們再看看OperatorObserveOn.call()
的實現:
public Subscriber<? super T> call(Subscriber<? super T> child) { ... ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); parent.init(); return parent; }
OperatorObserveOn.call()
中建立了一個ObserveOnSubscriber
並調用init()
進行了初始化。
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 { ... @Override public void onNext(final T t) { ... schedule(); } @Override public void onCompleted() { ... schedule(); } @Override public void onError(final Throwable e) { ... schedule(); } protected void schedule() { if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(this); } } @Override public void call() { long missed = 1L; long currentEmission = emitted; final Queue<Object> q = this.queue; final Subscriber<? super T> localChild = this.child; final NotificationLite<T> localOn = this.on; for (;;) { long requestAmount = requested.get(); while (requestAmount != currentEmission) { boolean done = finished; Object v = q.poll(); boolean empty = v == null; if (checkTerminated(done, empty, localChild, q)) { return; } if (empty) { break; } localChild.onNext(localOn.getValue(v)); currentEmission++; if (currentEmission == limit) { requestAmount = BackpressureUtils.produced(requested, currentEmission); request(currentEmission); currentEmission = 0L; } } if (requestAmount == currentEmission) { if (checkTerminated(finished, q.isEmpty(), localChild, q)) { return; } } emitted = currentEmission; missed = counter.addAndGet(-missed); if (missed == 0L) { break; } } } ... }
ObserveOnSubscriber
繼承自Subscriber
,並實現了Action0
接口。咱們看到ObserveOnSubscriber
的onNext()
、onCompleted()
、onError()
都有個schedule()
,這個方法就是咱們線程調度的關鍵;經過schedule()
將新觀察者ObserveOnSubscriber
發送給subscriberOne
的全部事件都切換到了recursiveScheduler
所對應的線程,簡單的說就是把subscriberOne
的onNext()
、onCompleted()
、onError()
方法丟到了recursiveScheduler
對應的線程中來執行。
那麼schedule()
又是如何作到這一點的呢?他內部調用了recursiveScheduler.schedule(this)
,recursiveScheduler
其實就是一個Worker
,和咱們在介紹subscribeOn()
時提到的worker
同樣,執行schedule()
實際上最終是建立了一個runable
,而後把這個runnable
丟到了特定的線程池中去執行。在runnable
的run()
方法中調用了ObserveOnSubscriber.call()
,看上面的代碼你們就會發如今call()
方法中最終調用了subscriberOne
的onNext()
、onCompleted()
、onError()
方法。這即是它實現線程切換的原理。
好了,咱們最後再看看示例C對應的執行流程圖,幫助你們加深理解。
這一章以執行流程、操做符實現以及線程調度三個方面爲切入點剖析了RxJava源碼。下一章將站在更宏觀的角度來分析整個RxJava的框架結構、設計思想等等。敬請期待~~ :)
若是你們喜歡這一系列的文章,歡迎關注個人知乎專欄和GitHub。