0%

Kotlin笔记之再看协程工作原理

概述

关于协程的创建,以及挂起和恢复,之前有写过一篇文章 Kotlin协程之深入理解协程工作原理 整理这个流程,最近再看这篇文章的时候,感觉看起来比较费劲,不是说写得有问题,只是看起来比较臃肿。如果想再复习这块的知识,可能需要看几遍后才能懂,所以想另外再整理一篇文章写写协程启动,挂起和恢复的原理,适合在读完上篇文章后再看看,这篇文章的目的在于希望读完后能够清晰明了地了解 Kotlin 这部分的原理,提高效率。Kotlin 协程系列:

Kotlin 由于本身灵活的语法和特性,导致有些时候跟踪它的源码时,容易跟着跟着就迷路了,记得我刚开始尝试阅读协程源码的时候,也是头大了一圈,后面 Kotlin 用的看的多了,现在再阅读就显得轻松了不少。

前置知识

在阅读 Kotlin 源码之前,可以先了解一些前置知识。

Function

Function 是 Kotlin 对函数类型的封装,对于函数类型,它会被编译成 FunctionX 系列的类:

1
2
3
4
5
6
7
8
9
10
11
// 0 个参数
public interface Function0<out R> : Function<R> {
public operator fun invoke(): R
}

// 1 个参数
public interface Function1<in P1, out R> : Function<R> {
public operator fun invoke(p1: P1): R
}

// X 个参数

Kotlin 提供了从 Function0 到 Function22 之间的接口,这意味着我们的 lambda 函数最多可以支持 22 个参数,另外 Function 接口有一个 invoke 操作符重载,因此我们可以直接通过 () 调用 lambda 函数:

1
2
3
4
5
val sum = { a: Int, b: Int ->
a + b
}
sum(10, 12)
sum.invoke(10, 12)

编译成 Java 代码后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Function2 sum = (Function2)null.INSTANCE;
sum.invoke(10, 12);
sum.invoke(10, 12);

// lambda 编译后的类
final class KotlinTest$main$sum$1 extends Lambda implements Function2<Integer, Integer, Integer> {
public static final KotlinTest$main$sum$1 INSTANCE = new KotlinTest$main$sum$1();

KotlinTest$main$sum$1() {
super(2);
}

@Override // kotlin.jvm.functions.Function2
public /* bridge */ /* synthetic */ Integer invoke(Integer num, Integer num2) {
return invoke(num.intValue(), num2.intValue());
}

public final Integer invoke(int a, int b) {
return Integer.valueOf(a + b);
}
}

可以看到对于 lambda 函数,在编译后会生成一个实现 Function 接口的类,并在使用 lambda 函数时创建一个单例对象来调用,创建对象的过程是编译器自动生成的代码

而对于协程里的 lambda 代码块,也会为其创建一个对象,它实现 FunctionX 接口,并继承 SuspendLambda 类,不一样的地方在于它会自动增加一个 Continuation 类型的参数。

Continuation Passing Style(CPS)

Continuation Passing Style(续体传递风格): 约定一种编程规范,函数不直接返回结果值,而是在函数最后一个参数位置传入一个 callback 函数参数,并在函数执行完成时通过 callback 来处理结果。回调函数 callback 被称为续体(Continuation),它决定了程序接下来的行为,整个程序的逻辑通过一个个 Continuation 拼接在一起。

Kotlin 协程本质就是利用 CPS 来实现对过程的控制,并解决了 CPS 会产生的问题(如回调地狱,栈空间占用)

  • Kotlin suspend 挂起函数写法与普通函数一样,但编译器会对 suspend 关键字的函数做 CPS 变换,这就是咱们常说的用看起来同步的方式写出异步的代码,消除回调地狱(callback hell)。
  • 另外为了避免栈空间过大的问题, Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型。每两个挂起点之间可以看为一个状态,每次进入状态机时都有一个当前的状态,然后执行该状态对应的代码;如果程序执行完毕则返回结果值,否则返回一个特殊值,表示从这个状态退出并等待下次进入。相当于创建了一个可复用的回调,每次都使用这同一个回调,根据不同状态来执行不同的代码。

Continuation

Kotlin 续体有两个接口: Continuation 和 CancellableContinuation, 顾名思义 CancellableContinuation 是一个可以取消的 Continuation。

Continuation 成员

  • val context: CoroutineContext: 当前协程的 CoroutineContext 上下文
  • fun resumeWith(result: Result<T>): 传递 result 恢复协程

