本文主要介紹一下Kotlin是如何實現Coroutine的,對於具體的用法推薦參考一下官方文檔,講得仍是比較詳細的html
概念上來講相似於線程,擁有本身的棧、本地變量、指令指針等,須要一段代碼塊來運行而且擁有相似的生命週期。可是和線程不一樣,coroutine並不和某一個特定的線程綁定,它能夠再線程A中執行,並在某一個時刻暫停(suspend),等下次調度到恢復執行的時候在線程B中執行。不一樣於線程,coroutine是協做式的,即子程序能夠經過在函數中有不一樣的入口點來實現暫停、恢復,從而讓出線程資源。java
首先看一個簡單的小demo,來看看Kotlin的Coroutine是具體適合使用的:git
@Test
fun async() {
async {
delay(1000)
print("World!")
}
print("Hello ")
Thread.sleep(2000)
}複製代碼
上面這段代碼會輸出Hello World!
, 那麼下面咱們看看具體是如何工做的.github
asyn()這裏是一個函數,下面是它的源碼:golang
public val DefaultDispatcher: CoroutineDispatcher = CommonPool
public fun <T> async( context: CoroutineContext = DefaultDispatcher, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.initParentJob(context[Job])
start(block, coroutine, coroutine)
return coroutine
}複製代碼
這個函數有三個參數,其中兩個都有默認值,也就是默認不須要傳遞,context是指coroutine的上下文,默認是DefaultDispatcher,DefaultDispatcher當前的實現是CommonPool,因爲目前仍是experimental,後面說不定會更改爲其餘的實現。算法
object CommonPool : CoroutineDispatcher() {
private var usePrivatePool = false
@Volatile
private var _pool: Executor? = null
private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
private fun createPool(): ExecutorService {
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
?: return createPlainPool()
if (!usePrivatePool) {
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
?.let { return it }
}
Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? ExecutorService }
?. let { return it }
return createPlainPool()
}
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
return Executors.newFixedThreadPool(defaultParallelism()) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
}複製代碼
CommonPool默認會使用ForkJoinPool做爲coroutine的調度策略,若是不存在這fallback到線程池的策略,ForkJoinPool實現了work-stealing算法,當前線程的工做完成後從其餘線程的待執行任務中竊取,具體的解釋推薦直接看其註釋,比網上的博客清晰的多。
而第二個參數CoroutineStart
指的是coroutine啓動的選項,總的來講有四種:編程
/* * * [DEFAULT] -- immediately schedules coroutine for execution according to its context; * * [LAZY] -- starts coroutine lazily, only when it is needed; * * [ATOMIC] -- atomically (non-cancellably) schedules coroutine for execution according to its context; * * [UNDISPATCHED] -- immediately executes coroutine until its first suspension point _in the current thread_. */複製代碼
第三個就是實際的coroutine要執行的代碼段了,下面咱們看看具體的執行流程:app
public fun <T> async( context: CoroutineContext = DefaultDispatcher, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T> {
/** * 初始化上下文,例如名字(方便調試) */
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
//這裏是DeferredCoroutine,而且不存在父任務
coroutine.initParentJob(context[Job])
//Kotlin的運算符重載,會轉化爲對應參數的invoke方法
//https://kotlinlang.org/docs/reference/operator-overloading.html
start(block, coroutine, coroutine)
return coroutine
}
public fun newCoroutineContext(context: CoroutineContext): CoroutineContext {
val debug = if (DEBUG) context + CoroutineId(COROUTINE_ID.incrementAndGet()) else context
return if (context !== DefaultDispatcher && context[ContinuationInterceptor] == null)
debug + DefaultDispatcher else debug
}複製代碼
下面就會調用CoroutineStart中對應的invoke方法:jvm
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
//相似java的switch語句(TABLESWITCH/lookupswitch)
//https://kotlinlang.org/docs/reference/control-flow.html
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}複製代碼
接着會去調用startCoroutineCancellable
方法:async
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
createCoroutineUnchecked(receiver, completion).resumeCancellable(Unit)
//
public fun <R, T> (suspend R.() -> T).createCoroutineUnchecked(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
if (this !is kotlin.coroutines.experimental.jvm.internal.CoroutineImpl)
buildContinuationByInvokeCall(completion) {
@Suppress("UNCHECKED_CAST")
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, completion)
}
else
//這裏create方法會去建立CoroutineImpl,可是咱們看到CoroutineImpl中這方法會直接拋異常
//wtf?實際上這裏傳遞進來的this是編輯器根據async中的lambda動態生產的類的實例,所以
//也就是說實際的調用是哪一個動態類
(this.create(receiver, completion) as kotlin.coroutines.experimental.jvm.internal.CoroutineImpl).facade複製代碼
上面說到編譯器會生成內部類,那麼咱們看看這裏到底有什麼黑魔法,下面貼一下具體的結構
static final class CoroutineTest.launch extends CoroutineImpl implements Function2<CoroutineScope, Continuation<? super Unit>, Object> {
private CoroutineScope p$;
@NotNull
public final Continuation<Unit> create(@NotNull CoroutineScope $receiver, @NotNull Continuation<? super Unit> $continuation) {
Intrinsics.checkParameterIsNotNull((Object)$receiver, (String)"$receiver");
Intrinsics.checkParameterIsNotNull($continuation, (String)"$continuation");
CoroutineImpl coroutineImpl = new ;
CoroutineScope coroutineScope = coroutineImpl.p$ = $receiver;
return coroutineImpl;
}
}複製代碼
建立完須要的上下文以後,會去調用拿到Continuation
後,就去調用resumeCancellable
方法:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
createCoroutineUnchecked(receiver, completion).resumeCancellable(Unit)複製代碼
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
is DispatchedContinuation -> resumeCancellable(value)
else -> resume(value)
}
@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeCancellable(value: T) {
val context = continuation.context
if (dispatcher.isDispatchNeeded(context))
dispatcher.dispatch(context, DispatchTask(continuation, value, exception = false, cancellable = true))
else
resumeUndispatched(value)
}複製代碼
internal class DispatchTask<in T>(
private val continuation: Continuation<T>,
private val value: Any?, // T | Throwable
private val exception: Boolean,
private val cancellable: Boolean
) : Runnable {
@Suppress("UNCHECKED_CAST")
override fun run() {
val context = continuation.context
val job = if (cancellable) context[Job] else null
withCoroutineContext(context) {
when {
job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
exception -> continuation.resumeWithException(value as Throwable)
else -> continuation.resume(value as T)
}
}
}
override fun toString(): String =
"DispatchTask[$value, cancellable=$cancellable, ${continuation.toDebugString()}]"
}複製代碼
在咱們的場景下最終會調用:Continuation#public fun resume(value: T)
,這裏的實際會調用:
abstract class CoroutineImpl(
arity: Int,
@JvmField
protected var completion: Continuation<Any?>?
) : Lambda(arity), Continuation<Any?> {
override fun resume(value: Any?) {
processBareContinuationResume(completion!!) {
doResume(value, null)
}
}
}複製代碼
@kotlin.internal.InlineOnly
internal inline fun processBareContinuationResume(completion: Continuation<*>, block: () -> Any?) {
try {
val result = block()
if (result !== COROUTINE_SUSPENDED) {
@Suppress("UNCHECKED_CAST")
(completion as Continuation<Any?>).resume(result)
}
} catch (t: Throwable) {
completion.resumeWithException(t)
}
}複製代碼
這裏doResume就是上文提到Kotlin編譯器生成的內部類:
@Nullable
public final Object doResume(@Nullable Object var1_1, @Nullable Throwable var2_2) {
var5_3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (var0.label) {
case 0: {
v0 = var2_2;
if (v0 != null) {
throw v0;
}
var3_4 = this.p$;
this.label = 1;
v1 = DelayKt.delay$default((long)1000, (TimeUnit)null, (Continuation)this, (int)2, (Object)null);
if (v1 == var5_3) {
return var5_3;
}
** GOTO lbl18
}
case 1: {
v2 = throwable;
if (v2 != null) {
throw v2;
}
v1 = data;
lbl18: // 2 sources:
var4_5 = "World!";
System.out.print((Object)var4_5);
return Unit.INSTANCE;
}
}
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}複製代碼
能夠看到實際上Kotlin使用狀態機實現的Coroutine,根據label的狀態決定要執行的代碼塊,Kotlin會在編譯時根據可suspend的方法插入相應的label,從而實現主動讓出線程資源,而且將本地變量保存到Continuation的實例變量中,等到下次獲得調度的時候,根據label來決定要執行的代碼塊,等到函數實際執行完的時候則直接返回對應的返回值(沒有的話則是默認值).
好比看下面這段代碼:
val a = a()
val y = foo(a).await() // suspension point #1
b()
val z = bar(a, y).await() // suspension point #2
c(z)複製代碼
這段代碼總共有三種狀態:
每個狀態都是continuation的入口點之一,這段代碼會編譯爲一個實現了狀態機的匿名類,有一個狀態變量用於保存當前狀態機的狀態,以及用於保存當前coroutine的本地變量,編譯後的代碼用Java代碼相似下面:
class <anonymous_for_state_machine> extends CoroutineImpl<...> implements Continuation<Object> {
// 狀態機當前的狀態
int label = 0
// coroutine的本地變量
A a = null
Y y = null void resume(Object data) {
if (label == 0) goto L0 if (label == 1) goto L1 if (label == 2) goto L2 else throw IllegalStateException() L0: a = a()
label = 1
data = foo(a).await(this) // 'this' 默認會被傳遞給await方法
if (data == COROUTINE_SUSPENDED) return //若是須要暫停則返回
L1:
//從新獲得調度
y = (Y) data//保存本地變量
b()
label = 2
data = bar(a, y).await(this)
if (data == COROUTINE_SUSPENDED) return
L2:
Z z = (Z) data c(z) label = -1 // No more steps are allowed
return
}
}複製代碼
當coroutine開始執行的時候,默認label爲0,那麼就會跳轉到L0,而後執行一個耗時的業務邏輯,將label設置爲1,調用await,若是coroutine的執行須要暫停的那麼就返回掉。當須要繼續執行的時候就再次調用resume(),此次就會跳轉到L1, 執行完業務邏輯後,將label設置爲2,調用await並根據是否須要暫停來return,下次的繼續調度,此次會從L2開始執行,而後label設置爲-1,意味着不須要執行完了,不須要再調度了。
回到最初的代碼段,咱們首先調用了delay方法,這個方法默認使用ScheduledExecutorService,從而將當前的coroutine上下文包裝到DispatchTask,再對應的延遲時間以後再恢復執行,恢復執行以後,這時候label是1,那麼就會直接進入第二段代碼,輸出World!
@Test
fun async() {
async {
//在另外的線程池中執行,經過保存當前的執行上下文(本地變量、狀態機的狀態位等),並丟到
//ScheduledExecutorService中延遲執行
delay(1000)
print("World!")
}
//主線程中直接輸出
print("Hello ")
Thread.sleep(2000)
}複製代碼
本文大體講解了一些Kotlin中Coroutine的實現原理,固然對於協程,不少編程語言都有相關的實現,推薦都看一下文檔,實際使用對比看看。