rxjava2源碼解析(三)線程池原理分析

引言

先貼上這個系列的連接。
rxjava2源碼解析(一)基本流程分析
rxjava2源碼解析(二)線程切換分析
上一篇說了rxjava2的線程切換,可是沒有深刻說其中的線程池。這篇咱們來深扒一下。java

observeOn

仍是先說observeOn,直接看源碼:面試

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
複製代碼

這段代碼咱們上篇看到過,這裏再重複一下。obsererOn是切換下游觀察者線程,咱們看ObserveOnObserver中的onNext方法是如何切換線程的。bash

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                //ObserveOnObserver繼承了runnable接口,意味着能夠當作是線程任務來執行。這裏表明着在新線程中執行run方法。
                worker.schedule(this);
            }
        }
        
        //ObserveOnObserver繼承了runnable接口
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        
        void drainNormal() {
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    ····//省略一些判斷的代碼
                    v = q.poll();
                    //這裏就能夠看到,將下游的onNext方法,切換到新線程執行。
                    a.onNext(v);
                }
                ···
            }
        }
        
    }
複製代碼

這是上游的處理器執行onNext,傳到這裏,使用以前設置的線程執行下游的onNext方法。app

Worker

這個worker究竟是什麼?咱們先看scheduler的createWorker方法:ide

public abstract Worker createWorker();
複製代碼

Scheduler類中,createWorker只是一個接口,子類會重寫這個方法,咱們就以Schedulers.newThread()這個方法建立的Scheduler爲例,來看看這裏面的原理。post

//Schedulers類中的newThread靜態方法,這裏的hock咱們暫且不理,直接返回NEW_THREAD
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }
    
    //Schedulers類中定義了NEW_THREAD和其餘THREAD
        static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    
    //NewThreadTask是Schedulers的靜態內部類,繼承自Callable接口,其中call方法返回一個Scheduler
    static final class NewThreadTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }
    
    //NewThreadHolder一樣是一個靜態內部類,裏面只有一個靜態參數DEFAULT,這裏咱們就找到了newThread方法返回的本尊NewThreadScheduler
    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }
複製代碼

如上面代碼和註釋所示,咱們直接看NewThreadScheduler的源碼:ui

/**
 * Schedules work on a new thread.
 */
public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        //這裏Thread.MIN_PRIORITY爲1,Thread.MAX_PRIORITY爲10.Thread.NORM_PRIORITY爲5.若是咱們不作任何更改,這裏的priority的值就爲5.
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}
複製代碼

這裏createWorker方法返回的是一個NewThreadWorker對象。咱們總算找到了worker的來源,須要注意這裏的構造參數是threadFactory。來看看NewThreadWorker的源碼。this

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }
    
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //hock機制
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //用一個ScheduledRunnable把傳入的runnable包裝一下,本質上沒區別。
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        ....//省略判斷性代碼
        Future<?> f;
        try {
            if (delayTime <= 0) {
                //executor由構造方法中建立
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
    ....
}
複製代碼

這裏咱們就能夠看到,前面調用worker.schedule(this),最終走到了executor.submit(sr)。這裏的sr只是前面ObserveOnObserver的包裝。executor在構造方法中建立。來看看executor是什麼:spa

//SchedulerPoolFactory類中的靜態方法
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        tryPutIntoPool(PURGE_ENABLED, exec);
        return exec;
    }
複製代碼
//Executors類的靜態方法
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    
複製代碼

OK,executor是一個ScheduledThreadPoolExecutor,標準的工做線程池。核心線程數爲1,threadFactory是前面NewThreadWorker構造參數中的RxThreadFactory。他會給thread按照命名格式進行命名。線程

public final class RxThreadFactory extends AtomicLong implements ThreadFactory {

    public RxThreadFactory(String prefix) {
        this(prefix, Thread.NORM_PRIORITY, false);
    }

    public RxThreadFactory(String prefix, int priority) {
        this(prefix, priority, false);
    }

    public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
        this.prefix = prefix;
        this.priority = priority;
        this.nonBlocking = nonBlocking;
    }

    @Override
    public Thread newThread(Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
        String name = nameBuilder.toString();
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
    
    ...
}
複製代碼

總結一下:

  • observeOnsubscribe方法中,新建一個worker對象。這個worker對象是根據設置的scheduler建立的。而後在新建一個ObserveOnObserver對象,將上游與之訂閱。
  • ObserveOnObserveronNext方法中,會調用worker.schedule(this),將自己做爲runnable傳入到worker中。
  • newThreadScheduler爲例,他建立的worker是一個NewThreadWorker實例。在實例構造方法中,會根據傳入的threadFactory新建一個ScheduledThreadPool線程池。
  • NewThreadWorkershedule方法,就是將ObserveOnObserver做爲一個runnable放在一個新的線程池中執行。
  • ObserveOnObserverrun方法,就是用來執行下游的onNext,將數據傳輸下去。從而達到了,切換下游onNext線程的目的。

subscribeOn

subscribeOn是用來切換上游發射器線程。切換原理上一篇有說過,其中線程池相關跟上面observeOn差很少,這裏就不贅述了。

總結

上面就是rxjava2線程切換原理分析了,後面再有人面試問你rxjava2裏面的線程池是哪種,你就能夠自信的說出:ScheduledThreadPool

相關文章
相關標籤/搜索