CancellableContinuation 成员

  • isActive, isCompleted, isCancelled: 表示当前 Continuation 的状态
  • fun cancel(cause: Throwable? = null): 可选通过一个异常 cause 来取消当前 Continuation 的执行

可以将 Continuation 看成是在挂起点恢复后需要执行的代码封装(通过之前的文章可以知道是通过状态机实现的),比如说对如下逻辑:

1
2
3
4
5
6
7
8
9
suspend fun request() = suspendCoroutine<Response> {
val response = doRequest()
it.resume(response)
}

fun test() = runBlocking {
val response = request()
handle(response)
}

用下面的伪代码简单描述 Continuation 的工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 假装是 Continuation 接口
interface Continuation<T> {
fun resume(t: T)
}

fun request(continuation: Continuation<Response>) {
val response = doRequest()
continuation.resume(response)
}

fun test() {
request(object :Continuation<Response>{
override fun resume(response: Response) {
handle(response)
}
})
}

对于 suspend 关键词修饰的挂起函数,编译器会为其增加一个 Continuation 续体类型的参数(相当于 CPS 中的回调),可以通过这个 Continuation 续体对象的 resume 方法返回结果值来恢复协程的执行

协程创建与启动

SuspendLambda

Kotlin 编译时会将 lambda 协程代码块编译成 SuspendLambda 的子类:

1
2
3
4
5
6
7
fun main() {
GlobalScope.launch {
val id = getId()
val avatar = getAvatar(id)
println("${Thread.currentThread().name} - $id - $avatar")
}
}

对应的字节码可以看到:

1
final class Main$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2

SuspendLambda 实现了 Continuation 续体接口,其 resume 方法可以恢复协程的执行;另外它将协程体封装成 SuspendLambda 对象,其内以状态机的形式消除回调地狱,并实现逻辑的顺序执行

继承关系

1
2
3
4
5
- Continuation: 续体,恢复协程的执行
- BaseContinuationImpl: 实现 resumeWith(Result) 方法,控制状态机的执行,定义了 invokeSuspend 抽象方法
- ContinuationImpl: 增加 intercepted 拦截器,实现线程调度等
- SuspendLambda: 封装协程体代码块
- 协程体代码块生成的子类: 实现 invokeSuspend 方法,其内实现状态机流转逻辑

这下子,是不是就清晰了许多?那我们接下来看协程是怎么开始启动的。

协程启动流程

CoroutineScope.launch

CoroutineScope.launch 开始跟踪协程启动流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
// newContext = scope作用域上下文 + context参数上下文 + Dispatchers.Default(未指定则添加)
val newContext = newCoroutineContext(context)
// 创建协程对象
val coroutine = if (start.isLazy) {
LazyStandaloneCoroutine(newContext, block)
} else {
StandaloneCoroutine(newContext, active = true)
}
// 启动协程
coroutine.start(start, coroutine, block)
return coroutine
}

// 启动协程
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}

上面 coroutine.start 的调用涉及到运算符重载,实际上会调到 CoroutineStart.invoke() 方法:

1
2
3
4
5
6
7
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}

我们可以注意下 completion 参数,它是一个续体 Continuation 类型,此时传入的实参为 StandaloneCoroutine/LazyStandaloneCoroutine 对象,在协程体的逻辑执行完后会调用到其 resume 方法(CPS),做一些收尾工作,比如说修改状态等

此时 receiver 和 completion 都是 launch() 中创建的 StandaloneCoroutine 协程对象。接着往下看:

1
2
3
4
5
6
7
8
9
10
11
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) = runSafely(completion) {
// 重新创建 SuspendLambda 子类对象
createCoroutineUnintercepted(receiver, completion)
// 调用拦截器逻辑,进行线程调度等
.intercepted()
// 真正执行协程逻辑
.resumeCancellableWith(Result.success(Unit), onCancellation)
}

创建SuspendLambda

看看上面 createCoroutineUnintercepted 中的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else {
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
}
}
}

我们在前面说过,这个协程体会被编译成 SuspendLambda 的子类,其也是 BaseContinuationImpl 的子类对象,因此会走上面的 create() 方法,通过 completion 续体参数创建一个新的 SuspendLambda 对象,这是之前说的 协程的三层包装 里的第二层包装,它持有的 completion 对象是第一层封装(AbstractCoroutine)。

所以在协程启动过程中针对一个协程体会创建两个 SuspendLambda 的子类对象:

  1. 调用 launch() 时创建第一个,传入 null 作为参数,作为一个普通的 Function 对象使用
  2. 调用 create() 时创建第二个,传入 completion 续体作为参数
