【Kotlin回顾】19.Kotlin协程—CoroutineScope是如何管理协程的

语言: CN / TW / HK

theme: cyanosis


开启掘金成长之旅!这是我参与「掘金日新计划 · 12 月更文挑战」的第19天,点击查看活动详情

1.CoroutineScope

``` public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }

public fun CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else DeferredCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }

fun coroutineScopeTest() { val scope = CoroutineScope(Job()) scope.launch {

}

scope.async {

}

} ```

asynclaunch的扩展接收者都是CoroutineScope,这就意味着他们等价于CoroutineScope的成员方法,如果要调用就必须先获取到CoroutineScope的对象。

``` public interface CoroutineScope {

/**
 * 此作用域的上下文
 * Context被作用域封装,用于实现作为作用域扩展的协程构建器
 * 不建议在普通代码中访问此属性,除非访问[Job]实例以获得高级用法
 */
public val coroutineContext: CoroutineContext

} ```

CoroutineScope是一个接口,这个接口所做的也只是对CoroutineContext做了一层封装而已。CoroutineScope最大的作用就是可以方便的批量的控制协程,例如结构化并发。

2.CoroutineScope与结构化并发

``` fun coroutineScopeTest() { val scope = CoroutineScope(Job()) scope.launch { launch { delay(1000000L) logX("ChildLaunch 1") } logX("Hello 1") delay(1000000L) logX("Launch 1") }

scope.launch {
    launch {
        delay(1000000L)
        logX("ChildLaunch 2")
    }
    logX("Hello 2")
    delay(1000000L)
    logX("Launch 2")
}

Thread.sleep(1000L)
scope.cancel()

}

//输出结果: //================================ //Hello 2 //Thread:DefaultDispatcher-worker-2 //================================ //================================ //Hello 1 //Thread:DefaultDispatcher-worker-1 //================================ ```

上面的代码实现了结构化,只是创建了CoroutineScope(Job())和利用launch启动了几个协程就实现了结构化,结构如图所示,那么它的父子结构是如何建立的?

3.父子关系是如何建立的

这里要先说明一下为什么CoroutineScope是一个接口,可是在创建的时候却可以以构造函数的方式使用。在Kotlin中的命名规则是以【驼峰法】为主的,在特殊情况下是可以打破这个规则的,CoroutineScope就是一个特殊的情况,它是一个顶层函数但它发挥的作用却是构造函数,同样的还有Job(),它也是顶层函数,在Kotlin中当顶层函数被用作构造函数的时候首字母都是大写的。

再来看一下CoroutineScope作为构造函数使用时的源码:

/** * 创建一个[CoroutineScope],包装给定的协程[context]。 * * 如果给定的[context]不包含[Job]元素,则创建一个默认的' Job() '。 * * 这样,任何子协程在这个范围或[取消][协程]失败。就像在[coroutineScope]块中一样, * 作用域本身会取消作用域的所有子作用域。 */ public fun CoroutineScope(context: CoroutineContext): CoroutineScope = ContextScope(if (context[Job] != null) context else context + Job())

构造函数的CoroutineScope传入一个参数,这个参数如果包含Job元素则直接使用,如果不包含Job则会创建一个新的Job,这就说明每一个coroutineScope对象中的 Context中必定会存在一个Job对象。而在创建一个CoroutineScope对象时这个Job()是一定要传入的,因为CoroutineScope就是通过这个Job()对象管理协程的。

public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }

上面的代码是launch的源码,分析一下LazyStandaloneCoroutineStandaloneCoroutine

``` private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine(parentContext, initParentJob = true, active = active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true } }

private class LazyStandaloneCoroutine( parentContext: CoroutineContext, block: suspend CoroutineScope.() -> Unit ) : StandaloneCoroutine(parentContext, active = false) { private val continuation = block.createCoroutineUnintercepted(this, this)

override fun onStart() {
    continuation.startCoroutineCancellable(this)
}

} ```

