RxJava(四):線程操做

博客主頁java

1. 調度器(Scheduler)的種類

1.1 RxJava 線程介紹

RxJava 是一個爲異步編程而實現的庫,異步是其重要特點,合理地利用異步編程可以提升系統的處理速度。可是異步也會帶來線程的安全問題,並且異步並不等於併發,與異步概念相對應的是同步。編程

在默認狀況下, RxJava 只在當前線程中運行,它是單線程的。此時 Observable 用於發射數據流,Observer 用於接收和響應數據流,各類操做符( Operators )用於加工數據流,它們都在同一個線程中運行,實現出來的是一個同步的函數響應式。然而,函數響應式的實際應用是大部分操做都在後臺處理,前臺響應的一個過程。因此須要對剛纔的流程作一下修改,改爲 Observable 生成發射數據流, Operators 加工數據流在後臺線程中進行, Observer 在前臺線程中接收井響應數據。此時會涉及使用多線程來操做 RxJava ,咱們可使用 RxJava 的調度器(Scheduler)來實現。segmentfault

1.2 Scheduler

Scheduler 是 RxJava 對線程控制器的一個抽象, RxJava 內置了多個 Scheduler 的實現,它們基本知足絕大多數使用場景。

若是內置的 Scheduler 不能知足業務需求,那麼可使用自定義的 Executor 做爲調度器,以知足個性化需求。安全

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
        emitter.onNext("hello");
        emitter.onNext("world");
    }
}).observeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, Thread.currentThread().getName() + "#Next: " + s);
            }
        });

// 執行結果
 subscribe: main
 RxNewThreadScheduler-1#Next: hello
 RxNewThreadScheduler-1#Next: world

這裏的 Observable 發射完數據以後,切換到 newThread 。後面的兩次打印都是在 newThread 中進行的。多線程

2. RxJava 線程模型

RxJava 的被觀察者們在使用操做符時能夠利用線程調度器——Scheduler 來切換線程併發

Observable.just("aaa", "bbb")
        .observeOn(Schedulers.newThread())
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                Log.d(TAG, "apply: " + Thread.currentThread().getName());
                return s.toUpperCase();
            }
        }).subscribeOn(Schedulers.single())
        .observeOn(Schedulers.io())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, Thread.currentThread().getName() + "#Next: " + s);
            }
        });

// 執行結果
 apply: RxNewThreadScheduler-1
 apply: RxNewThreadScheduler-1
 RxCachedThreadScheduler-1#Next: AAA
 RxCachedThreadScheduler-1#Next: BBB


其中,藍色表示主線程、橙色表示 newThread 、粉色表示 I/O 線程app

2.1 線程調度器

Schedulers 一個靜態工廠類,經過分析 Schedulers 的源碼能夠看到它有多種不一樣類型的 Scheduler 。下面是 Schedulers 的各個工廠方法。less

2.1.1 computation()

computation() 用於 CPU 密集型的計算任務,但井不適合 I/O 操做異步

public static Scheduler computation() {
    return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}

2.1.2 io()

io() 用於 I/O 密集型任務,支持異步阻塞 I/O 操做,這個調度器的線程池會根據須要增加。對於普通的計算任務,請使用 Schedulers.computation()。ide

public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

2.1.3 trampoline()

在 RxJava 2 中與在 RxJava 1 中的做用不一樣 。在 RxJava 2 表示當即執行,若是當前線程有任務在執行,則會將其暫停,等插入進來的新任務執行完成以後,再接着執行原先未完成的任務。在 RxJava 1 中,表示在當前線程中等待其餘任務完成以後,再執行新的任務

public static Scheduler trampoline() {
    return TRAMPOLINE;
}

2.1.4 newThread()

newThread() 爲每一個任務建立一個新線程。

public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}

2.1.5 single()

single() 擁有一個線程單例,全部的任務都在這一個錢程中執行。當此線程中有任務執行時,它的任務將會按照先進先出的順序依次執行。

public static Scheduler single() {
    return RxJavaPlugins.onSingleScheduler(SINGLE);
}

2.1.6 自定義的 Executor 來做爲調度器

public static Scheduler from(@NonNull Executor executor) {
    return new ExecutorScheduler(executor, false);
}

2.2 Scheduler源碼

