RxJava之Scheduler (二)

1.SingleScheduler

SingleScheduler是RxJava2新增的Scheduler。SingleScheduler中有一個屬性叫做executor,它是使用AtomicReference包裝的ScheduledExecutorService。java

補充:AtomicReference類的做用:AtomicReference則對應普通的對象引用,即保證你在修改對象引用時的線程安全性;對」對象」進行原子操做安全

final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();
複製代碼

在SingleScheduler構造函數中,Executor會調用lazySet().bash

/** * @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any * system properties for configuring new thread creation. Cannot be null. */
    public SingleScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        executor.lazySet(createExecutor(threadFactory));
    }
複製代碼

************ 分割線 ************app

其中**lazySet()**是AtomicReference中的方法,用於修改引用對象:less

// AtomicRefence類
    /** * Sets to the given value. * * @param newValue the new value */
    public final void set(V newValue) {
        value = newValue;
    }
    
    /** * Eventually sets to the given value. * * @param newValue the new value * @since 1.6 */
    public final void lazySet(V newValue) {
        U.putOrderedObject(this, VALUE, newValue);
    }
複製代碼

AtomicReferences中set()和lazySet()區別:set()馬上修改舊值,別的線程能夠馬上看到更新後的值;而lazySet()不會馬上(可是最終會)修改舊值,別的線程看到新值的時間會延遲一些。ide

************ 分割線 ************函數

它的createExecutor()用於建立工做線程,能夠看到經過SchedulerPoolFactory來建立ScheduledExecutorService。oop

// SingleScheduler類
    
    static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
        return SchedulerPoolFactory.create(threadFactory);
    }
複製代碼

經過SchedulerPoolFactory類的create(ThreadFactory factory)來建立單線程的線程ui

// SchedulerPoolFactory類
    
    /**
     * Creates a ScheduledExecutorService with the given factory.
     * @param factory the thread factory
     * @return the ScheduledExecutorService
     */
    public static ScheduledExecutorService create(ThreadFactory factory) {
        // 建立單線程
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        
        if (exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }
複製代碼

在SingleScheduler中,每次使用ScheduledExecutorService時,實際上是使用executor.get()。因此說,single擁有一個線程單例。this

SingleScheduler會建立一個ScheduledWorker,ScheduledWorker使用JDK的ScheduledExecutorService做爲executor。 下面是ScheduledWorker的schedule()方法,使用ScheduledExecutorService的submit()或schedule()來執行runnable。

@NonNull
        @Override
        public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            if (disposed) {
                return EmptyDisposable.INSTANCE;
            }

            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
            tasks.add(sr);

            try {
                Future<?> f;
                if (delay <= 0L) {
                    /**
                     * 當即執行則執行submit()方法
                     * Submits a value-returning task for execution and returns a
                     * Future representing the pending results of the task. The
                     * Future's {@code get} method will return the task's result upon
                     * successful completion.
                     */
                    f = executor.submit((Callable<Object>)sr);
                } else {
                
                    /**
                     * 需延遲執行,則執行schedule()方法
                     * Creates and executes a ScheduledFuture that becomes enabled after the
                     * given delay.
                     */
                    f = executor.schedule((Callable<Object>)sr, delay, unit);
                }

                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                dispose();
                RxJavaPlugins.onError(ex);
                return EmptyDisposable.INSTANCE;
            }

            return sr;
        }
複製代碼

2.ComputationScheduler

ComputationScheduler使用FixedSchedulerPool做爲線程池,而且FixedSchedulerPool被AtomicReference包裝了一下。

從ComputationScheduler的源碼中能夠看出,MAX_THREADSCPU的數目FixedSchedulerPool能夠理解爲擁有固定數量的線程池(有點相似線程池中的FixedThreadPool),數量爲MAX_THREADS。

static {
        MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
        ...
    }
    
    static int cap(int cpuCount, int paramThreads) {
        return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
    }
複製代碼

