Rxjava 2.x 源碼系列 - 線程切換 (下)bash
Rxjava 2.x 源碼系列 - 變換操做符 Map(上)app
在上一篇博客 Rxjava 源碼系列 - 基礎框架分析,咱們分析了 Rxjava 的基礎框架。框架
Observable 和 Observer 經過 subscribe() 方法實現訂閱關係,從而 Observable 能夠在須要的時候發出事件來通知 Observer,而且回調 Observer 的相應的方法。ide
用一張簡單的流程圖描述以下:函數
在 Android 中,咱們知道默認都是執行在主線程的,那麼 Rxjava 是如何實現線程切換的。post
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe(): ");
}
@Override
public void onNext(String s) {
Log.e("TAG", "onNext(): " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("TAG", "onComplete(): ");
}
});
複製代碼
咱們先來看一下 subscribeOn 方法,能夠看到ui
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
// scheduler 判空
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 用 ObservableSubscribeOn 將 scheduler 包裝 起來
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
而咱們從上一篇博客中知道,當咱們調用 observable.subscibe(observable) 的時候,最終會調用到具體的 observable 的實例的 subscribActual 方法。而這裏具體的 observable 的實例爲 ObservableSubscribeOn。this
接下來,咱們來看一下 ObservableSubscribeOn 這個類,能夠看到繼承 AbstractObservableWithUpstream ,而 AbstractObservableWithUpstream 繼承 Observable,實現 HasUpstreamObservableSource 這個接口。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
---
}
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
public interface HasUpstreamObservableSource<T> {
/**
* Returns the upstream source of this Observable.
* <p>Allows discovering the chain of observables.
* @return the source ObservableSource
*/
ObservableSource<T> source();
}
複製代碼
observableSubscribeOn 的 subscribeActual 方法,跟 ObservableCreate 的 subscribeActual 的套路差很少,它也是 Observable 的一個子類。只不過比 ObservableCreate 多實現了一個接口HasUpstreamObservableSource,這個接口頗有意思,他的 source() 方法返回類型是 ObservableSource(還記得這個類的角色嗎?)。也就是說 ObservableSubscribeOn 這個 Observable 是一個擁有上游的 Observable 。他有一個很是關鍵的屬性 source,這個 source 就表明了他的上游。
接下來咱們一塊兒來看一下 ObservableSubscribeOn 的具體實現
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
複製代碼
首先先來看他的構造函數 ,有兩個參數 source ,scheduler。
這裏咱們先大概瞭解一下 Scheduler 是個什麼東東,Scheduler 裏面封裝了 Worker 和 DisposeTask,下面會詳細講到。
Schedulers.newThread()
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
複製代碼
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
IO = RxJavaPlugins.initIoScheduler(new IOTask());
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
複製代碼
咱們再回到 ObservableSubscribeOn 的 subscribeActual 方法,在上一篇博客的時候已經講解 Observable 和 Observer 之間是怎樣實現訂閱關係的,這裏就再也不具體展開了。
接下來,咱們重點關注這一行代碼
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
複製代碼
咱們先來看一下 SubscribeTask 這個類,他是 ObservableSubscribeOn 的一個非靜態內部類,能夠看到 其實也比較簡單,他實現了 Runnable 接口,而且持有 parent 引用。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
複製代碼
而後在 run 方法中,經過 source.subscribe(parent) 創建聯繫。於是,當咱們的 SubscribeTask 的 run 方法運行在哪一個線程,相應的 observer 的 subscribe 方法就運行在哪一個線程。
這裏可能會有人有疑問,SubscribeTask 沒有 source 屬性,它是怎麼訪問到 ObservableSubscribeOn 的屬性的。
咱們知道 java 中,非靜態內部類默認持有外部類的引用,於是他能夠正常訪問外部類 ObservableSubscribeOn 的 source 屬性。
接着,咱們再來看一下 scheduler.scheduleDirect 這個方法
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
// 判斷 run 是否爲 null
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
複製代碼
這裏咱們以 NewThreadScheduler 爲例,來看看這個 Worker 究竟是什麼?
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
---
}
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
複製代碼
從上面能夠看到,其實 worker 裏面封裝了 executor(線程池),看到這裏,相信你也基本明白 Rxjava 線程切換的原理了,其實很簡單。
在 ObservableSubscribeOn subscribeActual 方法中, SubscribeTask 包裝 parent(SubscribeOnObserver ,包裝了 Observer),SubscribeTask 實現了 Runnable 接口,在 run 方法裏面調用了 source.subscribe(parent),於是 run 方法所執行的線程將由 worker 決定。這就是 下游決定上游 observable 執行線程的原理。
接下來咱們再來看一下:DisposeTask
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
複製代碼
// 將 新的 Disposable 設置給 parent ,方便取消訂閱關係,
//(由於咱們對 Observer 進行相應的包裝,原來的 parent 的 Disposable 已經不能表明最新的 Disposable)
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
複製代碼
DisposeTask 實現了 Disposable,Runnable ,SchedulerRunnableIntrospection 接口,Disposable 接口主要是用來取消訂閱關係的 Disposable。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.i(TAG, "subscribe: getName=" +Thread.currentThread().getName());
emitter.onNext("1");
emitter.onNext("2");
emitter.onNext("3");
emitter.onComplete();
}
}) // 進行兩次 subscribeOn
.subscribeOn(Schedulers.io()).subscribeOn(Schedulers.computation()).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("TAG", "onSubscribe(): ");
}
@Override
public void onNext(String s) {
Log.e("TAG", "onNext(): " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("TAG", "onComplete(): ");
}
});
複製代碼
subscribe: getName=RxCachedThreadScheduler-1
若是將上述的 subscribeOn 的順序置換
subscribeOn(Schedulers.computation()).subscribeOn(Schedulers.io())
複製代碼
那麼將打印出
subscribe: getName=RxComputationThreadPool-1
爲何是第一次 Observable#subscribeOn(Scheduler) 纔有效呢?
前面咱們分析到,Observable#subscribeOn(Scheduler) 其實是將 Observable#subscribe(Observer) 的操做放在了指定線程,當咱們調用 subcribe 的時候,它的過程是從下往上的,即下面的 Observable 調用上面的 Observanle。
因此對於咱們上面的第一個例子,他的調用流程是這樣的:第三個 Observable 調用 Observable#subscribe(Observer) 啓動訂閱,在其內部會激活第二個 Observable 的 Observable#subscribe(Observer) 方法,可是此時該方法外部被套入了一個 Schedulers.computation() 線程
因而這個訂閱的過程就被運行在了該線程中。用僞代碼演示以下
public class Observable {
// 第「二」個 Observable
Observable source;
Observer observer;
public Observable(Observable source, Observer observer) {
this.source = source;
this.observer = observer;
}
public void subscribe(Observer Observer) {
new Thread("computation") {
@Override
public void run() {
// 第「二」個 Observable 訂閱
source.subscribe(observer);
}
}
}
}
複製代碼
再往上走,第二個 Observable 訂閱內部會激活第一個 Observable 的 Observable#subscribe(Observer) 方法,一樣的,該方法被套在了 Schedulers.io() 線程中,用僞代碼演示
public class Observable {
// 第「一」個 Observable
Observable source;
Observer observer;
public Observable(Observable source, Observer observer) {
this.source = source;
this.observer = observer;
}
public void subscribe(Observer Observer) {
new Thread("io") {
@Override
public void run() {
// 第「一」個 Observable 訂閱
source.subscribe(observer);
}
}
}
}
複製代碼
此時到達第一個 Observable 了以後就要開始發射事件了,此時的執行線程很明顯是 io 線程。還能夠換成 Thread 僞代碼來表示。
new Thread("computation") {
@Override
public void run() {
// 第二個 Observable.subscribe(Observer) 的實質
// 就是切換線程,效果相似以下
new Thread("io") {
@Override
public void run() {
// 第一個 Observable.subscribe(Observer) 的實質
// 就是發射事件
System.out.println("onNext(T)/onError(Throwable)/onComplete() 的執行線程是: " + Thread
.currentThread().getName());
}
} .start();
}
} .start();
複製代碼
用流程圖描述以下:
參考博客:
下一篇咱們將講解到 observeOn(AndroidSchedulers.mainThread()) 的原理。
掃一掃,歡迎關注個人公衆號。若是你有好的文章,也歡迎你的投稿。