StandaloneCoroutineAbstractCoroutine子类,AbstractCoroutine协程的抽象类, 里面的参数initParentJob = true表示协程创建之后需要初始化协程的父子关系。LazyStandaloneCoroutineStandaloneCoroutine的子类,active=false使命它是以懒加载的方式创建协程。

public abstract class AbstractCoroutine<in T>( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { init { /** * 在上下文中的父协程和当前协程之间建立父子关系 * 如果父协程已经被取消他可能导致当前协程也被取消 * 如果协程从onCancelled或者onCancelling内部操作其状态, * 那么此时建立父子关系是危险的 */ if (initParentJob) initParentJob(parentContext[Job]) } }

AbstractCoroutine是一个抽象类他继承了JobSupport,而JobSupportJob的具体实现。

init函数中根据initParentJob判断是否建立父子关系,initParentJob的默认值是true因此if中的initParentJob()函数是一定会执行的,这里的parentContext[Job]取出的的Job就是在Demo中传入的Job

initParentJobJobSupport中的方法,因为AbstractCoroutine继承自JobSupport,所以进入JobSupport分析这个方法。

``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { final override val key: CoroutineContext.Key<*> get() = Job

/**
 * 初始化父类的Job
 * 在所有初始化之后最多调用一次
 */
protected fun initParentJob(parent: Job?) {
    assert { parentHandle == null }
    //①
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    //②
    parent.start() // 确保父协程已经启动
    @Suppress("DEPRECATION")
    //③
    val handle = parent.attachChild(this)
    parentHandle = handle
    // 检查注册的状态
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle 
    }
}

} ```

上面的源码initParentJob中添加了三处注释,现在分别对这三处注释进行分析:

  • if (parent == null): 这里是对是否存在父Job的判断,如果不存在则不再进行后面的工作,也就谈不上建立父子关系了。因为在Demo中传递了Job()因此这里的父Job是存在的,所以代码可以继续执行。
  • parent.start(): 这里确保parent对应的Job启动了;
  • parent.attachChild(this): 这里就是将子Job添加到父Job中,使其成为parent的子Job这里其实就是建立了父子关系。

用一句话来概括这个关系就是:每一个协程都有一个Job,每一个Job又有一个父Job和多个子Job,可以看做是一个树状结构。这个关系可以用下面这张图表示:

4.结构化是如何取消的

结构化可以被创建的同时CoroutineScope还提供了可取消的函数,Demo中通过scope.cancel()取消了协程,它的流程又是怎样的呢?先从scope.cancel中的cancel看起

/** * 取消这个scope,包含当前Job和子Job * 如果没有Job,可抛出异常IllegalStateException */ public fun CoroutineScope.cancel(cause: CancellationException? = null) { val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this") job.cancel(cause) }

scope.cancel又是通过job.cancel取消的,这个cancel具体实现是在JobSupport

``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { ...

public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause ?: defaultCancellationException())
}

public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}

/**
 * 当cancelChild被调用的时候cause是Throwable或者ParentJob
 * 如果异常已经被处理则返回true,否则返回false
 */
internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // 确保它正在完成,如果返回状态是 cancelMakeCompleting 说明它已经完成
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
        //转换到取消状态,当完成时调用afterCompletion
        finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            afterCompletion(finalState)
            true
        }
    }
}

/**
 * 如果没有需要协程体完成的任务返回true并立即进入完成状态等待子类完成
 * 这里代表的是当前Job是否有协程体需要执行
 */
internal open val onCancelComplete: Boolean get() = false

} ```

job.cancel最终调用的是JobSupport中的cancelImpl。这里它分为两种情况,判断依据是onCancelComplete,代表的就是当前Job是否有协程体需要执行,如果没有则返回true。这里的Job是自己创建的且没有需要执行的协程代码因此返回结果是true,所以就执行cancelMakeCompleting表达式。

