Kotlin Coroutines(协程) 完全解析(三),封装异步回调、协程间关系及协程的取消

Kotlin Coroutines(协程) 完全解析系列:

Kotlin Coroutines(协程) 完全解析(一),协程简介

Kotlin Coroutines(协程) 完全解析(二),深入理解协程的挂起、恢复与调度

Kotlin Coroutines(协程) 完全解析(三),封装异步回调、协程间关系及协程的取消

Kotlin Coroutines(协程) 完全解析(四),协程的异常处理

Kotlin Coroutines(协程) 完全解析(五),协程的并发

本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1

前面两篇文章解析了挂起函数通过状态机来实现,协程的本质就是有三层包装的Continuation,这篇文章进一步解析协程的运用。主要介绍如何将异步回调封装为挂起函数,解析协程之间的关系以及协程的取消。

1. 封装异步回调为挂起函数

在异步编程中,回调是非常常见的写法,那么如何将回调转换为协程中的挂起函数呢?可以通过两个挂起函数suspendCoroutine{}suspendCancellableCoroutine{},下面看如何将 OkHttp 的网络请求转换为挂起函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
suspend fun <T> Call<T>.await(): T = suspendCoroutine { cont ->
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
if (response.isSuccessful) {
cont.resume(response.body()!!)
} else {
cont.resumeWithException(ErrorResponse(response))
}
}
override fun onFailure(call: Call<T>, t: Throwable) {
cont.resumeWithException(t)
}
})
}

上面的await()的扩展函数调用时,首先会挂起当前协程,然后执行enqueue将网络请求放入队列中,当请求成功时,通过cont.resume(response.body()!!)来恢复之前的协程。

再来看下suspendCoroutine{}suspendCancellableCoroutine{}的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
public suspend inline fun <T> suspendCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
// 和 suspendCoroutine 的区别就在这里,如果协程已经被取消或者已完成,就会抛出 CancellationException 异常
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}

它们的关键实现都是调用suspendCoroutineUninterceptedOrReturn()函数,它的作用是获取当前协程的实例,并且挂起当前协程或者不挂起直接返回结果。

协程中还有两个常见的挂起函数使用到了suspendCoroutineUninterceptedOrReturn()函数,分别是delay()yield()

1.1 delay 的实现

1
2
3
4
5
6
7
8
9
10
11
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
/** Returns [Delay] implementation of the given context */
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
internal actual val DefaultDelay: Delay = DefaultExecutor

delay 使用suspendCancellableCoroutine挂起协程,而协程恢复的一般情况下是关键在DefaultExecutor.scheduleResumeAfterDelay(),其中实现是schedule(DelayedResumeTask(timeMillis, continuation)),其中的关键逻辑是将 DelayedResumeTask 放到 DefaultExecutor 的队列最后,在延迟的时间到达就会执行 DelayedResumeTask,那么该 task 里面的实现是什么:

1
2
3
4
override fun run() {
// 直接在调用者线程恢复协程
with(cont) { resumeUndispatched(Unit) }
}

1.2 yield 的实现

yield()的作用是挂起当前协程,然后将协程分发到 Dispatcher 的队列,这样可以让该协程所在线程或线程池可以运行其他协程逻辑,然后在 Dispatcher 空闲的时候继续执行原来协程。简单的来说就是让出自己的执行权,给其他协程使用,当其他协程执行完成或也让出执行权时,一开始的协程可以恢复继续运行。

看下面的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun main(args: Array<String>) = runBlocking<Unit> {
launch {
repeat(3) {
println("job1 repeat $it times")
yield()
}
}
launch {
repeat(3) {
println("job2 repeat $it times")
yield()
}
}
}

通过yield()实现 job1 和 job2 两个协程交替运行,输出如下:

1
2
3
4
5
6
job1 repeat 0 times
job2 repeat 0 times
job1 repeat 1 times
job2 repeat 1 times
job1 repeat 2 times
job2 repeat 2 times

现在来看其实现:

1
2
3
4
5
6
7
8
9
10
11
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val context = uCont.context
// 检测协程是否已经取消或者完成,如果是的话抛出 CancellationException
context.checkCompletion()
// 如果协程没有线程调度器,或者像 Dispatchers.Unconfined 一样没有进行调度,则直接返回
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit
// dispatchYield(Unit) 最终会调用到 dispatcher.dispatch(context, block) 将协程分发到调度器队列中,这样线程可以执行其他协程
cont.dispatchYield(Unit)
COROUTINE_SUSPENDED
}

