這篇文章主要和你們探討下關於rxjava的Scheduler和協程的Dispatcher。java
這兩個東西的用處都是處理線程調度用的。面試
Scheduler 與 Worker 在 RxJava2 中是一個很是重要的概念,他們是 RxJava 線程調度的核心與基石。Scheduler主要負責的就是一件事情,定義好每一個流模塊的執行線程。app
源碼分析咱們先從subscribeOn方法開始。異步
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼
上述代碼能夠看出,subscribeOn的核心代碼是ObservableSubscribeOn,這個類只作了一件事情,它會把上一個流用裝飾者模式包裝了一下,當上一個流被執行的時候會將流執行到scheduler的線程上去。async
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
複製代碼
ObservableSubscribeOn的subscribeActual方法執行以後,scheduler.scheduleDirect(new SubscribeTask(parent))經過這段代碼將當前流運行到Scheduler的線程內。ide
以後咱們能夠看下Scheduler的實現累,RxAndroid的HandlerScheduler,看看對於安卓的調度器,RxJava是怎麼寫的。函數
@Overridepublic
Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}
複製代碼
scheduler.scheduleDirect執行的時候就會調用scheduleDirect方法。看得出來,方法被觸發以後調用了handler的postdelay方法,將當前的Runnable運行到該handler的線程上去。oop
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
複製代碼
接下來咱們要說另一個Worker,上面是IoScheduler裏面的Worker代碼。Work在初始化的時候會去生成一個線程池,這個線程池就是咱們後續schedule執行的地方,當一個Runnnable被調度到這個work上的時,會調用schedule方法,而後將這個Runnnable運行到這個線程池上去。源碼分析
協程上下文(coroutine context)包含一個協程調度器(參閱 CoroutineDispatcher),協程調度器 用於肯定執行協程的目標載體,即運行於哪一個線程,包含一個仍是多個線程。協程調度器能夠將協程的執行操做限制在特定線程上,也能夠將其分派到線程池中,或者讓它無限制地運行。post
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.checkCompletion()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
複製代碼
這是協程的線程調度的代碼,當發現當前的調度器和目標調度器不是同一個的狀況下,會new一個DispatchedCoroutine,開始進行線程的調度操做。
coroutine.initParentJob()初始化父任務,這個方法須要一開始就被初始化調用的。 而後就是核心關鍵將block.startCoroutineCancellable(coroutine, coroutine),該方法會建立一個新的可掛起線程。 進行異步等待操做,當有值的狀況下會回調將當前掛起結束,進行下一步獲取值操做,而後將當前的線程返回。
在調用withContext方法的時候由於咱們傳入的是Dispatchers.Main
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
複製代碼
而DispatcherMain是能夠有外部的fatroy構造的,由安卓的kotlin支持庫中能夠發現,其實現類是AndroidDispatcherFactory。
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
複製代碼
此次真的轉過來了把,沒有干擾了把,各位老哥,我要給大家跪了啊。
接下來咱們看下重頭戲HandlerContext,這個類就是和rxjava的HandlerScheduler基本如出一轍的線程調度器。
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/** * Creates [CoroutineDispatcher] for the given Android [handler]. * * @param handler a handler. * @param name an optional name for debugging. */
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
return object : DisposableHandle {
override fun dispose() {
handler.removeCallbacks(block)
}
}
}
override fun toString(): String =
if (name != null) {
if (invokeImmediately) "$name [immediate]" else name
} else {
handler.toString()
}
override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler
override fun hashCode(): Int = System.identityHashCode(handler)
}
複製代碼
DispatchedCoroutine在上面的這個掛起函數的父類CoroutineDispatcher,會調用dispatch方法,進行線程切換操做,而後是否是和上面的rxjava 有點似曾類似的感受。
沒錯,各位看官,此次調用了handler.post(block)。因此我此次我真的下結論了,上篇文章是有點小微妙,可是此次應該沒事清楚了。
若是當你基本瞭解rxjava的調度器的實現的狀況下。大膽點之後面試問你kotlin協程是如何實現調度的邏輯,你就把邏輯copy一遍告訴他就行了。
寫這篇文章還收收集了一些資料的,謝謝各位大佬。
【譯】kotlin 協程官方文檔(4)-協程上下文和調度器(Coroutine Context and Dispatchers)