Android的第三方庫-RxJava:RxJava的線程切換

介紹

RxJava的基礎使用和基礎流程能夠看看上一篇的文章《Android的三方庫 - RxJava:RxJava的使用和基本訂閱流程》緩存

實際項目中常常會有一些數據獲取操做,這就須要使用到RxJava的線程了。 因此讓咱們來看看RxJava的線程切換。bash

1.線程切換案例

首先看一個小的案例:ide

Log.i(TAG, "當前線程 : " + Thread.currentThread().getName());
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.i(TAG, "subscribe: currentThread : " + Thread.currentThread().getName());
        emitter.onNext("ONE");
        emitter.onComplete();
    }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.i(TAG, "onSubscribe: currentThread : " + Thread.currentThread().getName());
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: currentThread : " + Thread.currentThread().getName());
        Log.i(TAG, "onNext: s : " +  s);
    }

    @Override
    public void onError(Throwable e) {
    
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete: currentThread : " + Thread.currentThread().getName());
    }
});
複製代碼

輸出結果:oop

當前線程 : main
onSubscribe: currentThread : main
subscribe: currentThread : RxCachedThreadScheduler-1
onNext: currentThread : main
onNext: s : ONE
onComplete: currentThread : main
複製代碼

而後咱們在線程中來執行這段代碼:源碼分析

new Thread(new Runnable() {
    @Override
    public void run() {
        RxObservable3(); // 這個就是上面那段代碼
    }
}).start();
複製代碼

輸出結果:post

當前線程 : Thread-5
onSubscribe: currentThread : Thread-5
subscribe: currentThread : RxCachedThreadScheduler-1
onNext: currentThread : main
onNext: s : ONE
onComplete: currentThread : main
複製代碼

咱們能夠很明顯得出幾個結論:ui

1. onSubscribe運行的線程和代碼運行所在的線程是一致的。
2. Observable 運行的線程是subscribeOn指定的線程。
3. Observer 運行的線程是 observeOn 指定的線程。
複製代碼

2.源碼分析

如今咱們對源碼來進行分析。this

2.1 subscribeOn()

在咱們的案例中的調用:spa

subscribeOn(Schedulers.io())
複製代碼

進入到它的源碼中:線程

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼

在這裏傳入的參數 this 其實就是 當前建立的Observable( 就是 ObservableCreate)將它封裝成 ObservableSubscribeOn,並返回它的對象。

scheduler就是你使用的 Schedulers.io()

咱們進入到 io()

public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}
複製代碼

在這裏的 IO 是一個對象,它的具體實現要看 IOTask

static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}
--------
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}

複製代碼

能夠看到咱們就這樣獲得了IoScheduler的對象。先靜置這裏,等會使用。

2.1.1 ObservableSubscribeOn#subscribeActual( )

咱們根據上一篇的文章的分析,知道subscribeActual()是一個抽象方法,以前的實現是ObservableCreate中,

如今ObservableCreate對象被包裝成爲一個新的ObservableSubscribeOn對象

所以咱們來看看 ObservableSubscribeOn 中的 subscribeActual方法:

@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

    observer.onSubscribe(parent);
    
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
複製代碼

目前看到都是沒有使用過任何其餘的線程,因此 觀察者( observer ) 的 onSubscribe() 運行的線程就是當前的線程。

因此 onSubscribe()執行的線程就是 當前的線程。

即:

onSubscribe: currentThread : Thread-5
複製代碼

繼續來看下一行代碼,執行了 SubscribeTask 這個類。

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }
    
    @Override
    public void run() {
        source.subscribe(parent);
    }
}

複製代碼

這是一個Runnable類,在run方法中 執行 subscribe 方法,

這裏的source其實就是上一個Observable,也就是ObservableCreate的對象。

2.1.2 Schedule#scheduleDirect()

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

-----------------------------------

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // 這個createWorker 是一個抽象方法,具體的實現是在 Schedule的子類中,在這裏也就是IoScheduler
    final Worker w = createWorker();
    //  decoratedRun實際上仍是SubscribeTask這個對象
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // 將 Workder 和 Runnable 包裝成爲一個DisposeTask
    DisposeTask task = new DisposeTask(decoratedRun, w);
    // 執行
    w.schedule(task, delay, unit);

    return task;
}
複製代碼

再繼續看一下IoScheduler中的createWorker()workerschedule()方法。

final AtomicReference<CachedWorkerPool> pool; // AtomicReference 就是實現對象引用的原子更新

@NonNull
@Override
public Worker createWorker() {
    // 獲得一個EventLoopWorker對象,並傳入一個緩存池
    return new EventLoopWorker(pool.get());
}

--------------------------

static final class EventLoopWorker extends Scheduler.Worker {
    private final CompositeDisposable tasks;
    private final CachedWorkerPool pool;
    private final ThreadWorker threadWorker;

    final AtomicBoolean once = new AtomicBoolean();

