RxJava的被觀察者在使用操做符時能夠利用線程調度器--Scheduler來切換線程,例如java
Observable.just("aaa","bbb")
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s.toUpperCase();
}
})
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});複製代碼
被觀察者(Observable、Flowable...)發射數據流以後,其操做符能夠在不一樣的線程中加工數據流,最後被觀察者在前臺線程中接受並響應數據。ios
下圖不一樣的箭頭顏色表示不一樣的線程。
bash
Schedulers 是一個靜態工廠類,經過分析Schedulers的源碼能夠看到它有多種不一樣類型的Scheduler。下面是Schedulers的各個工廠方法。app
computation()用於CPU密集型的計算任務,但並不適合於IO操做。less
@NonNull
public static Scheduler computation() {
return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}複製代碼
io()用於IO密集型任務,支持異步阻塞IO操做,這個調度器的線程池會根據須要增加。對於普通的計算任務,請使用Schedulers.computation()。異步
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}複製代碼
trampoline()在RxJava2中跟RxJava1的做用是不一樣的。在RxJava2中表示當即執行,若是當前線程有任務在執行,則會將其暫停,等插入進來的新任務執行完以後,再將原先未完成的任務接着執行。在RxJava1中表示在當前線程中等待其餘任務完成以後,再執行新的任務。ide
@NonNull
public static Scheduler trampoline() {
return TRAMPOLINE;
}複製代碼
newThread()爲每一個任務建立一個新線程。函數
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}複製代碼
single()擁有一個線程單例,全部的任務都在這一個線程中執行,當此線程中有任務執行時,它的任務們將會按照先進先出的順序依次執行。oop
@NonNull
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}複製代碼
除此以外,還支持自定義的Executor來做爲調度器。ui
@NonNull
public static Scheduler from(@NonNull Executor executor) {
return new ExecutorScheduler(executor);
}複製代碼
Scheduler是RxJava的線程任務調度器,Worker是線程任務的具體執行者。從Scheduler源碼能夠看到,Scheduler在scheduleDirect()、schedulePeriodicallyDirect()方法中建立了Worker,而後會分別調用worker的schedule()、schedulePeriodically()來執行任務。
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
return periodicTask;
}複製代碼
Worker也是一個抽象類,從上圖能夠看到每一種Scheduler會對應一種具體的Worker。
public abstract static class Worker implements Disposable {
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();
final SequentialDisposable sd = new SequentialDisposable(first);
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
final long periodInNanoseconds = unit.toNanos(period);
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
periodInNanoseconds), initialDelay, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
first.replace(d);
return sd;
}
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
...
}複製代碼
SingleScheduler是RxJava2新增的Scheduler。SingleScheduler中有一個屬性叫executor,它是使用AtomicReference包裝的ScheduledExecutorService。
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();複製代碼
在SingleScheduler構造函數中,executor會調用lazySet()。
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
executor.lazySet(createExecutor(threadFactory));
}複製代碼
它的createExecutor()用於建立工做線程,能夠看到經過SchedulerPoolFactory來建立ScheduledExecutorService。
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}複製代碼
在SchedulerPoolFactory類的create(ThreadFactory factory) 中,使用newScheduledThreadPool線程池定義定時器,最大容許線程數爲1。
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}複製代碼
在SingleScheduler中每次使用ScheduledExecutorService,實際上是使用executor.get()。因此說,single擁有一個線程單例。
SingleScheduler會建立一個ScheduledWorker,ScheduledWorker使用jdk的ScheduledExecutorService做爲executor。
下面是ScheduledWorker的schedule()方法。使用ScheduledExecutorService的submit()或schedule()來執行runnable。
@NonNull
@Override
public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
tasks.add(sr);
try {
Future<?> f;
if (delay <= 0L) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delay, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
dispose();
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
return sr;
}複製代碼
ComputationScheduler使用FixedSchedulerPool做爲線程池,而且FixedSchedulerPool被AtomicReference包裝了一下。
從ComputationScheduler的源碼中能夠看出,MAX_THREADS是CPU的數目。FixedSchedulerPool能夠理解爲擁有固定數量的線程池,數量爲MAX_THREADS。
static {
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
......
}
static int cap(int cpuCount, int paramThreads) {
return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}複製代碼
ComputationScheduler會建立一個EventLoopWorker。
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}複製代碼
其中,getEventLoop()是FixedSchedulerPool中的方法,返回了FixedSchedulerPool中的一個PoolWorker。
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
return eventLoops[(int)(n++ % c)];
}複製代碼
PoolWorker繼承自NewThreadWorker,它也是線程數爲1的ScheduledExecutorService。
IoScheduler使用CachedWorkerPool做爲線程池,而且CachedWorkerPool也是被AtomicReference包裝了一下。
CachedWorkerPool是基於RxThreadFactory這個ThreadFactory來建立的。
static {
......
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
......
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
......
}複製代碼
在RxThreadFactory中,由 prefix 和 incrementAndGet() 來建立新線程的名稱。
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}複製代碼
IoScheduler建立的線程數是不固定的,能夠經過IoScheduler 的 size() 來得到當前的線程數。而ComputationScheduler的線程數通常狀況等於CPU的數目。
public int size() {
return pool.get().allWorkers.size();
}複製代碼
特別須要的是 ComputationScheduler 和 IoScheduler 都是依賴線程池來維護線程的,區別就是 IoScheduler 線程池中的個數是無限的,由 prefix 和 incrementAndGet() 產生的遞增值來決定線程的名字;而 ComputationScheduler 中則是一個固定線程數量的線程池,數據爲CPU的數目,而且不要把 I/O 操做放在 computation() 中,不然 I/O 操做的等待時間會浪費 CPU。
一樣,IoScheduler也會建立EventLoopWorker。
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}複製代碼
可是這個EventLoopWorker是IoScheduler的內部類,跟ComputationScheduler建立的EventLoopWorker是不同的,只是兩者的名稱相同罷了。
NewThreadScheduler會建立NewThreadWorker。咱們看到NewThreadWorker的構造函數也是使用SchedulerPoolFactory。
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}複製代碼
跟SingleScheduler不一樣的是,SingleScheduler的executor是使用AtomicReference包裝的ScheduledExecutorService。每次使用時,會調用executor.get()。
然而,NewThreadScheduler每次都會建立一個新的線程。
TrampolineScheduler會建立TrampolineWorker,在TrampolineWorker內部維護着一個PriorityBlockingQueue。任務進入該隊列以前,會先用TimedRunnable封裝一下。
static final class TimedRunnable implements Comparable<TimedRunnable> {
final Runnable run;
final long execTime;
final int count; // In case if time between enqueueing took less than 1ms
volatile boolean disposed;
TimedRunnable(Runnable run, Long execTime, int count) {
this.run = run;
this.execTime = execTime;
this.count = count;
}
@Override
public int compareTo(TimedRunnable that) {
int result = ObjectHelper.compare(execTime, that.execTime);
if (result == 0) {
return ObjectHelper.compare(count, that.count);
}
return result;
}
}複製代碼
咱們能夠看到TimedRunnable實現了Comparable接口,會比較任務的execTime和count。
任務在進入queue以前,count每次都會+1。
final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
queue.add(timedRunnable);複製代碼
因此,使用TrampolineScheduler時,每次新的任務都會優先執行。
在默認狀況下不作任何線程處理,Observable和Observer是處於同一線程中的。若是想要切換線程的話,可使用subscribeOn()和observeOn()。
subscribeOn經過接收一個Scheduler參數,來指定對數據的處理運行在特定的線程調度器Scheduler上。
若屢次執行subscribeOn,則只有一次起做用。
點擊subscribeOn()的源碼能夠看到,每次調用subscribeOn()都會建立一個ObservableSubscribeOn對象。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}複製代碼
ObservableSubscribeOn真正發生訂閱的方法是subscribeActual(Observer<? super T> observer)。
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}複製代碼
其中,SubscribeOnObserver是下游的Observer經過裝飾器模式生成的。它實現了Observer、Disposable接口。
接下來,在上游的線程中執行下游Observer的onSubscribe(Disposable disposabel)方法。
s.onSubscribe(parent);複製代碼
而後,將子線程的操做加入Disposable管理中,加入Disposable後能夠方便上下游的統一管理。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));複製代碼
在這裏,已經調用對應scheduler的scheduleDirect()方法。scheduleDirect() 傳入的是一個Runnable,也就是下面的SubscribeTask。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}複製代碼
SubscribeTask會執行run()對上游的Observable進行訂閱。
此時,已經在對應的Scheduler線程中運行了。
source.subscribe(parent);複製代碼
在RxJava的鏈式操做中,數據的處理是自下而上,這點跟數據發射正好相反。若是屢次調用subscribeOn,最上面的線程切換最晚執行,因此變成了只有第一次切換線程纔有效。
observeOn一樣接收一個Scheduler參數,用來指定下游操做運行在特定的線程調度器Scheduler上。
若屢次執行observeOn,則每次均起做用,線程會一直切換。
點擊observeOn()的源碼能夠看到,每次調用observeOn()都會建立一個ObservableObserveOn對象。
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}複製代碼
ObservableObserveOn真正發生訂閱的方法是subscribeActual(Observer<? super T> observer)。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}複製代碼
若是scheduler是TrampolineScheduler,上游事件和下游事件會當即產生訂閱。
若是不是的話,scheduler會建立本身的Worker,而後上游事件和下游事件產生訂閱,生成一個ObserveOnObserver對象包裝了下游真正的Observer。
ObserveOnObserver是ObservableObserveOn的內部類,實現了Observer、Runnable接口。跟SubscribeOnObserver不一樣的是,SubscribeOnObserver實現了Observer、Disposable接口。
在ObserveOnObserver的onNext()中,schedule()執行了具體調度的方法。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}複製代碼
其中,worker是當前scheduler建立的Worker,this指的是當前的ObserveOnObserver對象,this實現了Runnable接口。
而後,咱們看看Runnable接口的實現方法run(),這個方法是在worker對應的線程裏執行的。drainNormal()會取出 ObserveOnObserver 的 queue 裏的數據進行發送。
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}複製代碼
下游屢次調用observeOn()的話,線程會一直切換。每一次切換線程,都會把對應的Observer對象的各個方法的處理執行在指定的線程中。
舉一個屢次調用subscribeOn、observeOn的例子。
Observable.just("HELLO WORLD")
.subscribeOn(Schedulers.single())
.map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
s = s.toLowerCase();
L.i("map1",s);
return s;
}
})
.observeOn(Schedulers.io())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
s = s+" tony.";
L.i("map2",s);
return s;
}
})
.subscribeOn(Schedulers.computation())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
s = s+"it is a test.";
L.i("map3",s);
return s;
}
})
.observeOn(Schedulers.newThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
L.i("subscribe",s);
System.out.println(s);
}
});複製代碼
瞭解RxJava的線程模型、線程調度器、線程調度是很是有意義的。可以幫助咱們更合理地使用RxJava。另外,RxJava的線程切換結合鏈式調用很是方便,比起Java使用線程操做實在是簡單太多了。