Scheduler 是 RxJava 的線程任務調度器, Worker 是線程任務的具體執行者。從 Scheduler 源碼能夠看到, Scheduler 在 scheduleDirect()、 schedulePeriodicallyDirect() 方法中建立了 Worker,而後會分別調用 Worker 的 schedule()、 schedulePeriodically() 來執行任務.

// Scheduler.java

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);

    Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
    if (d == EmptyDisposable.INSTANCE) {
        return d;
    }

    return periodicTask;
}

Worker 是一個抽象類,每種 Scheduler 會對應一種具體的 Worker

public abstract static class Worker implements Disposable {
   
    @NonNull
    public Disposable schedule(@NonNull Runnable run) {
        return schedule(run, 0L, TimeUnit.NANOSECONDS);
    }

    @NonNull
    public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

    @NonNull
    public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
        final SequentialDisposable first = new SequentialDisposable();

        final SequentialDisposable sd = new SequentialDisposable(first);

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        final long periodInNanoseconds = unit.toNanos(period);
        final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
        final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

        Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                periodInNanoseconds), initialDelay, unit);

        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        first.replace(d);

        return sd;
    }
}

2.2.1 SingleScheduler

SingleScheduler 是 RxJava 2 新增 Scheduler。SingleScheduler 中有一個屬性叫做 executor,它是使用 AtomicReference 包裝 ScheduledExecutorService

// SingleScheduler.java

final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();

在 SingleScheduler 構造函數中, executor 會調用 lazySet()

public SingleScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    executor.lazySet(createExecutor(threadFactory));
}

它的 createExecutor() 用於建立工做線程,看到經過 SchedulerPoolFactory 來建立 ScheduledExecutorService

static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
    return SchedulerPoolFactory.create(threadFactory);
}

在 SchedulerPoolFactory 類的 create(ThreadFactory factory) 中,使用 newScheduledThreadPool 線程池定義定時器,最大容許線程數爲 1

public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
    tryPutIntoPool(PURGE_ENABLED, exec);
    return exec;
}

在 SingleScheduler 中, 每次使用 ScheduledExecutorService 時,實際上是使用 executor.get()。因此說, single 擁有一個線程單例

SingleScheduler 會建立一個 ScheduledWorker, ScheduledWorker 使用 JDK 的ScheduledExecutorService 做爲 executor

下面是 ScheduledWorker 的 schedule() 方法, 使用 ScheduledExecutorService 的 submit() 或者 schedule() 來執行 runnable

public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    if (disposed) {
        return EmptyDisposable.INSTANCE;
    }

    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
    tasks.add(sr);

    try {
        Future<?> f;
        if (delay <= 0L) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delay, unit);
        }

        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        dispose();
        RxJavaPlugins.onError(ex);
        return EmptyDisposable.INSTANCE;
    }

    return sr;
}

2.2.2 ComputationScheduler

ComputationScheduler 使用 FixedSchedulerPool 做爲線程池,井且 FixedSchedulerPool 被
AtomicReference 裝了一下。

從 ComputationScheduler 的源碼中能夠看出, MAX_THREADS 是 CPU 的數目。 FixedSchedulerPool 能夠理解爲擁有固定數量的線程池,數量爲 MAX_THREADS

static {
    MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
    // ... 
}

static int cap(int cpuCount, int paramThreads) {
    return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}

ComputationScheduler 會建立一個 EventLoopWorker

public Worker createWorker() {
    return new EventLoopWorker(pool.get().getEventLoop());
}

其中 getEventLoop() 是 FixedSchedulerPool 中的方法,返回了 FixedSchedulerPool 中的一個
PoolWorker

public PoolWorker getEventLoop() {
    int c = cores;
    if (c == 0) {
        return SHUTDOWN_WORKER;
    }
    // simple round robin, improvements to come
    return eventLoops[(int)(n++ % c)];
}

PoolWorker 繼承自 NewThreadWorker, 也是線程數爲 1 的 ScheduledExecutorService

2.2.3 IoScheduler

IoScheduler 使用 CachedWorkerPool 做爲線程池,井且 CachedWorkerPool 也被 AtomicReference 包裝了。

CachedWorkerPool 是基於 RxThreadFactory 這個 ThreadFactory 來建立的