所以注意到,yield()需要依赖协程的线程调度器,而调度器再次执行该协程时,在第二篇中有讲过会调用resume来恢复协程运行。

现在来看封装异步逻辑为挂起函数的关键是用suspendCoroutineUninterceptedOrReturn函数包装,然后在异步逻辑完成时调用resume手动恢复协程。

2. 协程之间的关系

官方文档中有提到协程之间可能存在父子关系,取消父协程时,也会取消所有子协程。在Job的源码中有这样一段话描述协程间父子关系:

1
2
3
4
5
6
7
8
9
10
11
* A parent-child relation has the following effect:
*
* * Cancellation of parent with [cancel] or its exceptional completion (failure)
* immediately cancels all its children.
* * Parent cannot complete until all its children are complete. Parent waits for all its children to
* complete in _completing_ or _cancelling_ state.
* * Uncaught exception in a child, by default, cancels parent. In particular, this applies to
* children created with [launch][CoroutineScope.launch] coroutine builder. Note, that
* [async][CoroutineScope.async] and other future-like
* coroutine builders do not have uncaught exceptions by definition, since all their exceptions are
* caught and are encapsulated in their result.

所以协程间父子关系有三种影响:

  • 父协程手动调用cancel()或者异常结束,会立即取消它的所有子协程。

  • 父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成。

  • 子协程抛出未捕获的异常时,默认情况下会取消其父协程。

下面先来看看协程是如何建立父子关系的,launchasync新建协程时,首先都是newCoroutineContext(context)新建协程的 CoroutineContext 上下文,下面看其具体细节:

1
2
3
4
5
6
7
8
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
// 新协程继承了原来 CoroutineScope 的 coroutineContext
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
// 当新协程没有指定线程调度器时,会默认使用 Dispatchers.Default
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}

所以新的协程的 CoroutineContext 都继承了原来 CoroutineScope 的 coroutineContext,然后launchasync新建协程最后都会调用start(start: CoroutineStart, receiver: R, block: suspend R.() -> T),里面第一行是initParentJob(),通过注释可以知道就是这个函数建立父子关系的,下面看其实现细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// AbstractCoroutine.kt
internal fun initParentJob() {
initParentJobInternal(parentContext[Job])
}
// JobSupport.kt
internal fun initParentJobInternal(parent: Job?) {
check(parentHandle == null)
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
parent.start() // make sure the parent is started
@Suppress("DEPRECATION")
// 关键在于 parent.attachChild(this)
val handle = parent.attachChild(this)
parentHandle = handle
// now check our state _after_ registering (see tryFinalizeSimpleState order of actions)
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
}

这里需要注意的是GlobalScope和普通协程的CoroutineScope的区别,GlobalScope的 Job 是为空的,GlobalScope.launch{}GlobalScope.async{}新建的协程是没有父协程的。

下面继续看attachChild的实现:

1
2
3
public final override fun attachChild(child: ChildJob): ChildHandle {
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
}

invokeOnCompletion()函数在前一篇解析 Deferred.await() 中有提到,关键是将 handler 节点添加到父协程 state.list 的末尾。

2.1 父协程手动调用cancel()或者异常结束,会立即取消它的所有子协程

跟踪父协程的cancel()调用过程,其中关键过程为 cancel() -> cancel(null) -> cancelImpl(null) -> makeCancelling(null) -> tryMakeCancelling(state, causeException) -> notifyCancelling(list, rootCause),下面继续分析notifyCancelling(list, rootCause)的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// JobSupport.kt
private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
onCancellation(cause)
// 这里会调用所有子协程绑定的 ChildHandleNode.invoke(cause) -> childJob.parentCancelled(parentJob) 来取消所有子协程
notifyHandlers<JobCancellingNode<*>>(list, cause)
// then cancel parent
// cancelParent(cause) 不一定会取消父协程,cancel() 时不会取消父协程,因为此时产生 cause 的是 JobCancellationException,属于 CancellationException
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
public final override fun parentCancelled(parentJob: ParentJob) {
// 父协程取消时,子协程会通过 parentCancelled 来取消自己
cancelImpl(parentJob)
}
private fun cancelParent(cause: Throwable): Boolean {
// CancellationException is considered "normal" and parent is not cancelled when child produces it.
// This allow parent to cancel its children (normally) without being cancelled itself, unless
// child crashes and produce some other exception during its completion.
if (cause is CancellationException) return true
if (!cancelsParent) return false
// 当 cancelsParent 为 true, 且子线程抛出未捕获的异常时,默认情况下 childCancelled() 会取消其父协程。
return parentHandle?.childCancelled(cause) == true
}

