RxJava的一個重要優勢就在於能夠方便的切換線程,因此就想從源碼的角度探索下其切換線程的原理java
ObserveOn用於切換下游執行線程,能夠屢次調用,每調用一次會切換一次,先來看一個例子bash
fun threadName(desc: String) {
println("$desc ${Thread.currentThread().name}")
}
fun main() {
Observable.create<Int> {
threadName("subscribe")
it.onNext(1)
it.onNext(2)
it.onComplete()
}.observeOn(Schedulers.io())
.subscribe(object : Observer<Int> {
override fun onComplete() {
threadName("onComplete")
}
override fun onSubscribe(d: Disposable) {
threadName("onSubscribe")
}
override fun onError(e: Throwable) {
threadName("onError")
}
override fun onNext(t: Int) {
threadName("onNext")
}
})
}
複製代碼
輸出結果網絡
onSubscribe main
subscribe main
onNext RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
onComplete RxCachedThreadScheduler-1
複製代碼
說好的observeOn
切換下游執行線程,怎麼onSubscribe
方法會在主線程中調用?緣由是observeOn
方法生成的ObserveOnObserver實例並不會對onSubscribe
事件作切換線程的操做,這個等下看了源碼就理解了。那麼observeOn
是怎麼把下游的onNext
、onComplete
切換到子線程執行的呢?來看看observeOn
的源碼實現ide
// Observable.java
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
return observeOn(scheduler, delayError, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼
observeOn
方法調用後會返回一個ObservableObserveOn實例,通過上篇文章的分析主要關注其subscribeActual
方法就行oop
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
複製代碼
包裝了一下下游Observer,能夠猜想這個Observer內部會將onNext
等事件轉到其它線程進行執行ui
public void onSubscribe(Disposable d) {
...
downstream.onSubscribe(this);
}
public void onNext(T t) {
queue.offer(t);
schedule();
}
public void onError(Throwable t) {
schedule();
}
public void onComplete() {
schedule();
}
void schedule() {
worker.schedule(this);
}
複製代碼
能夠看到onSubscribe
直接在當前線程執行了沒有進行線程切換,onNext
、onError
、onComplete
則是都調用了schedule
方法this
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
// EventLoopWorker.java
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
// NewThreadWorker.java
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
複製代碼
executor是一個SchedulerThreadPoolExecutor實例,最終都會在線程池中運行那個Runnable實例也就是ObserveOnObserver實例,因此就看看其run
方法spa
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
for (;;) {
for (;;) {
try {
v = q.poll();
} catch (Throwable ex) {
a.onError(ex);
return;
}
a.onNext(v);
}
}
}
void drainFused() {
for (;;) {
if (d) {
disposed = true;
ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onComplete();
}
worker.dispose();
return;
}
}
}
複製代碼
能夠看到onNext
、onError
、onComplete
都在這個方法裏面調用,所以這些方法就運行在了線程池中了,這樣就成功的切換了線程,那麼屢次調用observeOn有效果嗎?屢次調用其實就是在一個線程池中的某個線程中再次開啓了一個線程,因此是有效果的。接着看看subscribeOn
這個方法線程
subscribeOn用於上游執行線程,而且屢次調用只有第一次會生效,先來看一個例子code
fun main() {
Observable.create<Int> {
threadName("subscribe")
it.onNext(1)
it.onNext(2)
it.onComplete()
}.subscribeOn(Schedulers.io())
.subscribe(object : Observer<Int> {
override fun onComplete() {
threadName("onComplete")
}
override fun onSubscribe(d: Disposable) {
threadName("onSubscribe")
}
override fun onError(e: Throwable) {
threadName("onError")
}
override fun onNext(t: Int) {
threadName("onNext")
}
})
}
複製代碼
輸出結果
onSubscribe main
subscribe RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
onComplete RxCachedThreadScheduler-1
複製代碼
咦!不是說subscribeOn切換的只是上游的執行線程,爲何onNext
、onComplete
也會在子線程中執行?其實答案很簡單該段代碼中沒有調用observeOn因此下游執行線程並無發生改變,所以上游在子線程中發送一個onNext
事件過來,下游的onNext
方法天然也會在子線程中執行,那麼subscribeOn
內部到底作了什麼纔會致使上游會在子線程中執行呢,來看看其源碼實現
public final Observable<T> subscribeOn(Scheduler scheduler) {
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
建立了一個ObservableSubscribeOn
實例並將Scheduler
實例傳入,接着看看其subscribeActual
實現
// ObservableSubscribeOn.java
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼
這裏直接回調了下游Observer
實例的onSubscribe
方法,接着執行scheduleDirect
,咱們繼續跟進
// Schedule.java
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// createWorker是抽象方法,IoScheduler會返回一個EventLoopWorker實例
final Worker w = createWorker();
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
複製代碼
調用到了scheduleActual
方法,這個方法上面在分析observeOn
時已經分析過,內部會在線程池中執行action
這個Runnable
實例,那麼主要看看SubscribeTask
的run
方法
// SubscribeTask.java
public void run() {
source.subscribe(parent);
}
複製代碼
裏面的邏輯很簡單就是在線程池中執行上游的subscribe
方法,所以上游的全部事件都將在該線程池中執行。那麼爲何說subscribeOn
只能生效一次呢?其實真正的來講subscribeOn
也能夠生效屢次,只不過最上游發送事件的線程是由第一次subscribeOn
調用時肯定的,舉個例子
fun main() {
Observable.create<Int> {
threadName("subscribe")
it.onNext(1)
it.onNext(2)
it.onComplete()
}.subscribeOn(Schedulers.io())
.map { threadName("map"); it + 1 }
.subscribeOn(Schedulers.computation())
.subscribe(object : Observer<Int> {
override fun onComplete() {
threadName("onComplete")
}
override fun onSubscribe(d: Disposable) {
threadName("onSubscribe")
}
override fun onError(e: Throwable) {
threadName("onError")
}
override fun onNext(t: Int) {
threadName("onNext")
}
})
// 主線程睡眠下,防止RxJava生成的daemon線程自動退出
Thread.sleep(200)
}
複製代碼
輸出結果
onSubscribe main
subscribe RxCachedThreadScheduler-1
map RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
map RxCachedThreadScheduler-1
onNext RxCachedThreadScheduler-1
onComplete RxCachedThreadScheduler-1
複製代碼
在這裏例子中調用了兩次subscribeOn
可是看起來只有第一次才生效,其實map
方法生成的ObservableMap
實例的subscribe
方法是在計算線程池中執行的,下面來看一下observeOn
和subscribeOn
進行組合的狀況
假設有這麼一個需求: 先註冊而後登陸,須要知足:1. 註冊成功後要能更新UI,2. 註冊失敗將再也不進行登陸,3. 登陸成功或者失敗也須要更新UI。因爲網絡請求不能在主線程執行,所以咱們就須要用到線程切換,下面是示例代碼
private var disposable: Disposable? = null
private fun registerAndLogin(listener: Listener) {
getRegisterObservable()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext {
threadName("doOnNext")
if (!it.success) {
listener.registerFail()
disposable?.dispose()
} else {
listener.registerSuccess()
}
}
.observeOn(Schedulers.io())
.flatMap {
getLoginObservable()
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : Observer<LoginModel> {
override fun onComplete() {
threadName("onComplete")
}
override fun onSubscribe(d: Disposable) {
disposable = d
threadName("onSubscribe")
}
override fun onNext(model: LoginModel) {
if (model.success) listener.loginSuccess(model.token)
else listener.loginFail()
threadName("onNext")
}
override fun onError(e: Throwable) {}
})
}
private fun getRegisterObservable(): Observable<RegisterModel> {
return Observable.create {
// 模擬網絡耗時
Thread.sleep(500)
threadName("register")
it.onNext(RegisterModel(true))
it.onComplete()
}
}
private fun getLoginObservable(): Observable<LoginModel> {
return Observable.create {
// 模擬網絡耗時
Thread.sleep(500)
threadName("login")
it.onNext(LoginModel(true, "token"))
it.onComplete()
}
}
data class RegisterModel(val success: Boolean)
data class LoginModel(val success: Boolean, val token: String)
interface Listener {
fun registerSuccess()
fun registerFail()
fun loginSuccess(token: String)
fun loginFail()
}
private fun threadName(desc: String) {
Log.d("Thread", "$desc ${Thread.currentThread().name}")
}
複製代碼
輸出結果以下
onSubscribe main
register RxCachedThreadScheduler-1
doOnNext main
login RxCachedThreadScheduler-2
onNext main
onComplete main
複製代碼
這個輸出徹底知足了咱們的需求,網絡請求在子線程,UI更新在主線程,如今來分析下爲何會是這個結果。
subscribeOn
這個方法在這條鏈上哪一個地方調用都不要緊其並不會影響結果,由於其只是決定了註冊操做所在的線程doOnNext
中,咱們知道登陸操做是在子線程,因此咱們這裏要使用observeOn
將線程切換到主線程observeOn
將線程再次切換到子線程observeOn
將線程切換到主線程