經過RxJava看kotlin協程(二)

這篇文章主要和你們探討下關於rxjava的Scheduler和協程的Dispatcher。java

這兩個東西的用處都是處理線程調度用的。面試

Rxjava Scheduler

釋義

Scheduler 與 Worker 在 RxJava2 中是一個很是重要的概念,他們是 RxJava 線程調度的核心與基石。Scheduler主要負責的就是一件事情,定義好每一個流模塊的執行線程。app

源碼追溯

源碼分析咱們先從subscribeOn方法開始。異步

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {    
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");   
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
複製代碼

上述代碼能夠看出,subscribeOn的核心代碼是ObservableSubscribeOn,這個類只作了一件事情,它會把上一個流用裝飾者模式包裝了一下,當上一個流被執行的時候會將流執行到scheduler的線程上去。async

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}
複製代碼

ObservableSubscribeOn的subscribeActual方法執行以後,scheduler.scheduleDirect(new SubscribeTask(parent))經過這段代碼將當前流運行到Scheduler的線程內。ide

以後咱們能夠看下Scheduler的實現累,RxAndroid的HandlerScheduler,看看對於安卓的調度器,RxJava是怎麼寫的。函數

@Overridepublic 
Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
    if (run == null) throw new NullPointerException("run == null");    
    if (unit == null) throw new NullPointerException("unit == null");    
    run = RxJavaPlugins.onSchedule(run);    
    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);    
    handler.postDelayed(scheduled, unit.toMillis(delay));    
    return scheduled;
}
複製代碼

scheduler.scheduleDirect執行的時候就會調用scheduleDirect方法。看得出來,方法被觸發以後調用了handler的postdelay方法,將當前的Runnable運行到該handler的線程上去。oop

static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
複製代碼

接下來咱們要說另一個Worker,上面是IoScheduler裏面的Worker代碼。Work在初始化的時候會去生成一個線程池,這個線程池就是咱們後續schedule執行的地方,當一個Runnnable被調度到這個work上的時,會調用schedule方法,而後將這個Runnnable運行到這個線程池上去。源碼分析

協程 Dispatcher

釋義

協程上下文(coroutine context)包含一個協程調度器(參閱 CoroutineDispatcher),協程調度器 用於肯定執行協程的目標載體,即運行於哪一個線程,包含一個仍是多個線程。協程調度器能夠將協程的執行操做限制在特定線程上,也能夠將其分派到線程池中,或者讓它無限制地運行。post

源碼追溯

public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    // compute new context
    val oldContext = uCont.context
    val newContext = oldContext + context
    // always check for cancellation of new context
    newContext.checkCompletion()
    // FAST PATH #1 -- new context is the same as the old one
    if (newContext === oldContext) {
        val coroutine = ScopeCoroutine(newContext, uCont)
        return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
    }
    // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
    // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
    if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
        val coroutine = UndispatchedCoroutine(newContext, uCont)
        // There are changes in the context, so this thread needs to be updated
        withCoroutineContext(newContext, null) {
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
    }
    // SLOW PATH -- use new dispatcher
    val coroutine = DispatchedCoroutine(newContext, uCont)
    coroutine.initParentJob()
    block.startCoroutineCancellable(coroutine, coroutine)
    coroutine.getResult()
}
複製代碼

這是協程的線程調度的代碼,當發現當前的調度器和目標調度器不是同一個的狀況下,會new一個DispatchedCoroutine,開始進行線程的調度操做。

coroutine.initParentJob()初始化父任務,這個方法須要一開始就被初始化調用的。 而後就是核心關鍵將block.startCoroutineCancellable(coroutine, coroutine),該方法會建立一個新的可掛起線程。 進行異步等待操做,當有值的狀況下會回調將當前掛起結束,進行下一步獲取值操做,而後將當前的線程返回。

在調用withContext方法的時候由於咱們傳入的是Dispatchers.Main

@JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
複製代碼

而DispatcherMain是能夠有外部的fatroy構造的,由安卓的kotlin支持庫中能夠發現,其實現類是AndroidDispatcherFactory。

internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")

    override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}
複製代碼

此次真的轉過來了把,沒有干擾了把,各位老哥,我要給大家跪了啊。

接下來咱們看下重頭戲HandlerContext,這個類就是和rxjava的HandlerScheduler基本如出一轍的線程調度器。

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    /** * Creates [CoroutineDispatcher] for the given Android [handler]. * * @param handler a handler. * @param name an optional name for debugging. */
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null

    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }

    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }

    override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        return object : DisposableHandle {
            override fun dispose() {
                handler.removeCallbacks(block)
            }
        }
    }

    override fun toString(): String =
        if (name != null) {
            if (invokeImmediately) "$name [immediate]" else name
        } else {
            handler.toString()
        }

    override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler
    override fun hashCode(): Int = System.identityHashCode(handler)
}

複製代碼

DispatchedCoroutine在上面的這個掛起函數的父類CoroutineDispatcher,會調用dispatch方法,進行線程切換操做,而後是否是和上面的rxjava 有點似曾類似的感受。

沒錯,各位看官,此次調用了handler.post(block)。因此我此次我真的下結論了,上篇文章是有點小微妙,可是此次應該沒事清楚了。

結論

若是當你基本瞭解rxjava的調度器的實現的狀況下。大膽點之後面試問你kotlin協程是如何實現調度的邏輯,你就把邏輯copy一遍告訴他就行了。

感謝

寫這篇文章還收收集了一些資料的,謝謝各位大佬。

理解RxJava(三)線程調度原理分析

【譯】kotlin 協程官方文檔(4)-協程上下文和調度器(Coroutine Context and Dispatchers)

相關文章
相關標籤/搜索