ComputationScheduler類會建立一個EventLoopWorker。

@NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get().getEventLoop());
    }
複製代碼

其中getEventLoop()是FixedSchedulerPool中的方法,返回了FixedSchedulerPool中的一個PoolWorker。

注:FixedSchedulerPoolEventLoopWorker都爲ComputationScheduler的內部類

// EventLoopWorker類中的方法
        
        public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            // simple round robin, improvements to come
            return eventLoops[(int)(n++ % c)];
        }
複製代碼

PoolWorker繼承自NewThreadWorker,也是線程數爲1的ScheduledExecutorService。

3.IoScheduler

IoScheduler使用CachedWorkerPool做爲線程池,而且CacheWorkerPool也被AtomicReference包裝了一下。 CachedWorkerPool是基於RxThreadFactory這個ThreadFactory來建立的。

static {
        ...
        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
        ...
        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
        ...
    }
複製代碼

在RxThreadFactory中,由prefix和incrementAndGet()來建立新線程的名稱

@Override
    public Thread newThread(Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());

        String name = nameBuilder.toString();
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
複製代碼

IoScheduler建立的線程數是不固定的,能夠經過IOScheduler的size()來獲取當前的線程數。通常狀況下,ComputationScheduler的線程數等於CPU的數目

public int size() {
        return pool.get().allWorkers.size();
    }
複製代碼

注意:ComputationSchedulerIoScheduler都是依賴線程池來維護線程的,區別在於:IoScheduler線程池中的個數是無限的,由prefix和incrementAndGet()產生的遞增值來決定線程的名字。而ComputationScheduler中則是一個固定線程數量的線程池,數量爲CPU的數目,而且不要把I/O操做放在computation()中,不然I/O操做的等待時間會浪費CPU

一樣,IoScheduler也會建立EventLoopWorker類

public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
複製代碼

但這個EventLoopWorker是IoScheduler的內部類,與ComputationScheduler建立的EventLoopWorker不一樣,知識兩者同名罷了,且都是繼承Scheduler.WOrker類而已。

4.NewThreadScheduler

NewThrScheduler會建立NewThreadWorker,NewThreadWorker的構成函數使用的也是SchedulerPoolFactory。

public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
複製代碼

與SingleScheduler不一樣的是,SingleScheduler的executor是使用AtomicReference包裝的SchedulerExecutorService。每次使用時,都會調用executor.get()。

然而,NewThreadScheduler每次都會建立一個新的線程

我的這塊不太懂,按照自我理解分析下:在SingleScheduler由於該調度器中只有一個線程,於是在後續調用,須要保證該對象惟一且保證先後一致,於是使用AtomicReference保證其餘線程能知道;而NewThreadScheduler中每次都會建立新的線程,於是無需保證線程同步,不用管是否讓其餘線程知道。

5.TrampolineScheduler

TrampolineScheduler會建立TrampolineWorker,在TrampolineWorker內部維護着一個PriorityBlockingQueue。任務進入該隊列以前,會先用TimedRunnable封裝下。

static final class TimedRunnable implements Comparable<TimedRunnable> {
        final Runnable run;
        final long execTime;
        final int count; // In case if time between enqueueing took less than 1ms

        volatile boolean disposed;

        TimedRunnable(Runnable run, Long execTime, int count) {
            this.run = run;
            this.execTime = execTime;
            this.count = count;
        }

        @Override
        public int compareTo(TimedRunnable that) {
            int result = ObjectHelper.compare(execTime, that.execTime);
            if (result == 0) {
                return ObjectHelper.compare(count, that.count);
            }
            return result;
        }
    }
複製代碼

能夠看到TimeRunnable實現了Comparable接口,會比較任務的execTime和count。 任務在進入queue以前,count每次都會+1.

final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
    queue.add(timedRunnable);
複製代碼

因此,在使用TrampolineScheduler時,新的任務總會優先執行

相關文章
相關標籤/搜索