``` private fun cancelMakeCompleting(cause: Any?): Any? { loopOnState { state -> ... val finalState = tryMakeCompleting(state, proposedUpdate) if (finalState !== COMPLETING_RETRY) return finalState } }

private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? { ... return tryMakeCompletingSlowPath(state, proposedUpdate) }

private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? { //获取状态列表或提升为列表以正确操作子列表 val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY ... notifyRootCause?.let { notifyCancelling(list, it) } ... return finalizeFinishingState(finishing, proposedUpdate) } ```

进入cancelMakeCompleting后经过多次流转最终会调用tryMakeCompletingSlowPath中的notifyCancelling,在这个函数中才是执行子Job和父Job取消的最终流程

private fun notifyCancelling(list: NodeList, cause: Throwable) { //首先取消子Job onCancelling(cause) //通知子Job notifyHandlers<JobCancellingNode>(list, cause) // 之后取消父Job cancelParent(cause) // 试探性取消——如果没有parent也没关系 }

private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) { var exception: Throwable? = null list.forEach<T> { node -> try { node.invoke(cause) } catch (ex: Throwable) { exception?.apply { addSuppressedThrowable(ex) } ?: run { exception = CompletionHandlerException("Exception in completion handler $node for $this", ex) } } } exception?.let { handleOnCompletionException(it) } }

notifyHandlers中的流程就是遍历当前Job的子Job,并将取消的cause传递过去,这里的invoke()最终会调用 ChildHandleNodeinvoke()方法

``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { ...

internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job get() = job
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}

} ```

childJob.parentCancelled(job)的调用最终调用的是JobSupport中的parentCanceled()函数,然后又回到了cancelImpl()中,也就是 Job 取消的入口函数。这实际上就相当于在做递归调用

Job取消完成后接着就是取消父Job了,进入到cancelParent()函数中

``` /* * 取消Job时调用的方法,以便可能将取消传播到父类。 * 如果父协程负责处理异常,则返回' true ',否则返回' false '。 / private fun cancelParent(cause: Throwable): Boolean { // Is scoped coroutine -- don't propagate, will be rethrown if (isScopedCoroutine) return true

/* 
* CancellationException被认为是“正常的”,当子协程产生它时父协程通常不会被取消。
* 这允许父协程取消它的子协程(通常情况下),而本身不会被取消,
* 除非子协程在其完成期间崩溃并产生其他异常。
*/
val isCancellation = cause is CancellationException
val parent = parentHandle

if (parent === null || parent === NonDisposableHandle) {
    return isCancellation
}

// 责任链模式
return parent.childCancelled(cause) || isCancellation

}

/* * 在这个方法中,父类决定是否取消自己(例如在重大故障上)以及是否处理子类的异常。 * 如果异常被处理,则返回' true ',否则返回' false '(调用者负责处理异常) / public open fun childCancelled(cause: Throwable): Boolean { if (cause is CancellationException) return true return cancelImpl(cause) && handlesException } ```

cancelParent的返回结果使用了责任链模式, 如果返回【true】表示父协程处理了异常,返回【false】则表示父协程没有处理异常。

当异常是CancellationException时如果是子协程产生的父协程不会取消,或者说父协程会忽略子协程的取消异常,如果是其他异常父协程就会响应子协程的取消了。

5.总结

  • 每次创建CoroutineScope时都会保证CoroutineContext中一定存在Job元素,而CoroutineScope就是通过Job来管理协程的;
  • 每次通过launchasync启动(创建)协程时都会创建AbstractCoroutine的子类,然后通过initParentJob()函数建立协程的父子关系。每个协程都会对应一个Job,每个Job都会由一个父Job以及多个子Job,这是一个N叉树结构。
  • 因为是一个树结构因此协程的取消以及异常的传播都是按照这个结构进行传递。当取消Job时都会通知自己的父Job和子Job,取消子Job最终是以递归的方式传递给每一个Job。协程在向上取消父Job时通过责任链模式一步一步的传递到最顶层的协程,同时如果子Job产生CancellationException异常时父Job会将其忽略,如果是其他异常父Job则会响应这个异常。
  • 对于CancellationException引起的取消只会向下传递取消子协程;对于其他异常引起的取消既向上传递也向下传递,最终会使所有的协程被取消。