1
BuildersKt.launch$default(/*...*/ (Function2)(new Function2((Continuation)null))

线程调度

接着调用 SuspendLambda.intercepted() 方法执行拦截器逻辑,从上下文中获取拦截器(Dispatcher调度器)拦截当前 continuation 对象,将其包装成 DispatchedContinuation 类型,这就是协程的第三层包装,封装了线程调度等逻辑,其 continuation 参数就是第二层包装(SuspendLambda)实例。

关于线程调度的具体逻辑,后面再单独写篇文章整理,此处略过。

启动协程

在通过 SuspendLambda 对象创建了 DispatchedContinuation 续体后,接着执行其 resumeCancellableWith() 方法,具体执行代码不贴出了,最终会调用到 continuation.resumeWith(result) 方法,而这个 continuation 就是之前传入的第二层封装 SuspendLambda 对象,其 resumeWith() 方法在父类 BaseContinuationImpl 中:

1
2
3
4
5
6
// BaseContinuationImpl
public final override fun resumeWith(result: Result<Any?>) {
// ...
val outcome = invokeSuspend(param)
// ...
}

上面的 invokeSuspend() 是一个抽象方法,它的实现在编译器生成的 SuspendLambda 子类中,具体逻辑是通过状态机来执行协程体中的逻辑,具体见下章解析。

到这里我们 launch() 里的协程体逻辑就开始真正执行了。

协程挂起与恢复

协程的启动,挂起和恢复有两个关键方法: invokeSuspend()resumeWith(Result)invokeSuspend() 方法是对协程代码块的封装,内部加入状态机机制将整个逻辑分为多块,分隔点就是每个挂起点。协程启动时会先调用一次 invokeSuspend() 函数触发协程体的开始执行,后面每当调用到一个挂起函数时,挂起函数会返回 COROUTINE_SUSPENDED 标识,从而 return 停掉 invokeSuspend() 函数的执行,即非阻塞挂起。编译器会为挂起函数自动添加一个 continuation 续体对象参数,表示调用它的那个协程代码块,在该挂起函数执行完成后,就会调用到续体 continuation.resumeWith() 方法来返回结果(或异常),而在 resumeWith() 中又调用了 invokeSuspend() 方法,其内根据状态机的状态来恢复协程的执行。这就是整个协程的挂起和恢复过程。

接下来看具体解析。

协程的状态机

在之前 协程的状态机 一文里曾经分析过协程的状态机,并且贴出了对应的 Java 代码,分析其状态的流转过程,这次换个思路来看看,对如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun main() = CoroutineScope(Dispatchers.Main).launch {
println("label 0")
val isLogin = checkLogin() // suspend

println("label 1")
println(isLogin)
val login = login() // suspend

println("label 2")
println(login)
val id = getId() // suspend

println("label 3")
println(id)
}

对于协程体中的代码,首个挂起点前的代码可看为初始状态, 其后每两个挂起点之间都是一个新的状态,最后一个挂起点到结束是最终的状态。其对应的状态机伪代码如下,协程体被编译成 SuspendLambda 子类,它实现父类中的 invokeSuspend() 方法,是协程的真正执行逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
final class KotlinTest$main$1 extends SuspendLambda implements Function2 {
int label = 0; // 状态码

public final Object invokeSuspend(Object result) {
switch(this.label) {
case 0:
println("label 0");
label = 1;
result = checkLogin(this); // this 是编译器添加的续体参数
if (result == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 1:
// 此时传入的 result 是 checkLogin() 的结果
println("label 1")
val isLogin = result;
println(isLogin)
label = 2;
result = login(this); // this 是编译器添加的续体参数
if (result == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 2:
// 此时传入的 result 是 login() 的结果
println("label 2")
val login = result;
println(login)
label = 3;
result = getId(this); // this 是编译器添加的续体参数
if (result == COROUTINE_SUSPENDED) {
return COROUTINE_SUSPENDED;
}
break;
case 3:
// 此时传入的 result 是 getId() 的结果
println("label 3")
val id = result;
println(id)
return;
}
}
}

看上面每次调用 suspend 函数时都会传一个 this 参数(continuation),这个参数是编译器添加的续体参数,表示的是协程体自身,在 suspend 挂起函数执行完毕后会调用 continuation.resumeWith() -> invokeSuspend(result) 来恢复该状态机的执行。

协程挂起

上面给出了协程体 SuspendLambda.invokeSuspend() 方法的状态机伪代码,那再看下 SuspendLambda 父类 BaseContinuationImpl 中的 resumeWith() 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
with(current) {
val outcome: Result<Any?> = try {
// invokeSuspend() 执行续体下一个状态的逻辑
val outcome = invokeSuspend(param)
// 如果续体里调用到了挂起函数,则直接 return
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
// 对于 launch 启动的协程体,传入的 completion 是 AbstractCoroutine 子类对象
completion.resumeWith(outcome)
return
}
}
}
}
}

