經過前一篇的從觀察者模式出發,聊聊RxJava,咱們大體理解了RxJava的實現原理,在RxJava中能夠很是方便的實現不一樣線程間的切換。subscribeOn 用於指定上游線程,observeOn 用於指定下游線程,屢次用 subscribeOn 指定上游線程只有第一次有效,屢次用 observeOn 指定下次線程,每次都有效;簡直太方便了,比直接使用Handler省了很多力氣,同時也不用去關注內存泄漏的問題了。本篇就來看看在RxJava中上游是如何實現線程切換。java
爲了方便後面的敘述,這裏經過下面的UML圖簡單回顧一下上一篇的內容。網絡
此圖並無完整的展示圖中各個接口和類之間的各類關係,由於那樣會致使整個圖錯綜複雜,不便於查看,這裏只繪製出了RxJava各個類之間核心關係網絡app
從上面的UML圖中能夠看出,具體的實現類只有ObservableCreate和CreateEmitter。CreateEmitter是ObservableCreate的內部類(PlantUML 怎麼繪製內部類,沒搞懂,玩的轉的同窗請賜教呀(^▽^))。ide
上篇說過Observable建立的過程,能夠簡化以下:函數
Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())
複製代碼
結合圖能夠更直觀的體現出這一點。ObservableCreate 內部持有ObservableOnSubscribe的引用。工具
當觀察者訂閱主題後:oop
mObservable.subscribe(mObserver);
複製代碼
ObservableCreate 中的subscribeActual()方法就會執行,post
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
複製代碼
在這個過程當中會建立CreateEmitter 的實例,而這個CreateEmitter實現了Emitter和Disposable接口,同時又持有Observer的引用(固然這個引用是ObservableCreate傳遞給他的)。接着就會執行ObservableOnSubscribe的subscribe 方法,方法的參數即爲剛剛建立的CreateEmitter 的實例,接着一系列連鎖反應,Emitter 接口中的方法(onNext,onComplete等)開始執行,在CreateEmitter內部,Observer接口中對應的方法依次執行,這樣就實現了一次從主題(上游)到觀察者(下游)的事件傳遞。ui
source.subscribe(parent)this
這裏的 source 是ObservableOnSubscribe的實例,parent是CreateEmitter的實例。上面加粗文本敘述的內容,就是這行代碼,能夠說這是整個訂閱過程最核心的實現。
好了,回顧完基礎知識後,立刻進入正題,看看RxJava是如何實現線程切換的。
咱們知道正常狀況下,全部的內容都是在主線程執行,既然這裏提到了線程切換,那麼必然是切換到了子線程,所以,這裏須要關注線程的問題,咱們就帶着下面這幾個問題去閱讀代碼。
首先看一下,平常開發中實現線程切換的具體實現
private void multiThread() {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("This msg from work thread :" + Thread.currentThread().getName());
sb.append("\nsubscribe: currentThreadName==" + Thread.currentThread().getName());
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.e(TAG, "accept: s= " + s);
}
});
}
複製代碼
這段代碼,使用過RxJava的同窗再熟悉不過了,上游事件會在一個名爲 RxNewThreadScheduler-1 的線程執行,下游線程會切換回咱們熟悉的Android UI線程。
咱們就從subscribeOn(Schedulers.newThread()) 出發,看看這個代碼的背後,到底發生了什麼。
這裏咱們先無論Schedulers.newThread() 是什麼鬼,首先看看這個subscribeOn()方法。
Observable.java--- subscribeOn(Scheduler scheduler)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
能夠看到,這個方法須要一個Scheduler 類型的參數。
RxJavaPlugins.java--- onAssembly(@NonNull Observable source)
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
複製代碼
O(∩_∩)O哈哈~,是否是以爲似曾相識,和create操做符一個套路呀。所以,observeOn也能夠簡化以下:
new ObservableSubscribeOn<T>(this, Schedulers.newThread());
複製代碼
這裏你也許會有疑問,這個this是什麼呢?其實這個this就是Observable,具體到上面的代碼來講就是ObservableCreate,總之就是一個具體的Observable。
接着看ObservableSubscribeOn 這個類
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
}
複製代碼
看一下 AbstractObservableWithUpstream.java
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
複製代碼
再看一下 HasUpstreamObservableSource.java
/** * Interface indicating the implementor has an upstream ObservableSource-like source available * via {@link #source()} method. * * @param <T> the value type */
public interface HasUpstreamObservableSource<T> {
/** * Returns the upstream source of this Observable. * <p>Allows discovering the chain of observables. * @return the source ObservableSource */
ObservableSource<T> source();
}
複製代碼
饒了半天,ObservableSubscribeOn 原來和上一篇說的ObservableCreate同樣,也是Observable的一個子類。只不過比ObservableCreate多實現了一個接口HasUpstreamObservableSource,這個接口頗有意思,他的source()方法返回類型是ObservableSource(還記得這個類的角色嗎?)。也就是說ObservableSubscribeOn這個Observable是一個擁有上游的Observable。他有一個很是關鍵的屬性source,這個source就表明了他的上游。
咱們接着看ObservableSubscribeOn的具體實現。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// observer 調用onSubscribe方法,獲取上游的控制權
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
複製代碼
咱們以前說過,subscribeActual()是實現上下游之間訂閱關係的重要方法。由於只有真正實現了訂閱關係,上下游之間才能鏈接起來。咱們看這個方法的最後一句代碼。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
複製代碼
這句代碼,能夠說就是很是關鍵,由於從這裏開始了一系列的連鎖反應。首先看一下SubscribeTask
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
複製代碼
看到這句 source.subscribe(parent),是否是以爲似曾相識呢?
SubscribeTask 實現了是Runnable接口,在其run方法中,定義了一個須要在線程中執行的任務。按照類的繼承關係,很明顯source 就是ObservableSubscribeOn 的上游Observable,parent是一個Observer。也就是說這個run方法要執行的內容就是實現ObservableSubscribeOn的上游和Observer的訂閱。一旦某個線程執行了這個Runnable(SubscribeTask),就會觸發了這個run方法,從而實現訂閱,而一旦這個訂閱實現,那麼後面的流程就是上節所說的事情了。
這裏能夠解答第三個問題了,上游事件是怎麼給弄到子線程裏去的,這裏很明顯了,就是直接把訂閱方法放在了一個Runnable中去執行,這樣就一旦這個Runnable在某個子線程執行,那麼上游全部事件只能在這個子線程中執行了。
好了,線程要執行的任務彷佛建立完了,下面就接着找看看子線程是怎麼建立的。回過頭繼續看剛纔的方法,
scheduler.scheduleDirect(new SubscribeTask(parent))
複製代碼
Scheduler.java----scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
// 對run進行了一次裝飾
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
@NonNull
// 抽象方法
public abstract Worker createWorker();
複製代碼
首先看一下Worker類
/** * Sequential Scheduler for executing actions on a single thread or event loop. * <p> * Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup. */
public abstract static class Worker implements Disposable {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
複製代碼
Worker是Scheduler內部的一個靜態抽象類,實現了Disposable接口,其schedule()方法也是抽象的。
再看一下DisposeTask
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
複製代碼
DisposeTask 又是一個Runnable,同時也實現了Disposable接口。能夠看到在他的run方法中會執行decoratedRun的run方法,這個decoratedRun其實就是參數中傳遞進來的run,也就是說,執行了這個DisposeTask的run方法,就會觸發SubscribeTask中的run方法,所以,咱們就要關注是誰執行了這個DisposeTask。
回到scheduleDirect()方法
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
// 對run進行了一次裝飾
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
複製代碼
scheduleDirect()方法的實現咱們總結一下:
ε=(´ο`*)))唉,說了這麼久,子線程是如何建立的依然不清楚,不管是SubscribeTask仍是DisposeTask只是定義會在某個子線程中執行的任務,並不表明子線程已被建立。可是經過以上代碼,咱們也能夠收穫一些有價值的結論:
好了,到這裏對於subscribeOn()方法的分析已經到了盡頭,咱們找了最終須要運行子任務的對象Worker,而這個Worker是個抽象類,所以咱們須要關注Worker的具體實現了。
下面咱們就從剛纔丟下的Schedulers.newThread() 換個角度來分析,看看能不能找到這個Worker的具體實現。
前面說了subscribeOn()方法須要一個Scheduler 類型的參數,然而經過前面的分析咱們知道Scheduler是個抽象類,是沒法被實例化的。所以,這裏就從Schedulers類出發。
/** * Static factory methods for returning standard Scheduler instances. */
public final class Schedulers {
}
複製代碼
註釋很清楚,這個Schedulers就是一個用於生成Scheduler實例的靜態工廠。
下面咱們就來看看,在這個工廠中newThread() 生成了一個什麼樣的Scheduler實例。
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
複製代碼
newThread() 方法通過層層委託處理(最終的建立方式,有點單例模式的意味),最終咱們須要的就是一個NewThreadScheduler的實例。
NewThreadScheduler.java
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
複製代碼
不出所料NewThreadScheduler 是Scheduler的一個子類,在他的靜態代碼塊中構造了一個Priority=5的線程工廠。而在咱們最最關注的createWorker()方法中他又用這個線程工廠建立了一個NewThreadWorker 的實例。下面就讓咱們看看最終的NewThreadWorker 作了些什麼工做。
NewThreadWorker.java(節選關鍵內容)
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
}
複製代碼
衆裏尋他千百度,終於找到了Worker的實現了,同時再一次不出所料的又一次實現了Disposable接口,o(╥﹏╥)o。
在其構造函數中,經過NewThreadScheduler中提供的線程工廠threadFactory建立了一個ScheduledExecutorService。
ScheduledExecutorService.java ---create
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
複製代碼
用大名鼎鼎的Executors(Executor的工具類),建立了一個核心線程爲1的線程。
至此,咱們終於找到了第一個問題的答案,子線程是誰如何建立的;在NewThreadScheduler的createWorker()方法中,經過其構建好的線程工廠,在Worker實現類的構造函數中建立了一個ScheduledExecutorService的實例,是經過SchedulerPoolFactory建立的。
同時能夠看到,經過執行dispose 方法,可使用ScheduledExecutorService的shutdown()方法,中止線程的執行。
線程已經建立好了,下面就來看看究竟是誰啓動了這個線程。前面咱們說過,Worker的schedule()方法若是執行了,就會執行咱們定義好的Runnable,經過這個Runnable中run方法的執行,就能夠實現上下游訂閱關係。下面就來看看這個scheduler()方法。
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
複製代碼
到這裏,已經很明顯了,在schedulerActual方法中,會經過剛纔建立好的子線程對象executor經過submit或schedule執行一個Runnable任務(雖然這個Runnable對象再一次通過了各類裝飾和包裝,但其本質沒有發生變化),並將執行結果封裝後返回。而這個Runnable對象追根溯源來講,就是咱們在ObservableSubscribeOn類中建立的一個SubscribeTask對象。所以,當這個子線程開始運行的時候就是執行SubscribeTask中run()方法的時機;一旦這個run方法執行,那麼
source.subscribe(parent)
複製代碼
這句最關鍵的代碼就開始執行了,一切的一切又回到了咱們上一篇那熟悉的流程了。
好了,按照上面的流程捋下來,感受仍是有點分散,那麼就用UML圖看看總體的結構。
咱們看最下面的ObservableSubscribeOn,他是subscribeOn 返回的Observable對象,他持有一個Scheduler 實例的引用,而這個Scheduler實例就是NewThreadScheduler(即Schedulers.newThreade())的一個實例。ObservableSubscribeOn 的subscribeActual方法,會觸發NewThreadScheduler去執行SubscribeTask中定義的任務,而這個具體的任務又將由Worker類建立的子線程去執行。這樣就把上游事件放到了一個子線程中實現。
至於最後一個問題,屢次用 subscribeOn 指定上游線程爲何只有第一次有效?,看完通篇其實也很好理解了,由於上游Observable只有一個任務,就是subscribe(準確的來講是subscribeActual()),而subscribeOn 要作的事情就是把上游任務切換到一個指定線程裏,那麼一旦被切換到了某個指定的線程裏,後面的切換不就是沒有意義了嗎。
好了,至此上游事件切換到子線程的過程咱們就明白了。下游事件又是如何切換的且聽下回分解,原本想一篇寫完的,結果發現越寫越多,只能分紅兩篇了!!!o(╯□╰)o。
在RxJava的分析中,咱們常常會遇到Disposable這個單詞,確切的說是接口,這裏簡單說一說這個接口。
/** * Represents a disposable resource. */
public interface Disposable {
void dispose();
boolean isDisposed();
}
複製代碼
咱們知道,在Java中,類實現某個接口,通俗來講就是表明這個類多了一項功能,好比一個類實現Serializable接口,表明這個類是能夠序列化的。這裏Disposable也是表明一種能力,這個能力就是Disposable,就是表明一次性的,用後就丟棄的,好比一次性筷子,還有那啥。
在RxJava中不少類都實現了這個接口,這個接口有兩個方法,isDisposed()顧名思義返回當前類是否被拋棄,dispose()就是主動拋棄。所以,全部實現了這個接口的類,都擁有了這樣一種能力,就是能夠判斷本身是否被拋棄,同時也能夠主動拋棄本身。
上一篇咱們說了,Observer經過onSubscribe(@NonNull Disposable d),會得到一個Disposable,這樣就有能力控制上游的事件發送了。這樣,咱們就不難理解,爲何那麼多類實現了這個接口,由於下游獲取到的是一個擁有Disposable的對象,而一旦擁有了一個這樣的對象,那麼就能夠經過下游控制上游了。能夠說,這是RxJava對常規的觀察者模式所作的最給力的改變。
在查看RxJava的源碼時,可能不少人都和我同樣,有一個巨大的困擾,就是這些類的名字好他媽難記,感受長得都差很少,關鍵念起來好像也差很少。但其實本質上來講,RxJava對類的命名仍是很是規範的,只是咱們不太習慣而已。按照英文單詞翻譯:
其實就這麼三個主語,其餘的什麼ObservableCreate,ObservableSubscribeOn,AbstractObservableWithUpstream,還有上面提到的Disposable,都是對各類各樣的Observable和Observer的變形和修飾結果,只要理解這個類的核心含義是什麼,就不會被這些名字搞暈了。
RxJava 能夠說是博大精深,以上全部分析徹底是我的平時使用時的總結與感悟,有任何錯誤之處,還望各位讀者提出,共同進步。
關於RxJava 這裏牆裂推薦一篇文章一篇不太同樣的RxJava介紹,感受是自扔物線那篇以後,對RxJava思想感悟最深的一篇了。對RxJava 有興趣的同窗,能夠多度幾遍,每次都會有收穫!!