Kotlin1.3開始,協程從experimental變成了release,前些日子看了看簡單的用法,今天就從源碼的角度來看看Kotlin的協程到底是怎樣造成的.android
看源碼要帶着問題,我決定從如下三個問題來進行分析bash
啓動一個協程的方法async
GlobalScope.launch { // launch new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
複製代碼
這段代碼就是啓動一個協程,並啓動,延遲1秒後打印world,就從這個launch方法進行切入ide
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
複製代碼
代碼很清楚,根據CoroutineStart是否是CoroutineStart.LAZY對象,建立不一樣的Job實現類,默認咱們傳入的start參數爲CoroutineStart.DEFAULT,這時咱們建立的是一個StandaloneCoroutine對象,調用它的start方法啓動,而後對它進行返回。oop
GlobalScope.launch(Dispatchers.Default){
println("Current thread is ${Thread.currentThread().name}")
launch {
delay(1000)
println("now")
}
println("next")
}
複製代碼
看一下這段代碼,這段代碼先打印出next,而後延遲1秒鐘後打印出now,有沒有一種感受,這像是android裏handler的post和postDelay方法。首先看一下delay方法post
@InternalCoroutinesApi
public interface Delay {
suspend fun delay(time: Long) {
if (time <= 0) return // don't delay return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) } } fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = DefaultDelay.invokeOnTimeout(timeMillis, block) } public suspend fun delay(timeMillis: Long) { if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
複製代碼
delay方法在Delay.kt文件裏,能夠看到,這裏定義了一個Delay接口,scheduleResumeAfterDelay是用來從新把任務恢復調度的,invokeOnTimeout顯然是調度過程當中發現時間到了之後要恢復執行的方法體。Delay是一個接口,看一它的實現類是如何實現scheduleResumeAfterDelay方法的。ui
internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
...
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
schedule(DelayedResumeTask(timeMillis, continuation))
...
複製代碼
先看DelayResumeTaskthis
private inner class DelayedResumeTask(
timeMillis: Long,
private val cont: CancellableContinuation<Unit>
) : DelayedTask(timeMillis) {
init {
// Note that this operation isn't lock-free, but very short cont.disposeOnCancellation(this) } override fun run() { with(cont) { resumeUndispatched(Unit) } } } 複製代碼
這個類繼承自DelayTask,而DelayedTask實現了runnable接口,這裏複寫了run方法,調用了CancellableContinuation的resumeUndispatched方法。經過方法名能夠看出通過等待時間後就會恢復執行。CancellableContinuation的實現類是CancellableContinuationImp跟進去看一看這個類spa
@PublishedApi
internal open class CancellableContinuationImpl<in T>(
delegate: Continuation<T>,
resumeMode: Int
) : AbstractContinuation<T>(delegate, resumeMode), CancellableContinuation<T>, Runnable {
...
override fun completeResume(token: Any) = completeStateUpdate(token as NotCompleted, state, resumeMode)
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
val dc = delegate as? DispatchedContinuation
resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}
...
}
複製代碼
resumeUndispatched方法裏調用了resumeImp方法,這是繼承自AbstractContinuation的方法線程
protected fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
loopOnState { state ->
when (state) {
is NotCompleted -> {
if (updateStateToFinal(state, proposedUpdate, resumeMode)) return
}
is CancelledContinuation -> {
/*
* If continuation was cancelled, then all further resumes must be
* ignored, because cancellation is asynchronous and may race with resume.
* Racy exception are reported so no exceptions are lost
*
* :todo: we should somehow remember the attempt to invoke resume and fail on the second attempt.
*/
if (proposedUpdate is CompletedExceptionally) {
handleException(proposedUpdate.cause)
}
return
}
else -> error("Already resumed, but proposed with update $proposedUpdate")
}
}
}
複製代碼
這裏會根據不一樣的狀態調用不一樣的方法.
private fun updateStateToFinal(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean {
...
completeStateUpdate(expect, proposedUpdate, mode)
return true
}
protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) {
...
dispatchResume(mode)
}
private fun dispatchResume(mode: Int) {
if (tryResume()) return // completed before getResult invocation -- bail out
// otherwise, getResult has already commenced, i.e. completed later or in other thread
dispatch(mode)
}
internal fun <T> DispatchedTask<T>.dispatch(mode: Int = MODE_CANCELLABLE) {
val delegate = this.delegate
if (mode.isDispatchedMode && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
// dispatch directly using this instance's Runnable implementation val dispatcher = delegate.dispatcher val context = delegate.context if (dispatcher.isDispatchNeeded(context)) { dispatcher.dispatch(context, this) } else { UndispatchedEventLoop.resumeUndispatched(this) } } else { resume(delegate, mode) } } 複製代碼
刪掉了不相關的代碼,只保留dispatch這條主線,相信很容易個看明白最終又把這個任務放回到Dispatcher裏面去了。那個else分支的resume其實內部調用的是Continuation.resume擴展方法,最終同樣要調用到resumeImpl中,又回到上面已經分析的流程裏了,這是處理有Continuation代理的狀況。以上就是當delay時間到達後協程是如何從新恢復的。
接下來看一看延時是如何實現的,協程裏有個默認的DefaultExecutor線程用來執行協程代碼
override fun run() {
timeSource.registerTimeLoopThread()
try {
var shutdownNanos = Long.MAX_VALUE
if (!notifyStartup()) return
while (true) {
Thread.interrupted() // just reset interruption flag
var parkNanos = processNextEvent()
if (parkNanos == Long.MAX_VALUE) {
// nothing to do, initialize shutdown timeout
if (shutdownNanos == Long.MAX_VALUE) {
val now = timeSource.nanoTime()
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + KEEP_ALIVE_NANOS
val tillShutdown = shutdownNanos - now
if (tillShutdown <= 0) return // shut thread down
parkNanos = parkNanos.coerceAtMost(tillShutdown)
} else
parkNanos = parkNanos.coerceAtMost(KEEP_ALIVE_NANOS) // limit wait time anyway
}
if (parkNanos > 0) {
// check if shutdown was requested and bail out in this case
if (isShutdownRequested) return
timeSource.parkNanos(this, parkNanos)
}
}
} finally {
_thread = null // this thread is dead
acknowledgeShutdownIfNeeded()
timeSource.unregisterTimeLoopThread()
// recheck if queues are empty after _thread reference was set to null (!!!)
if (!isEmpty) thread() // recreate thread if it is needed
}
}
複製代碼
override fun processNextEvent(): Long {
if (!isCorrectThread()) return Long.MAX_VALUE
// queue all delayed tasks that are due to be executed
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = timeSource.nanoTime()
while (true) {
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
// to make sure that 'isEmpty' and `nextTime` that check both of them
// do not transiently report that both delayed and queue are empty during move
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
enqueueImpl(it)
} else
false
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
}
}
// then process one event from queue
dequeue()?.run()
return nextTime
}
複製代碼
DefaultExecutor不斷獲取task並執行,而這些task事件就是存儲在_delayed裏的,這裏能夠將_delayed理解爲一個隊列。簡述這兩段代碼作的事情就是就是死循環遍歷task隊列該執行的就執行並出隊,沒到執行時間的就留在隊列。 總結一下,協程就是維持了一個相似android Looper和MessageQueuen的東西,將要執行的代碼封裝成Coroutine放入隊列,而後經過循環並根據必定條件不停的取出執行。
回到launch方法
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
複製代碼
看一下StandaloneCoroutine的start方法
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
複製代碼
start(block, receiver, this)調用的就是CoroutineStart裏的invoke方法,這裏實際上是CoroutineStart對操做符進行了複寫,並非遞歸調用,這個start就是launch方法傳進來的,默認是CoroutineStart.DEFAULT,這是一個枚舉對象
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
is DispatchedContinuation -> resumeCancellable(value)
else -> resume(value)
}
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeCancellable(value: T) {
if (dispatcher.isDispatchNeeded(context)) {
_state = value
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
UndispatchedEventLoop.execute(this, value, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatched(value)
}
}
}
}
複製代碼
總之到了這裏,就是經過 dispatcher.dispatch(...)把這個任務分發給線程/線程池去執行了,分發方式根據CoroutineStart對象有關。
上面說了不少源碼上的東西,畫張圖,方便理解