0%

Kotlin笔记之协程异常与取消

父子协程

协程之间存在父子关系,下面从 launch 的源码开始分析这种关系是如何建立的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Starts a new coroutine in this scope and returns it as a [Job].
 *
 * The new coroutine's context comes from newCoroutineContext(context), i.e. this scope's
 * coroutineContext combined with [context]. coroutine.start() is what links the new
 * coroutine to the parent Job found in that context (via initParentJob).
 */
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
// A lazy start keeps the block in a LazyStandaloneCoroutine; otherwise a StandaloneCoroutine is created active.
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

/**
 * Computes the context of a new coroutine: this scope's coroutineContext plus [context].
 * When DEBUG is on, a fresh CoroutineId is added. When the combined context has no
 * ContinuationInterceptor (dispatcher), Dispatchers.Default is added.
 */
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
// Fall back to Dispatchers.Default only if neither the scope nor the caller supplied a dispatcher.
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}

可知在 newCoroutineContext 中,新协程的上下文由当前 CoroutineScope 的 coroutineContext 与 launch 传入的 context 合并而成(如果合并结果中没有 ContinuationInterceptor,还会补上 Dispatchers.Default);随后 launch 调用 coroutine.start(start, coroutine, block):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * Excerpt: base class of coroutine implementations such as StandaloneCoroutine.
 * [parentContext] is the context the coroutine was created with; the Job in it (if any)
 * becomes this coroutine's parent.
 */
public abstract class AbstractCoroutine<in T>(
@JvmField
protected val parentContext: CoroutineContext, active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// Starts the coroutine; the first step is registering with the parent job.
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
// ...
}

// Looks up the Job in parentContext (null when there is none, e.g. under GlobalScope) and attaches to it.
internal fun initParentJob() {
initParentJobInternal(parentContext[Job])
}
}

// Excerpt of JobSupport showing how a job is linked to its parent job.
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
// Registers this job as a child of [parent] and stores the returned handle in parentHandle.
// With no parent, parentHandle becomes NonDisposableHandle, which cancelParent treats as "no parent".
internal fun initParentJobInternal(parent: Job?) {
assert { parentHandle == null }
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
parent.start() // make sure the parent is started
val handle = parent.attachChild(this)
parentHandle = handle
// now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}
}

这里看下 GlobalScope 代码:

1
2
3
4
// GlobalScope's context is empty: it holds no Job, so coroutines started from it get no parent
// (unless a Job is passed explicitly in the builder's context argument).
public object GlobalScope : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}

其 coroutineContext 是一个 EmptyCoroutineContext,因此(只要 launch/async 的 context 参数中没有显式传入 Job)parentContext[Job] 为 null,所以 GlobalScope.launch{} 和 GlobalScope.async{} 新建的协程没有父协程。

接下来重点在于 parent.attachChild 方法:

1
2
3
// Registers [child] as a child of this job. A ChildHandleNode is installed as a completion handler
// with onCancelling = true, so it is notified as soon as this job starts cancelling (see notifyCancelling).
// The returned node is the ChildHandle that the child stores in its parentHandle.
public final override fun attachChild(child: ChildJob): ChildHandle {
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}

invokeOnCompletion 方法在之前已经看过,这里不再给出具体源码,它的主要作用是将 handler 节点添加到父协程的一个队列(state.list)中。

协程完成

前面讲过协程的完成通过 AbstractCoroutine.resumeWith 实现,它会这样调用: makeCompletingOnce -> tryMakeCompleting -> tryMakeCompletingSlowPath -> tryWaitForChild:

1
2
3
4
5
6
7
8
9
// Makes this (finishing) job wait for [child] before it can complete.
// Installs a ChildCompletion handler on the child. Returns true if the child is still running and we
// now wait for it. If the child is already complete, tries the next child; returns false when none remain.
private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
val handle = child.childJob.invokeOnCompletion(
invokeImmediately = false,
handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
)
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
val nextChild = child.nextChild() ?: return false
return tryWaitForChild(state, nextChild, proposedUpdate)
}

可知 tryWaitForChild 方法将 ChildCompletion 节点添加到了子协程的 state.list 队列中(如果返回的是 NonDisposableHandle,说明该子协程已经结束,于是继续尝试下一个子协程);当子协程结束(正常完成或被取消)时会调用 ChildCompletion.invoke:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ChildCompletion.invoke: runs when the awaited child completes and lets the parent continue completing.
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
}

// Called after [lastChild] completed: waits for the next unfinished child if there is one,
// otherwise finalizes this job's state and runs afterCompletion.
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
// figure out if we need to wait for next child
val waitChild = lastChild.nextChild()
// try wait for next child
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
// no more children to wait -- try update state
val finalState = finalizeFinishingState(state, proposedUpdate)
afterCompletion(finalState)
}

因此可以知道,父协程需要等待所有子协程都结束(正常完成或被取消)后才能完成自身。

协程取消

先看看协程的 cancel 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Declaration of cancel with an optional [cause]; the JobSupport implementation follows.
public fun cancel(cause: CancellationException? = null)

// JobSupport
// A null cause is replaced by a default JobCancellationException.
public override fun cancel(cause: CancellationException?) {
cancelInternal(cause ?: defaultCancellationException())
}

// Builds the JobCancellationException used when cancel() gets no cause; this job is passed to it.
internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) =
JobCancellationException(message ?: cancellationExceptionMessage(), cause, this)

// Overridable (open) cancellation entry point; by default it delegates to cancelImpl.
public open fun cancelInternal(cause: Throwable) {
cancelImpl(cause)
}

// Core cancellation logic. [cause] is a Throwable, or the parent job when called from parentCancelled.
// If onCancelComplete is set, cancelMakeCompleting is tried first. If the state is still
// COMPLETING_ALREADY after that (or the flag is not set), makeCancelling moves the job into cancelling.
// Returns false only when it is too late to cancel (TOO_LATE_TO_CANCEL), true otherwise.
internal fun cancelImpl(cause: Any?): Boolean {
var finalState: Any? = COMPLETING_ALREADY
if (onCancelComplete) {
// make sure it is completing, if cancelMakeCompleting returns state it means it had make it
// completing and had recorded exception
finalState = cancelMakeCompleting(cause)
if (finalState === COMPLETING_WAITING_CHILDREN) return true
}
if (finalState === COMPLETING_ALREADY) {
finalState = makeCancelling(cause)
}
return when {
finalState === COMPLETING_ALREADY -> true
finalState === COMPLETING_WAITING_CHILDREN -> true
finalState === TOO_LATE_TO_CANCEL -> false
else -> {
afterCompletion(finalState)
true
}
}
}

makeCancelling 中接着会调用到 notifyCancelling 方法,它负责通知子协程和父协程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// `list` is the same state.list that the handlers above (e.g. ChildHandleNode) were added to
private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
onCancelling(cause)
// Invokes every JobCancellingNode in the list, including the ChildHandleNodes added earlier (this cancels the children)
notifyHandlers<JobCancellingNode<*>>(list, cause)
// May also cancel the parent coroutine
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}

// Completion handler that attachChild installs on the parent; it links one child job to that parent.
internal class ChildHandleNode(
parent: JobSupport,
@JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
// Invoked while the parent is cancelling: tells the child that its parent was cancelled.
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
// ...
}

// A child cancels itself through this method when its parent is cancelled;
// the parent job itself is passed to cancelImpl as the cause.
public final override fun parentCancelled(parentJob: ParentJob) {
cancelImpl(parentJob)
}

// The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
// Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
// never returns `false` for instances of [CancellationException], otherwise such exception
// may leak to the [CoroutineExceptionHandler].
private fun cancelParent(cause: Throwable): Boolean {
// If isScopedCoroutine is true, return immediately: do not propagate to or cancel the parent.
// It defaults to false and subclasses may override it.
if (isScopedCoroutine) return true

/* CancellationException is considered "normal" and parent usually is not cancelled when child produces it.
* This allow parent to cancel its children (normally) without being cancelled itself, unless
* child crashes and produce some other exception during its completion.
*/
val isCancellation = cause is CancellationException
val parent = parentHandle
// No parent -- ignore CE, report other exceptions.
if (parent === null || parent === NonDisposableHandle) {
return isCancellation
}

// Calls childCancelled on the ChildHandleNode registered earlier by attachChild
return parent.childCancelled(cause) || isCancellation
}

// ChildHandleNode again, now also showing childCancelled: the child reports its cancellation
// cause through this node to the parent job, which decides whether to cancel itself.
internal class ChildHandleNode(
parent: JobSupport,
@JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
// ...
}

// JobSupport
// Child was cancelled with a cause. In this method parent decides whether it cancels itself
// It is overridden in supervisor implementations to completely ignore any child cancellation.
// Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
// A CancellationException never cancels the parent. Any other exception cancels the parent, and counts as
// handled only if that cancellation succeeded and this job handles exceptions (handlesException).
public open fun childCancelled(cause: Throwable): Boolean {
if (cause is CancellationException) return true
return cancelImpl(cause) && handlesException
}

// The coroutine behind supervisorScope.
private class SupervisorCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
// For coroutines started by supervisorScope, cancellation and exceptions only propagate
// from parent to child; a failing child never cancels this parent.
// Returning false means "not handled", so the failing child has to handle its exception itself.
override fun childCancelled(cause: Throwable): Boolean = false
}

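因此,协程调用 cancel 时会通过 notifyCancelling 取消它的所有子协程。cancel 使用的 cause 是 CancellationException(默认为 JobCancellationException),而父协程的 childCancelled 遇到 CancellationException 会直接返回 true、不取消自身,所以默认不会取消它的父协程。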

协程的取消只是在第一层包装 AbstractCoroutine 中修改协程的状态,不会影响到第二层包装 BaseContinuationImpl 中的执行逻辑。也就是说,协程的取消是协作式的:cancel 只修改状态,不会强行中断正在执行的代码。只有当协程执行到 delay 这类会检查取消状态的挂起点时才会抛出 CancellationException,或者由代码自行检查 isActive 来退出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Demo: cancel() only changes the job's state; the child keeps running until it reaches a
// suspension point such as delay(), which then throws CancellationException.
GlobalScope.launch {
println("1")
val job = launch {
println("2")
try {
delay(500)
} catch (e: CancellationException) {
// Swallowing the CancellationException lets execution continue past the cancelled delay
println("exception")
}
println("3")
// isActive is false once the job is cancelled, so "4" is not printed
if (isActive) {
println("4")
}
delay(500) // throws CancellationException because the job is already cancelled
println("5")
}
delay(100)
job.cancel()
}
// output
1
2
exception
3

异常处理

异常处理

协程构建器有两种形式:自动传播异常(launch 与 actor)或向用户暴露异常(async 与 produce)。当这些构建器用于创建一个根协程时,即该协程不是另一个协程的子协程,前者这类构建器将异常视为未捕获异常,类似 Java 的 Thread.uncaughtExceptionHandler,而后者则依赖用户来最终消费异常,例如通过 await 或 receive。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/** Prints [obj] prefixed with the current thread, e.g. `Thread[main,5,main]: 0`. */
fun log(obj: Any) = println("${Thread.currentThread()}: $obj")

// Demo: a root launch reports its exception as uncaught; a root async keeps it until await() is called.
fun main() = runBlocking {
val job = GlobalScope.launch { // root coroutine started with launch
log("Throwing exception from launch")
throw IndexOutOfBoundsException() // reported as uncaught; printed to the console by the thread's handler
}
job.join()
log("Joined failed job")
val deferred = GlobalScope.async { // root coroutine started with async
log("Throwing exception from async")
throw ArithmeticException() // nothing is printed; the exception waits for the caller to await()
}
try {
deferred.await()
log("Unreached")
} catch (e: ArithmeticException) {
log("Caught ArithmeticException")
}
}

// output
Thread[DefaultDispatcher-worker-1,5,main]: Throwing exception from launch
Exception in thread "DefaultDispatcher-worker-1" java.lang.IndexOutOfBoundsException
// ...
Thread[main,5,main]: Joined failed job
Thread[DefaultDispatcher-worker-1,5,main]: Throwing exception from async
Thread[main,5,main]: Caught ArithmeticException

可以使用 CoroutineExceptionHandler 来捕获 launch 的异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Demo: a CoroutineExceptionHandler receives the exception of a root launch, but has no effect on a root async.
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val job = GlobalScope.launch(handler) { // root coroutine running in GlobalScope
throw AssertionError()
}
val deferred = GlobalScope.async(handler) { // also a root coroutine, but started with async instead of launch
throw ArithmeticException() // nothing is printed; it relies on the caller invoking deferred.await()
}
joinAll(job, deferred)
}

// output
CoroutineExceptionHandler got java.lang.AssertionError

协程内部使用 CancellationException 来进行取消,这个异常会被所有的处理者忽略。如果一个协程遇到了 CancellationException 以外的异常,默认情况下它将使用该异常取消它的父协程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// Demo: inside coroutineScope, a failing child cancels the whole scope, so log(3) is never reached
// and the exception surfaces from main.
fun main() = runBlocking {
coroutineScope {
log(0)
launch {
log(1)
throw Exception()
}
log(2)
delay(100)
log(3)
log(isActive)
}
}

// output
Thread[main,5,main]: 0
Thread[main,5,main]: 2
Thread[main,5,main]: 1
Exception in thread "main" java.lang.Exception

// Demo: inside supervisorScope, a failing child does not cancel the scope. The child's exception is
// reported on its own, and the scope goes on to log 3 and isActive == true.
fun main() = runBlocking {
supervisorScope {
log(0)
launch {
log(1)
throw Exception()
}
log(2)
delay(100)
log(3)
log(isActive)
}
}

// output
Thread[main,5,main]: 0
Thread[main,5,main]: 2
Thread[main,5,main]: 1
Exception in thread "main" java.lang.Exception
Thread[main,5,main]: 3
Thread[main,5,main]: true

// Demo: an exception deep in the hierarchy cancels all of its ancestors. The root's handler receives the
// original IOException, while join() in the cancelled parent throws CancellationException.
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val job = GlobalScope.launch(handler) {
val inner = launch { // every coroutine in this hierarchy will be cancelled
launch {
launch {
throw IOException() // the original exception
}
sleep(100) // blocking sleep; presumably java.lang.Thread.sleep imported elsewhere -- TODO confirm
println(isActive)
}
sleep(100)
println(isActive)
}
println("before join")
try {
inner.join()
println("join")
} catch (e: CancellationException) {
println("Rethrowing CancellationException with original cause")
// If the CancellationException were not rethrown, "after join" below would be printed
throw e // the CancellationException is rethrown, but the original IOException still reaches the handler
}
println("after join")
}
job.join()
}

// output
before join
Rethrowing CancellationException with original cause
false
false
CoroutineExceptionHandler got java.io.IOException

当父协程的所有子协程都结束后,原始的异常才会被父协程处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// Demo: the parent's handler runs only after all children have terminated, including the first
// child's NonCancellable cleanup in its finally block.
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val job = GlobalScope.launch(handler) {
launch { // first child
try {
delay(Long.MAX_VALUE)
} finally {
withContext(NonCancellable) {
println("Children are cancelled, but exception is not handled until all children terminate")
delay(100)
println("The first child finished its non cancellable block")
}
}
}
launch { // second child
delay(10)
println("Second child throws an exception")
throw ArithmeticException()
}
}
job.join()
}

// output
Second child throws an exception
Children are cancelled, but exception is not handled until all children terminate
The first child finished its non cancellable block
CoroutineExceptionHandler got java.lang.ArithmeticException

当协程的多个子协程因异常而失败时,一般规则是“取第一个异常”,因此将处理第一个异常。在第一个异常之后发生的所有其他异常都作为被抑制的异常绑定至第一个异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Demo: the first exception wins; exceptions thrown afterwards are attached to it as suppressed exceptions.
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
}
val job = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE) // cancelled when the sibling coroutine fails with IOException
} finally {
throw ArithmeticException() // the second exception
}
}
launch {
delay(100)
throw IOException() // the first exception
}
delay(Long.MAX_VALUE)
}
job.join()
}

// output
CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]

源码解读

在 BaseContinuationImpl.resumeWith 方法中会 catch 协程抛出的异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    /**
     * Resumes this state machine with [result] and keeps resuming the chain of callers.
     *
     * Instead of recursing into completion.resumeWith(), the loop walks up the chain itself.
     * While the completion is another BaseContinuationImpl, it becomes `current` and the
     * outcome becomes its `param`. An exception thrown by invokeSuspend is caught and turned
     * into a failed Result. It then travels up the chain like a normal return value until it
     * reaches the top-level completion (e.g. AbstractCoroutine.resumeWith).
     */
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to keep stack traces short
        var current = this
        var param = result
        while (true) {
            // ...
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        // The body suspended again: it will be resumed later, nothing more to do now
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        // An exception thrown by the coroutine body is caught here and becomes the outcome
                        Result.failure(exception)
                    }
                // ...
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop: resume the caller's state machine with our outcome
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

在捕获了异常后,调用 AbstractCoroutine.resumeWith 来处理,其流程为: AbstractCoroutine.resumeWith -> JobSupport.makeCompletingOnce -> JobSupport.tryMakeCompleting -> JobSupport.tryMakeCompletingSlowPath:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// AbstractCoroutine.resumeWith: receives the final outcome of the coroutine body (a value, or the caught exception).
public final override fun resumeWith(result: Result<T>) {
// Convert the Result into a job state and try to complete with it
val state = makeCompletingOnce(result.toState())
// Still waiting for children: completion continues once they have finished
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}

// Slow path of completion: move the job into the Finishing state and record the exception (if any).
// On the first transition into cancelling, notify the children, then wait for them or finalize the state.
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
val finishing = state as? Finishing ?: Finishing(list, false, null)
var notifyRootCause: Throwable? = null
synchronized(finishing) {
if (finishing.isCompleting) return COMPLETING_ALREADY
finishing.isCompleting = true
if (finishing !== state) {
if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
}
val wasCancelling = finishing.isCancelling
(proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
// If it just becomes cancelling --> must process cancelling notifications
notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
}
// process cancelling notification here -- it cancels all the children _before_ we start to wait for them (sic!!!)
notifyRootCause?.let { notifyCancelling(list, it) }
val child = firstChild(state) // now wait for children
if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
// otherwise -- we have no children left (all were already cancelled?)
return finalizeFinishingState(finishing, proposedUpdate)
}

可知当协程抛出异常后,会调用 notifyCancelling 方法,该方法在上面已经解析过了。

因此,当协程发生(非 CancellationException 的)异常时会取消它的所有子协程,默认还会取消它的父协程。

接下来看看 finalizeFinishingState 方法:

1
2
3
4
5
6
7
8
// Excerpt: computes the job's final state. A failure is first offered to the parent via cancelParent;
// if the parent does not take it, handleJobException is tried. If either one handles it,
// the CompletedExceptionally final state is marked as handled.
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
// ...
if (finalException != null) {
val handled = cancelParent(finalException) || handleJobException(finalException)
if (handled) (finalState as CompletedExceptionally).makeHandled()
}
// ...
}

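cancelParent 的逻辑在之前看过。这里可以看出,如果协程抛出未捕获的非取消异常,则会一步步取消上层的协程,最后由根协程调用 handleJobException 处理异常。注意,如果父协程是 supervisor(其 childCancelled 返回 false),cancelParent 也会返回 false,此时由子协程自己调用 handleJobException 处理异常。这就是前面 supervisorScope 示例中子协程的异常被直接打印的原因: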

1
2
3
4
5
6
7
8
9
10
11
12
// JobSupport
// Default: the job does not handle the exception itself (returns false); StandaloneCoroutine overrides this.
protected open fun handleJobException(exception: Throwable): Boolean = false

// Coroutine created by launch when it is not started lazily.
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
// Reports the exception through handleCoroutineException and claims it as handled.
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
}
}

可以看到 handleJobException 默认直接返回 false。DeferredCoroutine 也没有重写这个方法,只有 StandaloneCoroutine 重写了它。因此如果根协程是 async 启动的协程,异常不会被上报(也就不会 crash),而是保存在 Deferred 中,等到调用 await 时再抛出;而 launch 启动的协程则会调用 handleCoroutineException 处理异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// Reports an exception that no coroutine handled: the context's CoroutineExceptionHandler gets it first,
// otherwise (or if that handler itself throws) it goes to the global handleCoroutineExceptionImpl.
public fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
try {
// If the context defines a CoroutineExceptionHandler, let it handle the exception
context[CoroutineExceptionHandler]?.let {
it.handleException(context, exception)
return
}
} catch (t: Throwable) {
handleCoroutineExceptionImpl(context, handlerException(exception, t))
return
}
// If a handler is not present in the context or an exception was thrown, fallback to the global handler
handleCoroutineExceptionImpl(context, exception)
}

// Extension handlers discovered via ServiceLoader; on Android this also includes AndroidExceptionPreHandler
private val handlers: List<CoroutineExceptionHandler> = ServiceLoader.load(
CoroutineExceptionHandler::class.java,
CoroutineExceptionHandler::class.java.classLoader
).iterator().asSequence().toList()

// Global fallback: gives the exception to every ServiceLoader-discovered handler, then always
// passes it to the current thread's uncaught exception handler.
internal actual fun handleCoroutineExceptionImpl(context: CoroutineContext, exception: Throwable) {
// use additional extension handlers
for (handler in handlers) {
try {
handler.handleException(context, exception)
} catch (t: Throwable) {
// Use thread's handler if custom handler failed to handle exception
val currentThread = Thread.currentThread()
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, handlerException(exception, t))
}
}
// use thread's handler
val currentThread = Thread.currentThread()
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}

// Thread
public UncaughtExceptionHandler getUncaughtExceptionHandler() {
// If uncaughtExceptionHandler is null, fall back to the thread's group
// (field: private ThreadGroup group;)
return uncaughtExceptionHandler != null ? uncaughtExceptionHandler : group;
}

// ThreadGroup
public void uncaughtException(Thread t, Throwable e) {
if (parent != null) {
首先交给父进程组处理
parent.uncaughtException(t, e);
} else {
// 优先使用线程通用的 DefaultUncaughtExceptionHandler, 否则在控制台打印异常堆栈信息
Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler();
if (ueh != null) {
ueh.uncaughtException(t, e);
} else if (!(e instanceof ThreadDeath)) {
System.err.print("Exception in thread \"" + t.getName() + "\" ");
e.printStackTrace(System.err);
}
}
}

总结

  • 父协程需要等待所有子协程处于完成或者取消状态才能完成自身。
  • 协程调用 cancel 时会取消它的所有子协程,默认不会取消它的父协程。被取消的协程会在下一个可取消的挂起点抛出 CancellationException,该异常会被协程的异常处理机制忽略;除非代码自行捕获了该异常,否则协程不会再继续执行。
  • 当协程发生未捕获的异常时会取消它的所有子协程,且自身不会继续执行。默认还会取消它的父协程,这样一步步向上取消直到根协程,并由根协程处理异常。当异常属于 CancellationException,或者使用了 SupervisorJob 或 supervisorScope 时,不会影响父协程。原理分别是:childCancelled 对 CancellationException 直接返回 true;supervisor 重写了 childCancelled 使其直接返回 false,此时由子协程自己处理异常。
  • launch 协程和 actor 协程默认处理异常的方式在 JVM 中是打印堆栈信息,在 Android 中会崩溃(参考Android异常机制),可以自定义 CoroutineExceptionHandler 来处理异常。
  • async 协程作为根协程时本身不会处理异常(handleJobException 返回 false),为其设置的 CoroutineExceptionHandler 无效;异常会保存在 Deferred 中,在调用 await 恢复调用者协程时抛出。