static {
    // ... 

    WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);;

    NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    // ...
}

在 RxThreadFactory 中, prefix 和 incrementAndGet() 來建立新線程的名稱

@Override
public Thread newThread(Runnable r) {
    StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());

    String name = nameBuilder.toString();
    Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
    t.setPriority(priority);
    t.setDaemon(true);
    return t;
}

IoScheduler 建立的線程數是不固定的,能夠經過 IoScheduler 的 size() 來得到當前的線程數。通常狀況下, ComputationScheduler 的線程數等於 CPU 的數目

public int size() {
    return pool.get().allWorkers.size();
}

須要特別注意的是, ComputationScheduler 和 IoScheduler 都是依賴線程池來維護線程的,區別就是 IoScheduler 線程池中的個數是無限的,由 prefix 和 incrementAndGet()產生的遞增值來決定線程的名字。而 ComputationScheduler 中則是一個固定線程數量的線程池,數據爲 CPU 數目,而且不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU

一樣, IoScheduler 會建立 EventLoopWorker

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

但這個 EventLoopWorker 是 IoScheduler 的內部類,與 ComputationScheduler 建立的 EventLoopWorker 不一樣。只是兩者的名稱相同。

2.2.4 NewThreadScheduler

NewThreadScheduler 會建立 NewThreadWorker, NewThreadWorker 的構造函數使用的也是 SchedulerPoolFactory

public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
}

與 SingleScheduler 不一樣的是, SingleScheduler 的 executor 是使用 AtomicReference 包裝的 ScheduledExecutorService。每次使用時,都會調用 executor.get()

然而, NewThreadScheduler 每次都會建立一個新的線程。

2.2.5 TrampolineScheduler

TrampolineScheduler 會建立 TrampolineWorker,在 TrampolineWorker 內部維護着一個 PriorityBlockingQueue 。任務進入該隊列以前,會先用 TimedRunnable 封裝一下。

static final class TimedRunnable implements Comparable<TimedRunnable> {
    final Runnable run;
    final long execTime;
    final int count; // In case if time between enqueueing took less than 1ms

    volatile boolean disposed;

    TimedRunnable(Runnable run, Long execTime, int count) {
        this.run = run;
        this.execTime = execTime;
        this.count = count;
    }

    @Override
    public int compareTo(TimedRunnable that) {
        int result = ObjectHelper.compare(execTime, that.execTime);
        if (result == 0) {
            return ObjectHelper.compare(count, that.count);
        }
        return result;
    }
}

能夠看到 TimedRunnable 實現了 Comparable 接口,會比較任務的 execTime 和 count

任務在進入 queue 以前, count 每次都會 +1

final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
 queue.add(timedRunnable);

因此,在使用 TrampolineScheduler 時,新的任務老是會優先執行。

2.3 線程調度

默認狀況下不作任何線程處理, Observable 和 Observer 處於同一線程中。若是想要切換線程,則可使用 subscribeOn() 和 observeOn()。

2.3.1 subscribeOn

subscribeOn 經過接收一個 Scheduler 參數,來指定對數據的處理運行在特定的線程調度器 Scheduler 上

若屢次執行 subscribeOn ,則只有一次起做用

subscribeOn 的源碼能夠看到,每次調用 subscribeOn 都會建立一個 ObservableSubscribeOn 對象。

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

ObservableSubscribeOn 真正發生訂閱的方法是 subscribeActual(observer)

@Override
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

    observer.onSubscribe(parent);

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

其中 SubscribeOnObserver 是下游的 Observer 經過裝飾器模式生成的,它實現了 Observer、Disposable 接口。

接下來,在上游的線程中執行下游 Observer 的 onSubscribe(Disposable d)方法。

observer.onSubscribe(parent);

而後,將子線程的操做加入 Disposable 管理中, 加入 Disposable 後能夠方便上下游的統一管理。

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

在這裏,已經調用了對應 scheduler 的 scheduleDirect 方法。 scheduleDirect() 傳入的是一個Runnable ,也就是下面的 SubscribeTask

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

SubscribeTask 會執行 run() 對上游的 Observable,從而進行訂閱。

此時,己經在對應的 Scheduler 線程中運行了

source.subscribe(parent);

