博客主頁java
RxJava 是一個爲異步編程而實現的庫,異步是其重要特點,合理地利用異步編程可以提升系統的處理速度。可是異步也會帶來線程的安全問題,並且異步並不等於併發,與異步概念相對應的是同步。編程
在默認狀況下, RxJava 只在當前線程中運行,它是單線程的。此時 Observable 用於發射數據流,Observer 用於接收和響應數據流,各類操做符( Operators )用於加工數據流,它們都在同一個線程中運行,實現出來的是一個同步的函數響應式。然而,函數響應式的實際應用是大部分操做都在後臺處理,前臺響應的一個過程。因此須要對剛纔的流程作一下修改,改爲 Observable 生成發射數據流, Operators 加工數據流在後臺線程中進行, Observer 在前臺線程中接收井響應數據。此時會涉及使用多線程來操做 RxJava ,咱們可使用 RxJava 的調度器(Scheduler)來實現。segmentfault
Scheduler 是 RxJava 對線程控制器的一個抽象, RxJava 內置了多個 Scheduler 的實現,它們基本知足絕大多數使用場景。
若是內置的 Scheduler 不能知足業務需求,那麼可使用自定義的 Executor 做爲調度器,以知足個性化需求。安全
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "subscribe: " + Thread.currentThread().getName()); emitter.onNext("hello"); emitter.onNext("world"); } }).observeOn(Schedulers.newThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, Thread.currentThread().getName() + "#Next: " + s); } }); // 執行結果 subscribe: main RxNewThreadScheduler-1#Next: hello RxNewThreadScheduler-1#Next: world
這裏的 Observable 發射完數據以後,切換到 newThread 。後面的兩次打印都是在 newThread 中進行的。多線程
RxJava 的被觀察者們在使用操做符時能夠利用線程調度器——Scheduler 來切換線程併發
Observable.just("aaa", "bbb") .observeOn(Schedulers.newThread()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { Log.d(TAG, "apply: " + Thread.currentThread().getName()); return s.toUpperCase(); } }).subscribeOn(Schedulers.single()) .observeOn(Schedulers.io()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, Thread.currentThread().getName() + "#Next: " + s); } }); // 執行結果 apply: RxNewThreadScheduler-1 apply: RxNewThreadScheduler-1 RxCachedThreadScheduler-1#Next: AAA RxCachedThreadScheduler-1#Next: BBB
其中,藍色表示主線程、橙色表示 newThread 、粉色表示 I/O 線程app
Schedulers 一個靜態工廠類,經過分析 Schedulers 的源碼能夠看到它有多種不一樣類型的 Scheduler 。下面是 Schedulers 的各個工廠方法。less
computation() 用於 CPU 密集型的計算任務,但井不適合 I/O 操做異步
public static Scheduler computation() { return RxJavaPlugins.onComputationScheduler(COMPUTATION); }
io() 用於 I/O 密集型任務,支持異步阻塞 I/O 操做,這個調度器的線程池會根據須要增加。對於普通的計算任務,請使用 Schedulers.computation()。ide
public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); }
在 RxJava 2 中與在 RxJava 1 中的做用不一樣 。在 RxJava 2 表示當即執行,若是當前線程有任務在執行,則會將其暫停,等插入進來的新任務執行完成以後,再接着執行原先未完成的任務。在 RxJava 1 中,表示在當前線程中等待其餘任務完成以後,再執行新的任務
public static Scheduler trampoline() { return TRAMPOLINE; }
newThread() 爲每一個任務建立一個新線程。
public static Scheduler newThread() { return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD); }
single() 擁有一個線程單例,全部的任務都在這一個錢程中執行。當此線程中有任務執行時,它的任務將會按照先進先出的順序依次執行。
public static Scheduler single() { return RxJavaPlugins.onSingleScheduler(SINGLE); }
public static Scheduler from(@NonNull Executor executor) { return new ExecutorScheduler(executor, false); }
Scheduler 是 RxJava 的線程任務調度器, Worker 是線程任務的具體執行者。從 Scheduler 源碼能夠看到, Scheduler 在 scheduleDirect()、 schedulePeriodicallyDirect() 方法中建立了 Worker,而後會分別調用 Worker 的 schedule()、 schedulePeriodically() 來執行任務.
// Scheduler.java public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; } public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w); Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit); if (d == EmptyDisposable.INSTANCE) { return d; } return periodicTask; }
Worker 是一個抽象類,每種 Scheduler 會對應一種具體的 Worker
public abstract static class Worker implements Disposable { @NonNull public Disposable schedule(@NonNull Runnable run) { return schedule(run, 0L, TimeUnit.NANOSECONDS); } @NonNull public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit); @NonNull public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) { final SequentialDisposable first = new SequentialDisposable(); final SequentialDisposable sd = new SequentialDisposable(first); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); final long periodInNanoseconds = unit.toNanos(period); final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS); final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay); Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd, periodInNanoseconds), initialDelay, unit); if (d == EmptyDisposable.INSTANCE) { return d; } first.replace(d); return sd; } }
SingleScheduler 是 RxJava 2 新增 Scheduler。SingleScheduler 中有一個屬性叫做 executor,它是使用 AtomicReference 包裝 ScheduledExecutorService
// SingleScheduler.java final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();
在 SingleScheduler 構造函數中, executor 會調用 lazySet()
public SingleScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; executor.lazySet(createExecutor(threadFactory)); }
它的 createExecutor() 用於建立工做線程,看到經過 SchedulerPoolFactory 來建立 ScheduledExecutorService
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) { return SchedulerPoolFactory.create(threadFactory); }
在 SchedulerPoolFactory 類的 create(ThreadFactory factory) 中,使用 newScheduledThreadPool 線程池定義定時器,最大容許線程數爲 1
public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; }
在 SingleScheduler 中, 每次使用 ScheduledExecutorService 時,實際上是使用 executor.get()。因此說, single 擁有一個線程單例
SingleScheduler 會建立一個 ScheduledWorker, ScheduledWorker 使用 JDK 的ScheduledExecutorService 做爲 executor
下面是 ScheduledWorker 的 schedule() 方法, 使用 ScheduledExecutorService 的 submit() 或者 schedule() 來執行 runnable
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks); tasks.add(sr); try { Future<?> f; if (delay <= 0L) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delay, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { dispose(); RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; } return sr; }
ComputationScheduler 使用 FixedSchedulerPool 做爲線程池,井且 FixedSchedulerPool 被
AtomicReference 裝了一下。
從 ComputationScheduler 的源碼中能夠看出, MAX_THREADS 是 CPU 的數目。 FixedSchedulerPool 能夠理解爲擁有固定數量的線程池,數量爲 MAX_THREADS
static { MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0)); // ... } static int cap(int cpuCount, int paramThreads) { return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads; }
ComputationScheduler 會建立一個 EventLoopWorker
public Worker createWorker() { return new EventLoopWorker(pool.get().getEventLoop()); }
其中 getEventLoop() 是 FixedSchedulerPool 中的方法,返回了 FixedSchedulerPool 中的一個
PoolWorker
public PoolWorker getEventLoop() { int c = cores; if (c == 0) { return SHUTDOWN_WORKER; } // simple round robin, improvements to come return eventLoops[(int)(n++ % c)]; }
PoolWorker 繼承自 NewThreadWorker, 也是線程數爲 1 的 ScheduledExecutorService
IoScheduler 使用 CachedWorkerPool 做爲線程池,井且 CachedWorkerPool 也被 AtomicReference 包裝了。
CachedWorkerPool 是基於 RxThreadFactory 這個 ThreadFactory 來建立的
static { // ... WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);; NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY); // ... }
在 RxThreadFactory 中, prefix 和 incrementAndGet() 來建立新線程的名稱
@Override public Thread newThread(Runnable r) { StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet()); String name = nameBuilder.toString(); Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name); t.setPriority(priority); t.setDaemon(true); return t; }
IoScheduler 建立的線程數是不固定的,能夠經過 IoScheduler 的 size() 來得到當前的線程數。通常狀況下, ComputationScheduler 的線程數等於 CPU 的數目
public int size() { return pool.get().allWorkers.size(); }
須要特別注意的是, ComputationScheduler 和 IoScheduler 都是依賴線程池來維護線程的,區別就是 IoScheduler 線程池中的個數是無限的,由 prefix 和 incrementAndGet()產生的遞增值來決定線程的名字。而 ComputationScheduler 中則是一個固定線程數量的線程池,數據爲 CPU 數目,而且不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU
一樣, IoScheduler 會建立 EventLoopWorker
public Worker createWorker() { return new EventLoopWorker(pool.get()); }
但這個 EventLoopWorker 是 IoScheduler 的內部類,與 ComputationScheduler 建立的 EventLoopWorker 不一樣。只是兩者的名稱相同。
NewThreadScheduler 會建立 NewThreadWorker, NewThreadWorker 的構造函數使用的也是 SchedulerPoolFactory
public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); }
與 SingleScheduler 不一樣的是, SingleScheduler 的 executor 是使用 AtomicReference 包裝的 ScheduledExecutorService。每次使用時,都會調用 executor.get()
然而, NewThreadScheduler 每次都會建立一個新的線程。
TrampolineScheduler 會建立 TrampolineWorker,在 TrampolineWorker 內部維護着一個 PriorityBlockingQueue 。任務進入該隊列以前,會先用 TimedRunnable 封裝一下。
static final class TimedRunnable implements Comparable<TimedRunnable> { final Runnable run; final long execTime; final int count; // In case if time between enqueueing took less than 1ms volatile boolean disposed; TimedRunnable(Runnable run, Long execTime, int count) { this.run = run; this.execTime = execTime; this.count = count; } @Override public int compareTo(TimedRunnable that) { int result = ObjectHelper.compare(execTime, that.execTime); if (result == 0) { return ObjectHelper.compare(count, that.count); } return result; } }
能夠看到 TimedRunnable 實現了 Comparable 接口,會比較任務的 execTime 和 count
任務在進入 queue 以前, count 每次都會 +1
final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet()); queue.add(timedRunnable);
因此,在使用 TrampolineScheduler 時,新的任務老是會優先執行。
默認狀況下不作任何線程處理, Observable 和 Observer 處於同一線程中。若是想要切換線程,則可使用 subscribeOn() 和 observeOn()。
subscribeOn 經過接收一個 Scheduler 參數,來指定對數據的處理運行在特定的線程調度器 Scheduler 上
若屢次執行 subscribeOn ,則只有一次起做用
subscribeOn 的源碼能夠看到,每次調用 subscribeOn 都會建立一個 ObservableSubscribeOn 對象。
public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
ObservableSubscribeOn 真正發生訂閱的方法是 subscribeActual(observer)
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
其中 SubscribeOnObserver 是下游的 Observer 經過裝飾器模式生成的,它實現了 Observer、Disposable 接口。
接下來,在上游的線程中執行下游 Observer 的 onSubscribe(Disposable d)方法。
observer.onSubscribe(parent);
而後,將子線程的操做加入 Disposable 管理中, 加入 Disposable 後能夠方便上下游的統一管理。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
在這裏,已經調用了對應 scheduler 的 scheduleDirect 方法。 scheduleDirect() 傳入的是一個Runnable ,也就是下面的 SubscribeTask
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }
SubscribeTask 會執行 run() 對上游的 Observable,從而進行訂閱。
此時,己經在對應的 Scheduler 線程中運行了
source.subscribe(parent);
在 RxJava 鏈式操做中,數據的處理是自下而下。若是屢次調用 subscribeOn,則最上面的線程切換最晚執行,因此就變成了只有第一次切換線程纔有效。
observeOn 一樣接收一個 Scheduler 參數,用來指定下游操做運行在特定的線程調度器 Scheduler 上。
若屢次執行 observeOn,則每次都起做用,線程會一直切換。
observeOn() 的源碼能夠看到,每次調用 observeOn() 都會建立 ObservableObserveOn 對象
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
ObservableObserveOn 真正發生訂閱的方法是 subscribeActual(observer)
protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } }
若是 scheduler 是 TrampolineScheduler,則上游事件和下游事件會當即產生訂閱。
若是不 TrampolineScheduler,則 scheduler 會建立本身的 Worker,而後上游事件和下游事件產生訂閱,生成一個 ObserveOnObserver 對象,封裝了下游真正的 Observer
ObserveOnObserver 是 ObservableObserveOn 內部類,實現了 Observer、Runnable 接口。與 SubscribeOnObserver 不一樣的是, SubscribeOnObserver 實現了 Observer、Disposable 接口
在 ObserveOnObserver 的 onNext 中, schedule() 執行了具體調度的方法
public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
其中 worker 是當前 scheduler 建立的 Worker,this 指的是當前的 ObserveOnObserver 對象,
this 也實現了 Runnable 接口
再來看看 Runnable 接口的實現方法 run(),這個方法是在 Worker 對應的線程裏執行的。drainNormal 會取出 ObserveOnObserver 的 queue 裏的數據進行發送。
@Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } }
若下游屢次調用 observeOn(),則線程會一直切換。每次切換線程,都會把對應的 Observer 對象的各個方法的處理執行在指定的線程中。
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.d(TAG, "subscribe: " + Thread.currentThread().getName()); emitter.onNext("hello"); emitter.onNext("world"); } }).subscribeOn(Schedulers.newThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, Thread.currentThread().getName() + "#Next: " + s); } }); // 執行結果 subscribe: RxNewThreadScheduler-1 RxNewThreadScheduler-1#Next: hello RxNewThreadScheduler-1#Next: world
全部的操做都走在 newThread 運行的,包括髮射數據。
屢次調用 subscribeOn 和 observeOn 的例子
Observable.just("HELLO#1", "HELLO#2") .subscribeOn(Schedulers.single()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { s = s.toLowerCase(); Log.d(TAG, "map##1 threadName: " + Thread.currentThread().getName() + " s:" + s); return s; } }).observeOn(Schedulers.io()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { s = s + " RxJava."; Log.d(TAG, "map##2 threadName: " + Thread.currentThread().getName() + " s:" + s); return s; } }) .subscribeOn(Schedulers.computation()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { s = s + "it is a test."; Log.d(TAG, "map##3 threadName: " + Thread.currentThread().getName() + " s:" + s); return s; } }) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "subscribe threadName:" + Thread.currentThread().getName() + "#Next: " + s); } }); //執行結果 map##1 threadName: RxSingleScheduler-1 s:hello#1 map##1 threadName: RxSingleScheduler-1 s:hello#2 map##2 threadName: RxCachedThreadScheduler-1 s:hello#1 RxJava. map##3 threadName: RxCachedThreadScheduler-1 s:hello#1 RxJava.it is a test. map##2 threadName: RxCachedThreadScheduler-1 s:hello#2 RxJava. map##3 threadName: RxCachedThreadScheduler-1 s:hello#2 RxJava.it is a test. subscribe threadName:RxNewThreadScheduler-1#Next: hello#1 RxJava.it is a test. subscribe threadName:RxNewThreadScheduler-1#Next: hello#2 RxJava.it is a test.
TestScheduler 是專門用於測試的調度器,與其餘調度器的區別是,TestScheduler 只有被調用了時間纔會繼續。 TestScheduler 是一種特殊的、非線程安全的調度器,用於測試一些不引入真實併發性、容許手動推動虛擬時間的調度器。
在 RxJava 2.x 中,原先 RxJava l.x 的 Schedulers.test() 被去掉了 。要想得到 TestScheduler 對象,則能夠經過直接 new TestScheduler() 的方式來實現。
TestScheduler 所包含的方法井很少,下面羅列幾個關鍵的方法。
將調度器的時鐘移動到某個特定時刻。
如,時鐘移動到 lOms.
scheduler.advanceTimeTo(10, TimeUnit.MILLISECONDS);
下例展現了 0s、20s、40s 各會打印什麼結果
TestScheduler scheduler = new TestScheduler(); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "immediate"); } }); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "20s"); } }, 20, TimeUnit.SECONDS); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "40s"); } }, 40, TimeUnit.SECONDS); scheduler.advanceTimeTo(1, TimeUnit.MILLISECONDS); Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS)); scheduler.advanceTimeTo(20, TimeUnit.SECONDS); Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.SECONDS)); scheduler.advanceTimeTo(40, TimeUnit.SECONDS); Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.SECONDS)); // 執行結果 immediate virtual time :1 20s virtual time :20 40s virtual time :40
使用 advanceTimeTo 以後,移動不一樣的時間點會打印不一樣的內容。
將調度程序的時鐘按指定的時間向前移動
例如,時鐘移動了 lOms
scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);
再次調用剛纔的方法,時鐘又會移動 lOms。此時,時鐘移動到 20ms,這是一個累加的過程。
下例,使用了 timer 操做符, timer 是按照指定時間延遲發送的操做符,timer() 井不會按週期地執行。該例子展現了 2s 後 atomicLong 會自動加1
TestScheduler scheduler = new TestScheduler(); final AtomicLong atomicLong = new AtomicLong(); Observable.timer(2, TimeUnit.SECONDS, scheduler).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { atomicLong.incrementAndGet(); } }); Log.d(TAG, "atomicLong's value=" + atomicLong.get() + ", virtual time:" + scheduler.now(TimeUnit.SECONDS)); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); Log.d(TAG, "atomicLong's value=" + atomicLong.get() + ", virtual time:" + scheduler.now(TimeUnit.SECONDS)); scheduler.advanceTimeBy(1, TimeUnit.SECONDS); Log.d(TAG, "atomicLong's value=" + atomicLong.get() + ", virtual time:" + scheduler.now(TimeUnit.SECONDS)); // 執行結果 atomicLong's value=0, virtual time:0 atomicLong's value=0, virtual time:1 atomicLong's value=1, virtual time:2
這個結果符合預期,最初 atomicLong 爲 0,時鐘移動到 1s 時它的值仍然爲 0;時鐘再移動 ls ,即至關於時鐘移動到 2s 因此它的值變爲 1
advanceTimeBy() 也能夠傳負數,表示回到過去。
triggerActions 不會修改時間,它執行計劃中的可是未啓動的任務,已經執行過的任務不會再啓動。
TestScheduler scheduler = new TestScheduler(); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "immediate"); } }); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "20s"); } }, 20, TimeUnit.SECONDS); Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS)); // 執行結果 virtual time :0
增長 scheduler.triggerActions() 後
TestScheduler scheduler = new TestScheduler(); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "immediate"); } }); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "20s"); } }, 20, TimeUnit.SECONDS); scheduler.triggerActions(); Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS)); // 執行結果 immediate virtual time :0
再增長 advanceTimeBy()
TestScheduler scheduler = new TestScheduler(); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "immediate"); } }); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "20s"); } }, 20, TimeUnit.SECONDS); scheduler.triggerActions(); Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS)); scheduler.advanceTimeBy(20, TimeUnit.SECONDS); // 執行結果 immediate virtual time :0 20s
若是將 triggerActions() 放在最後, 看看效果。
TestScheduler scheduler = new TestScheduler(); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "immediate"); } }); scheduler.createWorker().schedule(new Runnable() { @Override public void run() { Log.d(TAG, "20s"); } }, 20, TimeUnit.SECONDS); Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS)); scheduler.advanceTimeBy(20, TimeUnit.SECONDS); scheduler.triggerActions(); // 執行結果 virtual time :0 immediate 20s
由於己經使用了 advanceTimeBy(),因此即便再調用 triggerActions(),也不會執行己經啓動過的任務。
若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)