2.2 父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成

前一篇文章有提到协程的完成通过AbstractCoroutine.resumeWith(result)实现,调用过程为 makeCompletingOnce(result.toState(), defaultResumeMode) -> tryMakeCompleting(),其中关键源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// JobSupport.kt
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?, mode: Int): Int {
...
// now wait for children
val child = firstChild(state)
// 等待子协程完成
if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
return COMPLETING_WAITING_CHILDREN
// otherwise -- we have not children left (all were already cancelled?)
if (tryFinalizeFinishingState(finishing, proposedUpdate, mode))
return COMPLETING_COMPLETED
// otherwise retry
return COMPLETING_RETRY
}
private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean {
// 添加 ChildCompletion 节点到子协程的 state.list 末尾,当子协程完成时会调用 ChildCompletion.invoke()
val handle = child.childJob.invokeOnCompletion(
invokeImmediately = false,
handler = ChildCompletion(this, state, child, proposedUpdate).asHandler
)
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
// 循环设置所有其他子协程
val nextChild = child.nextChild() ?: return false
return tryWaitForChild(state, nextChild, proposedUpdate)
}

tryWaitForChild()也是通过invokeOnCompletion()添加节点到子协程的 state.list 中,当子协程完成时会调用 ChildCompletion.invoke():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ChildCompletion class
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
}
private fun continueCompleting(state: Finishing, lastChild: ChildHandleNode, proposedUpdate: Any?) {
require(this.state === state) // consistency check -- it cannot change while we are waiting for children
// figure out if we need to wait for next child
val waitChild = lastChild.nextChild()
// try wait for next child
if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child
// no more children to wait -- try update state
// 当所有子协程都完成时,才会 tryFinalizeFinishingState() 完成自己
if (tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return
}

2.3 子协程抛出未捕获的异常时,默认情况下会取消其父协程。

子线程抛出未捕获的异常时,后续的处理会如何呢?在前一篇解析中协程的运算在第二层包装 BaseContinuationImpl 中,我们再看一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
...
var param = result
while (true) {
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
// 调用 invokeSuspend 方法执行,执行协程的真正运算逻辑
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
// 协程抛出未捕获的异常,会在这里被拦截,然后作为结果完成协程
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// 协程的状态修改在 AbstractCoroutine.resumeWith() 中
completion.resumeWith(outcome)
return
}
}
}
}
}

所以协程有未捕获的异常中,会在第二层包装中的resumeWith()捕获到,然后调用第一层包装 AbstractCoroutine.resumeWith() 来取消当前协程,处理过程为 AbstractCoroutine.resumeWith(Result.failure(exception)) -> JobSupport.makeCompletingOnce(CompletedExceptionally(exception), defaultResumeMode) -> tryMakeCompleting(state, CompletedExceptionally(exception), defaultResumeMode) -> notifyCancelling(list, exception) -> cancelParent(exception),所以出现未捕获的异常时,和手动调用cancel()一样会调用到 notifyCancelling(list, exception) 来取消当前协程,和手动调用cancel()的区别在于 exception 不是 CancellationException。

1
2
3
4
5
6
7
8
9
10
11
12
13
private fun cancelParent(cause: Throwable): Boolean {
// CancellationException is considered "normal" and parent is not cancelled when child produces it.
// This allow parent to cancel its children (normally) without being cancelled itself, unless
// child crashes and produce some other exception during its completion.
if (cause is CancellationException) return true
if (!cancelsParent) return false
// launch 和 async 新建的协程的 cancelsParent 都为 true, 所以子线程抛出未捕获的异常时,默认情况下 childCancelled() 会取消其父协程。
return parentHandle?.childCancelled(cause) == true
}
// 默认情况下 childCancelled() 会取消取消协程
public open fun childCancelled(cause: Throwable): Boolean =
cancelImpl(cause) && handlesException

3. 协程的取消