在 RxJava 鏈式操做中,數據的處理是自下而下。若是屢次調用 subscribeOn,則最上面的線程切換最晚執行,因此就變成了只有第一次切換線程纔有效。

2.3.2 observeOn

observeOn 一樣接收一個 Scheduler 參數,用來指定下游操做運行在特定的線程調度器 Scheduler 上。

若屢次執行 observeOn,則每次都起做用,線程會一直切換。

observeOn() 的源碼能夠看到,每次調用 observeOn() 都會建立 ObservableObserveOn 對象

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

ObservableObserveOn 真正發生訂閱的方法是 subscribeActual(observer)

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();

        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

若是 scheduler 是 TrampolineScheduler,則上游事件和下游事件會當即產生訂閱。

若是不 TrampolineScheduler,則 scheduler 會建立本身的 Worker,而後上游事件和下游事件產生訂閱,生成一個 ObserveOnObserver 對象,封裝了下游真正的 Observer

ObserveOnObserver 是 ObservableObserveOn 內部類,實現了 Observer、Runnable 接口。與 SubscribeOnObserver 不一樣的是, SubscribeOnObserver 實現了 Observer、Disposable 接口

在 ObserveOnObserver 的 onNext 中, schedule() 執行了具體調度的方法

public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

其中 worker 是當前 scheduler 建立的 Worker,this 指的是當前的 ObserveOnObserver 對象,
this 也實現了 Runnable 接口

再來看看 Runnable 接口的實現方法 run(),這個方法是在 Worker 對應的線程裏執行的。drainNormal 會取出 ObserveOnObserver 的 queue 裏的數據進行發送。

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

若下游屢次調用 observeOn(),則線程會一直切換。每次切換線程,都會把對應的 Observer 對象的各個方法的處理執行在指定的線程中。

2.4 示例

2.4.1 單獨使用subscribeOn

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        Log.d(TAG, "subscribe: " + Thread.currentThread().getName());
        emitter.onNext("hello");
        emitter.onNext("world");
    }
}).subscribeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, Thread.currentThread().getName() + "#Next: " + s);
            }
        });

// 執行結果
 subscribe: RxNewThreadScheduler-1
 RxNewThreadScheduler-1#Next: hello
 RxNewThreadScheduler-1#Next: world

全部的操做都走在 newThread 運行的,包括髮射數據。

2.4.2 屢次切換線程

屢次調用 subscribeOn 和 observeOn 的例子

Observable.just("HELLO#1", "HELLO#2")
        .subscribeOn(Schedulers.single())
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                s = s.toLowerCase();
                Log.d(TAG, "map##1 threadName: " + Thread.currentThread().getName() + " s:" + s);
                return s;
            }
        }).observeOn(Schedulers.io())
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                s = s + " RxJava.";
                Log.d(TAG, "map##2 threadName: " + Thread.currentThread().getName() + " s:" + s);
                return s;
            }
        })
        .subscribeOn(Schedulers.computation())
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                s = s + "it is a test.";
                Log.d(TAG, "map##3 threadName: " + Thread.currentThread().getName() + " s:" + s);
                return s;
            }
        })
        .observeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "subscribe threadName:" + Thread.currentThread().getName() + "#Next: " + s);
            }
        });

//執行結果
 map##1 threadName: RxSingleScheduler-1 s:hello#1
 map##1 threadName: RxSingleScheduler-1 s:hello#2
 map##2 threadName: RxCachedThreadScheduler-1 s:hello#1 RxJava.
 map##3 threadName: RxCachedThreadScheduler-1 s:hello#1 RxJava.it is a test.
 map##2 threadName: RxCachedThreadScheduler-1 s:hello#2 RxJava.
 map##3 threadName: RxCachedThreadScheduler-1 s:hello#2 RxJava.it is a test.
 subscribe threadName:RxNewThreadScheduler-1#Next: hello#1 RxJava.it is a test.
 subscribe threadName:RxNewThreadScheduler-1#Next: hello#2 RxJava.it is a test.

3. Scheduler 的測試

TestScheduler 是專門用於測試的調度器,與其餘調度器的區別是,TestScheduler 只有被調用了時間纔會繼續。 TestScheduler 是一種特殊的、非線程安全的調度器,用於測試一些不引入真實併發性、容許手動推動虛擬時間的調度器。