    EventLoopWorker(CachedWorkerPool pool) {
        this.pool = pool;
        this.tasks = new CompositeDisposable();
        this.threadWorker = pool.get();
    }

    .... //省略無關代碼

    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed return EmptyDisposable.INSTANCE; } // Runnable 最終是交給了threadWorker 去執行 return threadWorker.scheduleActual(action, delayTime, unit, tasks); } } 複製代碼

咱們能夠看下這個 ThreadWorker ,它是沒有實現 scheduleActual()這個方法的,

咱們來到它的父類 NewThreadWorker

public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
}

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
     // 這個run 是SubscribeTask
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    // 繼續封裝成爲ScheduledRunnable
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    .... // 省略無關代碼
    Future<?> f;
    try {
      // 在這裏用線程池執行
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
複製代碼

最後就是 executor這個線程池對象來執行任務,SubscribeTask會被線程池執行,

也就是說 Observablesubscribe()方法會在IO線程中被調用。

以前的輸出結果:

subscribe: currentThread : RxCachedThreadScheduler-1
複製代碼

因此 subscribe() 執行的線程就是 subscribeOn 指定的線程(在這裏就是IoScheduler)。

2.1.3 屢次設置subscrineOn()的問題

案例:

//省略先後代碼,看重點部分
        .subscribeOn(Schedulers.io())//第一次
        .subscribeOn(Schedulers.newThread())//第二次
        .subscribeOn(AndroidSchedulers.mainThread())//第三次
複製代碼

輸出結果:

subscribe: currentThread : RxCachedThreadScheduler-1 
複製代碼

就是第一次的 subscribeOn()的設置起做用了,這是爲何呢?

由於 每一次調用 subscribeOn() 都會將以前的Observable從新包裝。

咱們來看這張圖: 引用 玉剛說的 詳解 RxJava 的消息訂閱和線程切換原理中的圖

從第三次的ObservableSubscribeOn每次都會通知它的上一個Obsevable

最後都會上傳到第一次的ObservableSubscribeOn中,

因此不過設置多少,都只有第一次的subscribeOn() 纔是生效的。

2.2 observeOn( )

在案例中的設置:

.observeOn(AndroidSchedulers.mainThread())
複製代碼

進入源碼:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

-----------------

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
複製代碼

這個也是從新包裝了一個ObservableObserveOn對象,

這裏的this就是以前被封裝過的 ObservableSubscribeOn 對象。

而後咱們繼續進入到ObservableObserveOn 中去。

2.2.1 ObservableObserveOn#subscribeActual( )

@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 若是是當前線程的話,直接調用ObservableSubscribeOn的subscribe()方法
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        // 這裏的 scheduler 其實就是 AndroidSchedulers.mainThread()
        Scheduler.Worker w = scheduler.createWorker();
        // 將worker封裝成ObserveOnObserver
        // 這裏的source.subscribe是在IO線程中執行
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
複製代碼

接下來咱們來看看 ObserveOnObserver 中的源碼:

2.2.2 ObserveOnObserver#onNext()

@Override
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t); // 加入隊列中
    }
    schedule();
}
複製代碼

主要是執行 schedule()

2.2.3 ObserveOnObserver#schedule()

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
複製代碼

其實 ObserveOnObserver 本身也實現了 Runnable ,因此就是調用了本身。

這裏的worker 就是 你傳入的 主線程(mainThread

而後咱們來看看 run 方法:

@Override
public void run() {
    if (outputFused) { // outputFused 默認是false
        drainFused();
    } else {
        drainNormal();
    }
}
複製代碼

進入到 drainNormal() 中:

void drainNormal() {
    int missed = 1;
    // 這裏就是存儲消息的隊列
    final SimpleQueue<T> q = queue;
    // 這裏是  downstream 就是 自定義的Observer
    final Observer<? super T> a = downstream;
   
      .... //省略部分代碼 
      // 隊列中取出消息
                v = q.poll();
     .... //省略部分代碼
      // 這裏就是調用自定義的Observer#onNext()
            a.onNext(v);
      ....
    }
}
複製代碼

這樣咱們最終會調用到 咱們自定義的Observer#onNext()

因此Observer#onNext() 是在 observeOn() 指定的線程中調用的。

注意:

在這裏的包裝順序是

CreateEmitter包裝了SubscribeOnObserver

SubscribeOnObserver包裝了ObserveOnObserver

ObserveOnObserver包裝了自定義的Observer

對於調用onNext() 方法:

咱們在subscribe 中使用 ObservableEmitter 中調用了onNext()

會在SubscribeOnObserver中調用onNext()

而後會繼續調用到ObserveOnObserver#onNext()

因此在run中調用的Observer的對象實際上是自定義的Observer

3.結束

以上就是我對於RxJava線程切換的源碼分析,若有不對,請指正。

相關文章
相關標籤/搜索