前面分析协程父子关系中的取消协程时,可以知道协程的取消只是在协程的第一层包装中 AbstractCoroutine 中修改协程的状态,并没有影响到第二层包装中 BaseContinuationImpl 中协程的实际运算逻辑。所以协程的取消只是状态的变化,并不会取消协程的实际运算逻辑,看下面的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
fun main(args: Array<String>) = runBlocking {
val job1 = launch(Dispatchers.Default) {
repeat(5) {
println("job1 sleep ${it + 1} times")
delay(500)
}
}
delay(700)
job1.cancel()
val job2 = launch(Dispatchers.Default) {
var nextPrintTime = 0L
var i = 1
while (i <= 3) {
val currentTime = System.currentTimeMillis()
if (currentTime >= nextPrintTime) {
println("job2 sleep ${i++} ...")
nextPrintTime = currentTime + 500L
}
}
}
delay(700)
job2.cancel()
}

输出结果如下:

1
2
3
4
5
job1 sleep 1 times
job1 sleep 2 times
job2 sleep 1 ...
job2 sleep 2 ...
job2 sleep 3 ...

上面代码中 job1 取消后,delay()会检测协程是否已取消,所以 job1 之后的运算就结束了;而 job2 取消后,没有检测协程状态的逻辑,都是计算逻辑,所以 job2 的运算逻辑还是会继续运行。

所以为了可以及时取消协程的运算逻辑,可以检测协程的状态,使用isActive来判断,上面示例中可以将while(i <= 3)替换为while(isActive)

3.1 运行不能取消的代码块

当手动取消协程后,像delay()这样的可取消挂起函数会在检测到已取消状态时,抛出 CancellationException 异常,然后退出协程。此时可以使用try { ... } finally { ... }表达式或<T : Closeable?, R> T.use {}函数执行终结动作或关闭资源。

但是如果在finally块中调用自定义的或系统的可取消挂起函数,都会再次抛出 CancellationException 异常。通常我们在finally块中关闭一个文件,取消一个任务或者关闭一个通信通道都是非阻塞,并且不会调用任何挂起函数。当需要挂起一个被取消的协程时,可以将代码包装在withContext(NonCancellable) { ... }中。

3.2 超时取消

实际上大多数时候取消一个协程的理由是因为超时。协程库中已经提供来withTimeout() { ... }挂起函数来实现在超时后自动取消协程。它会在超时后抛出TimeoutCancellationException,它是 CancellationException 的子类,它是协程结束的正常原因,不会打印堆栈跟踪信息,更详细的原因见下一篇解析Kotlin Coroutines(协程) 完全解析(四),协程的异常处理。如果在取消后需要执行一些关闭资源的操作可以使用前面提到的try { ... } finally { ... }表达式。

1
2
3
4
5
6
7
8
9
10
try {
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
} finally {
println("I'm running finally")
}

还有一个withTimeoutOrNull() { ... }挂起函数在超时后返回null,而不是抛出一个异常:

1
2
3
4
5
6
7
8
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // 在它运行得到结果之前取消它
}
println("Result is $result")

上面代码运行输出如下,不再抛出异常:

1
2
3
4
I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null

4. 小结

最后总结下本文的内容,封装异步代码为挂起函数其实非常简单,只需要用suspendCoroutine{}suspendCancellableCoroutine{},还要异步逻辑完成用resume()resumeWithException来恢复协程。

新建协程时需要协程间关系,GlobalScope.launch{}GlobalScope.async{}新建的协程是没有父协程的,而在协程中使用launch{}aysnc{}一般都是子协程。对于父子协程需要注意下面三种关系:

  • 父协程手动调用cancel()或者异常结束,会立即取消它的所有子协程。

  • 父协程必须等待所有子协程完成(处于完成或者取消状态)才能完成。

  • 子协程抛出未捕获的异常时,默认情况下会取消其父协程。

对于协程的取消,cancel()只是将协程的状态修改为已取消状态,并不能取消协程的运算逻辑,协程库中很多挂起函数都会检测协程状态,如果想及时取消协程的运算,最好使用isActive判断协程状态。而withContext(NonCancellable) { ... }可以执行不会被取消的代码,withTimeout() { ... }withTimeoutOrNull() { ... }则可以简化超时处理逻辑。

END
Johnny Shieh wechat
我的公众号,不只有技术,还有咖啡和彩蛋!