先貼上這個系列的連接。
rxjava2源碼解析(一)基本流程分析
rxjava2源碼解析(二)線程切換分析
上一篇說了rxjava2
的線程切換,可是沒有深刻說其中的線程池。這篇咱們來深扒一下。java
仍是先說observeOn
,直接看源碼:面試
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
複製代碼
這段代碼咱們上篇看到過,這裏再重複一下。obsererOn
是切換下游觀察者線程,咱們看ObserveOnObserver
中的onNext
方法是如何切換線程的。bash
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//ObserveOnObserver繼承了runnable接口,意味着能夠當作是線程任務來執行。這裏表明着在新線程中執行run方法。
worker.schedule(this);
}
}
//ObserveOnObserver繼承了runnable接口
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
····//省略一些判斷的代碼
v = q.poll();
//這裏就能夠看到,將下游的onNext方法,切換到新線程執行。
a.onNext(v);
}
···
}
}
}
複製代碼
這是上游的處理器執行onNext
,傳到這裏,使用以前設置的線程執行下游的onNext
方法。app
這個worker
究竟是什麼?咱們先看schedule
r的createWorker
方法:ide
public abstract Worker createWorker();
複製代碼
在Scheduler
類中,createWorker
只是一個接口,子類會重寫這個方法,咱們就以Schedulers.newThread()
這個方法建立的Scheduler
爲例,來看看這裏面的原理。post
//Schedulers類中的newThread靜態方法,這裏的hock咱們暫且不理,直接返回NEW_THREAD
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
//Schedulers類中定義了NEW_THREAD和其餘THREAD
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
//NewThreadTask是Schedulers的靜態內部類,繼承自Callable接口,其中call方法返回一個Scheduler
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
//NewThreadHolder一樣是一個靜態內部類,裏面只有一個靜態參數DEFAULT,這裏咱們就找到了newThread方法返回的本尊NewThreadScheduler
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
複製代碼
如上面代碼和註釋所示,咱們直接看NewThreadScheduler
的源碼:ui
/**
* Schedules work on a new thread.
*/
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
//這裏Thread.MIN_PRIORITY爲1,Thread.MAX_PRIORITY爲10.Thread.NORM_PRIORITY爲5.若是咱們不作任何更改,這裏的priority的值就爲5.
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
複製代碼
這裏createWorker
方法返回的是一個NewThreadWorker
對象。咱們總算找到了worker
的來源,須要注意這裏的構造參數是threadFactory
。來看看NewThreadWorker
的源碼。this
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//hock機制
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//用一個ScheduledRunnable把傳入的runnable包裝一下,本質上沒區別。
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
....//省略判斷性代碼
Future<?> f;
try {
if (delayTime <= 0) {
//executor由構造方法中建立
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
....
}
複製代碼
這裏咱們就能夠看到,前面調用worker.schedule(this)
,最終走到了executor.submit(sr)
。這裏的sr
只是前面ObserveOnObserver
的包裝。executor
在構造方法中建立。來看看executor
是什麼:spa
//SchedulerPoolFactory類中的靜態方法
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
複製代碼
//Executors類的靜態方法
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
複製代碼
OK,executor
是一個ScheduledThreadPoolExecutor
,標準的工做線程池。核心線程數爲1,threadFactory
是前面NewThreadWorker
構造參數中的RxThreadFactory
。他會給thread
按照命名格式進行命名。線程
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
public RxThreadFactory(String prefix) {
this(prefix, Thread.NORM_PRIORITY, false);
}
public RxThreadFactory(String prefix, int priority) {
this(prefix, priority, false);
}
public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
}
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}
...
}
複製代碼
observeOn
在subscribe
方法中,新建一個worker
對象。這個worker
對象是根據設置的scheduler
建立的。而後在新建一個ObserveOnObserver
對象,將上游與之訂閱。ObserveOnObserver
的onNext
方法中,會調用worker.schedule(this)
,將自己做爲runnable
傳入到worker
中。newThreadScheduler
爲例,他建立的worker
是一個NewThreadWorker
實例。在實例構造方法中,會根據傳入的threadFactory
新建一個ScheduledThreadPool
線程池。NewThreadWorker
的shedule
方法,就是將ObserveOnObserver
做爲一個runnable
放在一個新的線程池中執行。ObserveOnObserver
的run
方法,就是用來執行下游的onNext
,將數據傳輸下去。從而達到了,切換下游onNext
線程的目的。subscribeOn
是用來切換上游發射器線程。切換原理上一篇有說過,其中線程池相關跟上面observeOn
差很少,這裏就不贅述了。
上面就是rxjava2
線程切換原理分析了,後面再有人面試問你rxjava2
裏面的線程池是哪種,你就能夠自信的說出:ScheduledThreadPool
。