Rxjava是NetFlix出品的Java框架, 官方描述爲 a library for composing asynchronous and event-based programs using observable sequences for the Java VM,翻譯過來就是「使用可觀察序列組成的一個異步地、基於事件的響應式編程框架」。一個典型的使用示範以下:java
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String s = "1234"; //執行耗時任務 emitter.onNext(s); } }).map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return Integer.parseInt(s); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe();
本文要講的主要內容是Rxjava的核心思路,利用一張圖並結合源碼分析Rxjava的實現原理,至於使用以及其比較深刻的內容,好比不經常使用的操做符,背壓等,讀者能夠自行學習。另外提一句,本文采用的Rxjava版本是2.2.3,Rxjava最新版本是3.x.x,感興趣的能夠自行閱讀,但相信其最核心的原理是不會變化的。另外,本文篇幅較長,最好的效果是邊看本文邊到源碼中體會,若是讀者沒有耐心讀完,能夠只看圖片和頭尾。編程
先放出本文最重要的圖:api
Rxjava的核心思路被總結在了圖中,本文分爲兩部分,第一部分講圖中的三條流和事件傳遞,第二部分講線程切換的原理,下面進入正題。網絡
在講以前,先提一點,在Rxjava中,有Observable和Observer這兩個核心的概念,可是它們在發生訂閱時,跟普通的觀察者模式寫法不太同樣,由於常識來說,應該是觀察者去訂閱(subscribe)被觀察者,可是Rxjava爲了其基於事件的流式編程,只能反着來,observable去訂閱observer,因此在rxjava中,subscribe能夠理解「注入」觀察者。app
首先咱們看上面的圖片,先簡單解釋一下:圖中方形的框表明的是Observable,由於它表明節點,因此用Ni表示,圓形框表明的是觀察者Observer,用Oi標識,後面加括號的意思是Oi持有其下游Observer的引用,左側表明上游,右側表明下游。圖片裏有三條有方向的彩色粗線,表明三個不一樣的流,這三個流是咱們爲了分析問題而抽象出來的的,表明從構建到訂閱整個事件的流向,按照時間順序從上到下依次流過,它們的含義分別是:框架
咱們依次分析這三條流:異步
在使用Rxjava時,其流式構建流程是很大的特點,避免了傳統回調的繁瑣。怎麼實現的呢?使用過Rxjava的讀者應該都知道,Rxjava的每一步構建過程api都是相同的,這是由於每一步的函數返回結果都是一個Observable,Observable提供了Rxjava全部的功能。那麼Obsevable在Rxjava中到底扮演一個什麼角色呢?事實上,其官方定義就已經告訴咱們答案了,前言裏官方定義中有這樣一段:「using Observable sequences」,因此說,Obsevable就是構建流的組件,咱們能夠當作一個個節點,這些節點串起來組成整個鏈路。Observable這個類實現了一個接口:ObservableSource,這個接口只有一個方法:subscribe(observer),也就是說,全部的Obsevable節點都具備訂閱這個功能,這個功能很重要,是訂閱流的關鍵,待會會講。總結一下:async
在咱們編寫Rxjava代碼時,每一步操做都會生成一個新的Observable節點(沒錯,包括ObserveOn和SubscribeOn線程變換操做),並將新生成的Observable返回,直到最後一步執行subscribe方法ide
不管是構建的第一步 create方法,仍是observeOn,subscribeOn變換線程方法,仍是各類操做符好比map,flatMap等,都會生成對應的Observable,每一個Observble中要實現一個最重要的方法就是subscribe,咱們看其實現:函數
public final void subscribe(Observer<? super T> observer) { try { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { RxJavaPlugins.onError(e); throw npe; } }
這裏提一點,你們看源碼時遇到RxJavaPlugins時直接略過看裏面的代碼就行了,它是hook用的,不影響主要流程。因此上面代碼其實只有一行有用:
subscribeActual(observer);
也就是說,每一個節點在執行subscribe時,其實就是在調用該節點的subscribeActual方法,這個方法是抽象的,每一個節點的實現都不同。咱們舉個栗子,拿ObseverOn這個操做生成的ObservableSubscribeOn瞧瞧:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } //xxx省略 }
其中其父類繼承Observable,因此它是一個Observble。
整個過程有點像builder模式,不一樣之處是它是生成了新的節點,而builder模式返回的自身。若是你讀過okHttp的源碼,okHttp中攔截器跟這裏有些類似,okHttp中會構建多個Chain節點,而後用相應的Intercepter去處理Chain。
咱們理解了編寫Rxjava代碼的過程其實就是構建一個一個Observable節點的過程,接下來咱們看第二條流。
構建過程只是經過構造函數將一些配置傳給了各個節點,實際尚未執行任何代碼,只有最後一步才真正的執行訂閱行爲。當最後一個節點調用subscribe方法時,是構建流向訂閱流變化的轉折點,咱們以圖中爲例:最後一個節點是N5,N5節點是最後一個flatmap操做符方法產生的,也就是說,最後是調用這個節點的subscribe方法,這個方法最終也是會調用到subscribeActual方法中去,咱們看其源碼:
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends ObservableSource<? extends U>> mapper; final boolean delayErrors; final int maxConcurrency; final int bufferSize; public ObservableFlatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper, boolean delayErrors, int maxConcurrency, int bufferSize) { super(source); this.mapper = mapper; this.delayErrors = delayErrors; this.maxConcurrency = maxConcurrency; this.bufferSize = bufferSize; } @Override public void subscribeActual(Observer<? super U> t) { if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) { return; } source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize)); } static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> { final Observer<? super U> downstream; final Function<? super T, ? extends ObservableSource<? extends U>> mapper; }
剛纔咱們分析了,N5節點是Observable節點,其subscribe方法最後調用的是subscribeActual方法,咱們看上面代碼中它的這個方法:前面的判斷語句跳過,第二行:
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
這行代碼須要注意兩點:
到這裏,咱們分析了最後一個節點執行subscribe方法的過程,事實上,每一個節點的執行流程都是相似的(subscribeOn節點有些特殊,等會線程調度會將),也就是說,N5會調用N4的subscribe方法,而在N4的subscribe方法中,又去調用了N3的subscribe....一直到N0會調用source的subscribe方法。總結下來就是:
從最後一個N5節點的訂閱行爲開始,依次執行前面各個節點真正的訂閱方法。在每一個節點的訂閱方法中,都會生成一個新的Observer,這個Observer會包含「下游」的Observer,這樣當每一個節點都執行完訂閱(subscribeActual)後,也就生成了一串Observer,它們經過downstream,upstream引用鏈接。
以上就是訂閱流的發生過程,簡單講就是下游節點調用上游節點的subscribeActual方法,從而造成了一個調用鏈。
當訂閱流執行到最後,也就是第一個節點N0時,咱們看發生了什麼,首先看看N0節點怎麼創建的:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
生成了ObservableCreate實例,咱們看這個類(簡化):
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); source.subscribe(parent); } }
因此訂閱流的最終會掉到上面的subscrbeActual方法,它其實仍是和其餘節點同樣,最主要的仍是執行了
source.subscribe(parent)
這行代碼,那麼這個節點的source是什麼呢?它就是咱們事件的源頭啊!
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { String s = "1234"; //執行耗時任務 emitter.onNext(s); } })
上面代碼直接拿的開頭的例子,這個source是一個ObservableOnSubscribe,看它的subscribe方法裏,這裏很重要,這個函數裏面實際上是訂閱流和觀察者流的轉折點,也就是流在這兒「轉向了」。這裏,這個事件源沒有像節點那樣,調用上一個節點的訂閱方法,而是調用了其參數的emitter的onNext方法,這個emitter對應N0節點的什麼呢?看代碼知道,時CreateEmitter這個類,咱們看這個類裏面
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (!isDisposed()) { observer.onNext(t); } } //省略 }
看它的onNext方法,執行的是
observer.onNext(t)
observer是誰?構造函數傳進來的,也就是N0節點subscribeActual方法中的observer,這個observer是誰呢?仔細回想一下,前面訂閱流的時候不就是一次訂閱上一個節點生成的Observer嗎,因此這個observer就是前一個節點N1生成的Observer,咱們看N1節點,是一個Map,對應的Observable節點裏的Observer源碼以下:
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { downstream.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } downstream.onNext(v); //省略後續
名爲MapObserver,看它的onNext方法,忽略前面兩個判斷語句,核心就兩句,一個是mapper.apply(t),另外一個就是downstream.onNext(v)。也就是說,這個mapObserver幹了兩件事,一個是把上個節點返回的數據進行一次map變換,另外一個就是將map後的結果傳遞給下游,下游是什麼呢?看了訂閱流的讀者天然知道,就是N2節點的Observer,對應圖中O4,依次類推,咱們知道了,事件發生之後,經過各個節點的Observer事件源被層層處理並傳遞給下游,一直到最後一個觀察者執行完畢,整個事件處理完成。
至此,咱們三個流分析完畢,接下來,咱們開始分析線程調度是怎麼實現的。
觀察仔細的讀者可能已經看到了,圖中N2節點左側的全部節點和右側的節點顏色不一樣,我爲何要這樣畫呢?其實裏面的玄機就是線程調度,接下來咱們分別看subscribeOn和observeOn的線程切換玄機吧。
在訂閱流發生的的時候,大多數節點都是直接調用上一個節點的subscribe方法,實現雖有差異,但大同小異。惟一有個最大的不一樣就是subscribeOn這個節點,咱們看源碼:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
普通的節點執行時,大多隻是簡單的執行source.subscribe(observer),可是這個不同。先看第二行,它調用了觀察者的onSubscribe方法,熟悉Rxjava的人知道,咱們在自定義Observer的時候,裏面有這個回調,其發生時機就在此刻。咱們接着看最後一行,忽略parent.setDisposable這個邏輯,咱們直接看參數裏面的東西。
scheduler.scheduleDirect(new SubscribeTask(parent))
看看幹了什麼:
@NonNull public Disposable scheduleDirect(@NonNull Runnable run) { return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS); }
繼續:
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
建立了一個worker,一個runnable,而後將兩者封裝到一個DisposeTask中,最後用worker執行這個task,那麼這個worker是什麼呢?
@NonNull public abstract Worker createWorker();
createworker是一個抽象方法,因此須要去找Scheduler的子類,咱們回想一下rxjava的使用,若是在子線程中執行,咱們通常設置調度器爲Schedulers.io(),咱們看這個子類的實現:
在IOSchedluer類中:
@Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); }
繼續:
@NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }
這裏的executor就是一個ExecutorService,熟悉線程池的讀者應該知道,這裏的submit方法,就是將callable丟到線程池中去執行任務了。
咱們回到主線
scheduler.scheduleDirect(new SubscribeTask(parent))
對於io線程的調度器來講,上面的代碼就是將new SubscribeTask(parent)丟到線程池中執行,咱們看參數裏面的SubscribeTask:
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }
看run方法:source.subscribe(parent),這裏的parent跟普通節點同樣,仍然是本節點生成的新的Observer,對於本節點來講,是一個SubscribeOnObserver。所以,咱們就知道了,對於subscribeOn這個節點,它跟普通的節點不一樣之處在於:
SubscribeOn節點在訂閱的時候,將它的上游節點的訂閱行爲,以runnable的形式扔給了一個線程池(對於IO調度器來講),也就是說,當訂閱流流到SubscribeOn節點時,線程發生了切換,以後流向的節點都在切換後的線程中執行。
分析到這裏,咱們就知道了subscribeOn的線程切換原理了,原來是在訂閱流中塞了一個線程變化操做。咱們再看圖中的顏色問題,爲何這個節點上游的節點都是紅色的呢?由於當訂閱流流過這個節點後,後面的節點只是單純的傳遞給上游節點而已,不管是普通的操做符,仍是ObserveOn節點,都是簡單的傳遞給上游,沒有作線程切換(注意,ObserveOn是在觀察者流中作的線程切換,待會會講)。
咱們再思考一個問題,若是上游還有別的subscribeOn,會發生什麼?
咱們假設N1節點的map修改程subscribeOn(AndroidScheduler.Main),也就是說,切換到主線程。咱們仍是從N2節點開始分析,剛纔說到最後會執行到SubscribeTask裏的Run方法,注意此時source.subscribe(parent)發生在子線程中,接下來,回調用N1節點的subscribe,N1節點回調用scheduler.scheduleDirect(new SubscribeTask(parent)),方法,此時,由於線程調度器是主線程的,咱們看它的代碼:
private static final class MainHolder { static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()), false); }
看看這個HandlerScheduler的方法:
@Override public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); handler.postDelayed(scheduled, unit.toMillis(delay)); return scheduled; }
熟悉Android Handler機制的讀者應該很清楚,這裏會把N1節點上游的操做,經過Handler機制,扔給主線程操做,雖然這一步是在N2節點的子線程中執行的,可是它以前的事件仍然會在主線程中執行。所以咱們有如下結論:
subscribeOn節點影響它前面的節點的線程,若是前面還有多個subscribeOn節點,最終只有第一個,也就是最上游的那個節點生效
接下來咱們分析observeOn
前面的subscribeOn線程切換是在訂閱流中發生的,接下來的ObserveOn比較簡單,它發生在第三條流-觀察者回調流中,咱們看ObserveOn節點的源碼:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { //簡化 @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } }
在前面的觀察者流分析時,咱們知道,觀察者流是經過onNext()方法傳遞的,咱們看最後一行,schedule(),線程切換,因此這個ObserveOn節點其實沒幹啥事,就是切換線程了,並且是在onNext回調中切換的。咱們進到這個方法:
void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
worker是這個節點訂閱時指定的 scheduler.createWorker(), 以主線程觀察爲例:
public Disposable schedule(Runnable run, long delay, TimeUnit unit) { run = RxJavaPlugins.onSchedule(run); ScheduledRunnable scheduled = new ScheduledRunnable(handler, run); Message message = Message.obtain(handler, scheduled); message.obj = this; // Used as token for batch disposal of this worker's runnables. if (async) { message.setAsynchronous(true); } handler.sendMessageDelayed(message, unit.toMillis(delay)); // Re-check disposed state for removing in case we were racing a call to dispose(). if (disposed) { handler.removeCallbacks(scheduled); return Disposables.disposed(); } return scheduled; }
一樣,經過Handler機制,將runnable扔給主線程執行,runnable是誰呢,是this,this就是這個ObserveOnObserver,咱們看它的run方法:
@Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }
繼續看drainNormal
void drainNorml() { //簡化 final Observer<? super T> a = downstream; T v; v = q.poll(); a.onNext(v); }
抓重點,仍是把上游的處理結果扔給下游。也就是說observeOn會將它下游的onNext操做扔給它切換的線程中,所以ObserveOn影響的是它的下游,因此咱們途中observeOn後面的顏色都是藍的。
一樣咱們思考,若是有多個observeOn會發生什麼?很簡單,思路同subscribeOn,每一個ObserveOn只會影響它下游一直到下一個obseveOn節點的線程,也就是分段的。
到此爲止咱們就講完了所有內容,包括三條流的原理和線程切換的原理,至於Rxjava的其餘功能和原理,限於篇幅,本文不會講解,感興趣的讀者自行閱讀源碼。本文主要爲讀者提供了理解Rxjava的思路,真正要去理解它,仍是要多看源碼。
在我看來,Rxjava有點像觀察者模式和責任鏈模式的結合,普通的觀察者模式通常是被觀察者通知多個觀察者,而Rxjava則是被觀察者通知第一個Obsever,接下來Observer依次通知其餘節點的Observer,造成一個「觀察鏈」,將觀察者模式進行了一種相似鏈式的變換,每一個節點又會執行它不一樣的「職責」,很是巧妙,總結如下就是:
最原始的訂閱事件從最後一個節點開始,沿着Obsevable節點往上游傳遞,事件源頭處理完任務後,通知給最上游的觀察者,而後通知沿着Observer鏈條往下游傳遞,直到最後一個觀察者結束。
關於flatmap這個操做符,讀者可能會用到,但理解起來又比較難,咱們經過本文,其實就很容易從源碼中理解這個操做符的含義。這裏我順便給你們解釋一下,仍是看圖:
上圖簡化了整個事件流向,咱們對事件源進行了flatmap操做,flatmap在訂閱流的時候跟其餘的操做符基本一致,可是在觀察者回調流中卻很不同,它在回調流中作了如下內容:
flatmap將上游傳過來的數據進行了一次變換,變成了一個Observable,如何變的是由開發者自定義的,好比圖中下面三個豎着的三個Observable節點流,這條流跟上面的四個Observable節點本質上是同樣的。flatmap這個節點的Obsever將上游的數據轉化成了一個新的Observable流,而後執行這條新的流,當這條新的流走完時,會接着原來的觀察者流繼續走下去。也就是說,flatMap這個操做符將一條新的Observable節點流「插入」到原始的觀察者回調流上去了。
那圖中的橘黃色和紫色的虛線是什麼意思呢?
實際上它是flatmap的一種特殊狀況,當新插入的流的事件源有多個的時候,這是會產生分流,每一個流都會執行一遍下游的原始節點。咱們拿下面這個例子來看:
String[] mainArmy = {"第一大隊", "第二大隊", "第三大隊"}; Observable.fromArray(mainArmy) .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(String s) throws Exception { String[] littleArmy = {s + "的第一小隊", s + "的第二小隊", s + "的第三小隊"}; return Observable.fromArray(littleArmy); } }).subscribe(new Consumer<String>() { @Override public void accept(String little) throws Exception { System.out.println(little); } });
這個代碼運行結果是
第一大隊的第一小隊 第一大隊的第二小隊 第一大隊的第三小隊 第二大隊的第一小隊 第二大隊的第二小隊 第二大隊的第三小隊 第三大隊的第一小隊 第三大隊的第二小隊 第三大隊的第三小隊
這個例子也許很好的體現了flatmap這個操做符的意義,把list鋪平展開,並且防止了繁瑣的嵌套循環。可是,雖然flatmap很擅長處理這種問題(我不知道這個操做符是否是爲了解決這個問題而設計出來的),但flatmap的功能卻遠不只如此,它的本質是在合併Obsevable流,能夠作不少事情,好比咱們網絡請求的「連環請求」,舉個例子,首先經過書本的Id獲取出版商名字,而後拿到出版商名字後獲取出版社信息。
api.getBookPublisherNameById("01102").flatmap(new Function<String, ObservableSource<PublisherInfo>>() { @Override public ObservableSource<PublisherInfo> apply(String s) throws Exception { return api.getPublisherInfoByName(s); } }).subscribe(new COnsume<PublisherInfo>() { @Override public void accept(PublisherInfo little) throws Exception { //獲取到出版社信息 } })
看完這裏,flatmap是否是也蠻好理解的~