在 RxJava 2.x 中,原先 RxJava l.x 的 Schedulers.test() 被去掉了 。要想得到 TestScheduler 對象,則能夠經過直接 new TestScheduler() 的方式來實現。

TestScheduler 所包含的方法井很少,下面羅列幾個關鍵的方法。

3.1 advanceTimeTo

將調度器的時鐘移動到某個特定時刻。

如,時鐘移動到 lOms.

scheduler.advanceTimeTo(10, TimeUnit.MILLISECONDS);

下例展現了 0s、20s、40s 各會打印什麼結果

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "40s");
    }
}, 40, TimeUnit.SECONDS);

scheduler.advanceTimeTo(1, TimeUnit.MILLISECONDS);
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

scheduler.advanceTimeTo(20, TimeUnit.SECONDS);
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.SECONDS));

scheduler.advanceTimeTo(40, TimeUnit.SECONDS);
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.SECONDS));

// 執行結果
 immediate
 virtual time :1
 20s
 virtual time :20
 40s
 virtual time :40

使用 advanceTimeTo 以後,移動不一樣的時間點會打印不一樣的內容。

3.2 advanceTimeBy

將調度程序的時鐘按指定的時間向前移動

例如,時鐘移動了 lOms

scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS);

再次調用剛纔的方法,時鐘又會移動 lOms。此時,時鐘移動到 20ms,這是一個累加的過程。

下例,使用了 timer 操做符, timer 是按照指定時間延遲發送的操做符,timer() 井不會按週期地執行。該例子展現了 2s 後 atomicLong 會自動加1

TestScheduler scheduler = new TestScheduler();

final AtomicLong atomicLong = new AtomicLong();
Observable.timer(2, TimeUnit.SECONDS, scheduler).subscribe(new Consumer<Long>() {
    @Override
    public void accept(Long aLong) throws Exception {
        atomicLong.incrementAndGet();
    }
});

Log.d(TAG, "atomicLong's value=" + atomicLong.get() + ", virtual time:" + scheduler.now(TimeUnit.SECONDS));

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

Log.d(TAG, "atomicLong's value=" + atomicLong.get() + ", virtual time:" + scheduler.now(TimeUnit.SECONDS));

scheduler.advanceTimeBy(1, TimeUnit.SECONDS);

Log.d(TAG, "atomicLong's value=" + atomicLong.get() + ", virtual time:" + scheduler.now(TimeUnit.SECONDS));

 // 執行結果
 atomicLong's value=0, virtual time:0
 atomicLong's value=0, virtual time:1
 atomicLong's value=1, virtual time:2

這個結果符合預期,最初 atomicLong 爲 0,時鐘移動到 1s 時它的值仍然爲 0;時鐘再移動 ls ,即至關於時鐘移動到 2s 因此它的值變爲 1

advanceTimeBy() 也能夠傳負數,表示回到過去。

3.3 triggerActions

triggerActions 不會修改時間,它執行計劃中的可是未啓動的任務,已經執行過的任務不會再啓動。

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);

Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

// 執行結果
 virtual time :0

增長 scheduler.triggerActions() 後

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);

scheduler.triggerActions();
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

// 執行結果
 immediate
 virtual time :0

再增長 advanceTimeBy()

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);

scheduler.triggerActions();
Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

scheduler.advanceTimeBy(20, TimeUnit.SECONDS);

// 執行結果
 immediate
 virtual time :0
 20s

若是將 triggerActions() 放在最後, 看看效果。

TestScheduler scheduler = new TestScheduler();

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "immediate");
    }
});

scheduler.createWorker().schedule(new Runnable() {
    @Override
    public void run() {
        Log.d(TAG, "20s");
    }
}, 20, TimeUnit.SECONDS);


Log.d(TAG, "virtual time :" + scheduler.now(TimeUnit.MILLISECONDS));

scheduler.advanceTimeBy(20, TimeUnit.SECONDS);

scheduler.triggerActions();

// 執行結果
 virtual time :0
 immediate
 20s

由於己經使用了 advanceTimeBy(),因此即便再調用 triggerActions(),也不會執行己經啓動過的任務。

若是個人文章對您有幫助,不妨點個贊鼓勵一下(^_^)

相關文章
相關標籤/搜索