我们说过协程启动后会调用到上面这个 resumeWith() 方法,接着调用其 invokeSuspend() 方法:

  1. 当 invokeSuspend() 返回 COROUTINE_SUSPENDED 后,就直接 return 终止执行了,此时协程被挂起。
  2. 当 invokeSuspend() 返回非 COROUTINE_SUSPENDED 后,说明协程体执行完毕了,对于 launch 启动的协程体,传入的 completion 是 AbstractCoroutine 子类对象,最终会调用其 AbstractCoroutine.resumeWith() 方法做一些状态改变之类的收尾逻辑。至此协程便执行完毕了。

协程恢复

这里我们接着看上面第一条:协程执行到挂起函数被挂起后,当这个挂起函数执行完毕后是怎么恢复协程的,以下面挂起函数为例:

1
2
3
4
private suspend fun login() = withContext(Dispatchers.IO) {
Thread.sleep(1000)
return@withContext true
}

通过反编译可以看到上面挂起函数中的函数体也被编译成了 SuspendLambda 的子类,创建其实例时也需要传入 Continuation 续体参数(调用该挂起函数的协程所在续体)。贴下 withContext 的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}

首先调用了 suspendCoroutineUninterceptedOrReturn 方法,看注释知道可以通过它来获取到当前的续体对象 uCont, 接着有几条分支调用,但最终都是会通过续体对象来创建挂起函数体对应的 SuspendLambda 对象,并执行其 invokeSuspend() 方法,在其执行完毕后调用 uCont.resume() 来恢复协程,具体逻辑大家感兴趣可以自己跟代码,与前面大同小异

至于其他的顶层挂起函数如 await(), suspendCoroutine(), suspendCancellableCoroutine() 等,其内部也是通过 suspendCoroutineUninterceptedOrReturn() 来获取到当前的续体对象,以便在挂起函数体执行完毕后,能通过这个续体对象恢复协程执行。

协程库没有直接提供创建续体对象的方式,一般都是通过 suspendCoroutineUninterceptedOrReturn() 函数获取的,感兴趣的同学可以看看这个方法的注释: Obtains the current continuation instance inside suspend functions and either suspends currently running coroutine or returns result immediately without suspension...

总结

Kotlin 协程本质就是利用 CPS 来实现对过程的控制,并解决了 CPS 会产生的问题(如回调地狱,栈空间占用)。

Kotlin suspend 挂起函数写法与普通函数一样,但编译器会对 suspend 关键字的函数做 CPS 变换;Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型,消除 callback hell, 解决栈空间占用问题。

即将协程代码块编译成 SuspendLambda 子类,实现 invokeSuspend() 方法。

invokeSuspend() 方法是对协程代码块的封装,内部加入状态机机制将整个逻辑分为多块,分隔点就是每个挂起点。协程启动时会先调用一次 invokeSuspend() 函数触发协程体的开始执行,后面每当调用到一个挂起函数时,挂起函数会返回 COROUTINE_SUSPENDED 标识,从而 return 停掉 invokeSuspend() 函数的执行,即非阻塞挂起。编译器会为挂起函数自动添加一个 continuation 续体对象参数,表示调用它的那个协程代码块,在该挂起函数执行完成后,就会调用到续体 continuation.resumeWith() 方法来返回结果(或异常),而在 resumeWith() 中又调用了 invokeSuspend() 方法,其内根据状态机的状态来恢复协程的执行。

Kotlin 协程中存在三层包装,每层包装都持有上层包装的引用,用来执行其 resumeWith() 方法做一些处理:

  • 第一层包装: launch & async 返回的 Job, Deferred 继承自 AbstractCoroutine, 里面封装了协程的状态,提供了 cancel 等接口;
  • 第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真正执行逻辑,其继承关系为 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl, 它的 completion 参数就是第一层包装实例;
  • 第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数就是第二层包装实例。

这三层包装都实现了 Continuation 续体接口,通过代理模式将协程的各层包装组合在一起,每层负责不同的功能。

下图的 resumeWith() 可能表示 resume(), 也可能表示 resumeCancellableWith() 等系列方法:

Kotlin协程