【Kotlin回顧】19.Kotlin協程—CoroutineScope是如何管理協程的

語言: CN / TW / HK

theme: cyanosis


開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第19天,點擊查看活動詳情

1.CoroutineScope

``` public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }

public fun CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else DeferredCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }

fun coroutineScopeTest() { val scope = CoroutineScope(Job()) scope.launch {

}

scope.async {

}

} ```

asynclaunch的擴展接收者都是CoroutineScope,這就意味着他們等價於CoroutineScope的成員方法,如果要調用就必須先獲取到CoroutineScope的對象。

``` public interface CoroutineScope {

/**
 * 此作用域的上下文
 * Context被作用域封裝,用於實現作為作用域擴展的協程構建器
 * 不建議在普通代碼中訪問此屬性,除非訪問[Job]實例以獲得高級用法
 */
public val coroutineContext: CoroutineContext

} ```

CoroutineScope是一個接口,這個接口所做的也只是對CoroutineContext做了一層封裝而已。CoroutineScope最大的作用就是可以方便的批量的控制協程,例如結構化併發。

2.CoroutineScope與結構化併發

``` fun coroutineScopeTest() { val scope = CoroutineScope(Job()) scope.launch { launch { delay(1000000L) logX("ChildLaunch 1") } logX("Hello 1") delay(1000000L) logX("Launch 1") }

scope.launch {
    launch {
        delay(1000000L)
        logX("ChildLaunch 2")
    }
    logX("Hello 2")
    delay(1000000L)
    logX("Launch 2")
}

Thread.sleep(1000L)
scope.cancel()

}

//輸出結果: //================================ //Hello 2 //Thread:DefaultDispatcher-worker-2 //================================ //================================ //Hello 1 //Thread:DefaultDispatcher-worker-1 //================================ ```

上面的代碼實現了結構化,只是創建了CoroutineScope(Job())和利用launch啟動了幾個協程就實現了結構化,結構如圖所示,那麼它的父子結構是如何建立的?

3.父子關係是如何建立的

這裏要先説明一下為什麼CoroutineScope是一個接口,可是在創建的時候卻可以以構造函數的方式使用。在Kotlin中的命名規則是以【駝峯法】為主的,在特殊情況下是可以打破這個規則的,CoroutineScope就是一個特殊的情況,它是一個頂層函數但它發揮的作用卻是構造函數,同樣的還有Job(),它也是頂層函數,在Kotlin中當頂層函數被用作構造函數的時候首字母都是大寫的。

再來看一下CoroutineScope作為構造函數使用時的源碼:

/** * 創建一個[CoroutineScope],包裝給定的協程[context]。 * * 如果給定的[context]不包含[Job]元素,則創建一個默認的' Job() '。 * * 這樣,任何子協程在這個範圍或[取消][協程]失敗。就像在[coroutineScope]塊中一樣, * 作用域本身會取消作用域的所有子作用域。 */ public fun CoroutineScope(context: CoroutineContext): CoroutineScope = ContextScope(if (context[Job] != null) context else context + Job())

構造函數的CoroutineScope傳入一個參數,這個參數如果包含Job元素則直接使用,如果不包含Job則會創建一個新的Job,這就説明每一個coroutineScope對象中的 Context中必定會存在一個Job對象。而在創建一個CoroutineScope對象時這個Job()是一定要傳入的,因為CoroutineScope就是通過這個Job()對象管理協程的。

public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }

上面的代碼是launch的源碼,分析一下LazyStandaloneCoroutineStandaloneCoroutine

``` private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine(parentContext, initParentJob = true, active = active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true } }

private class LazyStandaloneCoroutine( parentContext: CoroutineContext, block: suspend CoroutineScope.() -> Unit ) : StandaloneCoroutine(parentContext, active = false) { private val continuation = block.createCoroutineUnintercepted(this, this)

override fun onStart() {
    continuation.startCoroutineCancellable(this)
}

} ```

StandaloneCoroutineAbstractCoroutine子類,AbstractCoroutine協程的抽象類, 裏面的參數initParentJob = true表示協程創建之後需要初始化協程的父子關係。LazyStandaloneCoroutineStandaloneCoroutine的子類,active=false使命它是以懶加載的方式創建協程。

public abstract class AbstractCoroutine<in T>( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { init { /** * 在上下文中的父協程和當前協程之間建立父子關係 * 如果父協程已經被取消他可能導致當前協程也被取消 * 如果協程從onCancelled或者onCancelling內部操作其狀態, * 那麼此時建立父子關係是危險的 */ if (initParentJob) initParentJob(parentContext[Job]) } }

AbstractCoroutine是一個抽象類他繼承了JobSupport,而JobSupportJob的具體實現。

init函數中根據initParentJob判斷是否建立父子關係,initParentJob的默認值是true因此if中的initParentJob()函數是一定會執行的,這裏的parentContext[Job]取出的的Job就是在Demo中傳入的Job

initParentJobJobSupport中的方法,因為AbstractCoroutine繼承自JobSupport,所以進入JobSupport分析這個方法。

``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { final override val key: CoroutineContext.Key<*> get() = Job

/**
 * 初始化父類的Job
 * 在所有初始化之後最多調用一次
 */
protected fun initParentJob(parent: Job?) {
    assert { parentHandle == null }
    //①
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    //②
    parent.start() // 確保父協程已經啟動
    @Suppress("DEPRECATION")
    //③
    val handle = parent.attachChild(this)
    parentHandle = handle
    // 檢查註冊的狀態
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle 
    }
}

} ```

上面的源碼initParentJob中添加了三處註釋,現在分別對這三處註釋進行分析:

  • if (parent == null): 這裏是對是否存在父Job的判斷,如果不存在則不再進行後面的工作,也就談不上建立父子關係了。因為在Demo中傳遞了Job()因此這裏的父Job是存在的,所以代碼可以繼續執行。
  • parent.start(): 這裏確保parent對應的Job啟動了;
  • parent.attachChild(this): 這裏就是將子Job添加到父Job中,使其成為parent的子Job這裏其實就是建立了父子關係。

用一句話來概括這個關係就是:每一個協程都有一個Job,每一個Job又有一個父Job和多個子Job,可以看做是一個樹狀結構。這個關係可以用下面這張圖表示:

4.結構化是如何取消的

結構化可以被創建的同時CoroutineScope還提供了可取消的函數,Demo中通過scope.cancel()取消了協程,它的流程又是怎樣的呢?先從scope.cancel中的cancel看起

/** * 取消這個scope,包含當前Job和子Job * 如果沒有Job,可拋出異常IllegalStateException */ public fun CoroutineScope.cancel(cause: CancellationException? = null) { val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this") job.cancel(cause) }

scope.cancel又是通過job.cancel取消的,這個cancel具體實現是在JobSupport

``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { ...

public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause ?: defaultCancellationException())
}

public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}

/**
 * 當cancelChild被調用的時候cause是Throwable或者ParentJob
 * 如果異常已經被處理則返回true,否則返回false
 */
internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // 確保它正在完成,如果返回狀態是 cancelMakeCompleting 説明它已經完成
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
        //轉換到取消狀態,當完成時調用afterCompletion
        finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            afterCompletion(finalState)
            true
        }
    }
}

/**
 * 如果沒有需要協程體完成的任務返回true並立即進入完成狀態等待子類完成
 * 這裏代表的是當前Job是否有協程體需要執行
 */
internal open val onCancelComplete: Boolean get() = false

} ```

job.cancel最終調用的是JobSupport中的cancelImpl。這裏它分為兩種情況,判斷依據是onCancelComplete,代表的就是當前Job是否有協程體需要執行,如果沒有則返回true。這裏的Job是自己創建的且沒有需要執行的協程代碼因此返回結果是true,所以就執行cancelMakeCompleting表達式。

``` private fun cancelMakeCompleting(cause: Any?): Any? { loopOnState { state -> ... val finalState = tryMakeCompleting(state, proposedUpdate) if (finalState !== COMPLETING_RETRY) return finalState } }

private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? { ... return tryMakeCompletingSlowPath(state, proposedUpdate) }

private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? { //獲取狀態列表或提升為列表以正確操作子列表 val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY ... notifyRootCause?.let { notifyCancelling(list, it) } ... return finalizeFinishingState(finishing, proposedUpdate) } ```

進入cancelMakeCompleting後經過多次流轉最終會調用tryMakeCompletingSlowPath中的notifyCancelling,在這個函數中才是執行子Job和父Job取消的最終流程

private fun notifyCancelling(list: NodeList, cause: Throwable) { //首先取消子Job onCancelling(cause) //通知子Job notifyHandlers<JobCancellingNode>(list, cause) // 之後取消父Job cancelParent(cause) // 試探性取消——如果沒有parent也沒關係 }

private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) { var exception: Throwable? = null list.forEach<T> { node -> try { node.invoke(cause) } catch (ex: Throwable) { exception?.apply { addSuppressedThrowable(ex) } ?: run { exception = CompletionHandlerException("Exception in completion handler $node for $this", ex) } } } exception?.let { handleOnCompletionException(it) } }

notifyHandlers中的流程就是遍歷當前Job的子Job,並將取消的cause傳遞過去,這裏的invoke()最終會調用 ChildHandleNodeinvoke()方法

``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { ...

internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job get() = job
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}

} ```

childJob.parentCancelled(job)的調用最終調用的是JobSupport中的parentCanceled()函數,然後又回到了cancelImpl()中,也就是 Job 取消的入口函數。這實際上就相當於在做遞歸調用

Job取消完成後接着就是取消父Job了,進入到cancelParent()函數中

``` /* * 取消Job時調用的方法,以便可能將取消傳播到父類。 * 如果父協程負責處理異常,則返回' true ',否則返回' false '。 / private fun cancelParent(cause: Throwable): Boolean { // Is scoped coroutine -- don't propagate, will be rethrown if (isScopedCoroutine) return true

/* 
* CancellationException被認為是“正常的”,當子協程產生它時父協程通常不會被取消。
* 這允許父協程取消它的子協程(通常情況下),而本身不會被取消,
* 除非子協程在其完成期間崩潰併產生其他異常。
*/
val isCancellation = cause is CancellationException
val parent = parentHandle

if (parent === null || parent === NonDisposableHandle) {
    return isCancellation
}

// 責任鏈模式
return parent.childCancelled(cause) || isCancellation

}

/* * 在這個方法中,父類決定是否取消自己(例如在重大故障上)以及是否處理子類的異常。 * 如果異常被處理,則返回' true ',否則返回' false '(調用者負責處理異常) / public open fun childCancelled(cause: Throwable): Boolean { if (cause is CancellationException) return true return cancelImpl(cause) && handlesException } ```

cancelParent的返回結果使用了責任鏈模式, 如果返回【true】表示父協程處理了異常,返回【false】則表示父協程沒有處理異常。

當異常是CancellationException時如果是子協程產生的父協程不會取消,或者説父協程會忽略子協程的取消異常,如果是其他異常父協程就會響應子協程的取消了。

5.總結

  • 每次創建CoroutineScope時都會保證CoroutineContext中一定存在Job元素,而CoroutineScope就是通過Job來管理協程的;
  • 每次通過launchasync啟動(創建)協程時都會創建AbstractCoroutine的子類,然後通過initParentJob()函數建立協程的父子關係。每個協程都會對應一個Job,每個Job都會由一個父Job以及多個子Job,這是一個N叉樹結構。
  • 因為是一個樹結構因此協程的取消以及異常的傳播都是按照這個結構進行傳遞。當取消Job時都會通知自己的父Job和子Job,取消子Job最終是以遞歸的方式傳遞給每一個Job。協程在向上取消父Job時通過責任鏈模式一步一步的傳遞到最頂層的協程,同時如果子Job產生CancellationException異常時父Job會將其忽略,如果是其他異常父Job則會響應這個異常。
  • 對於CancellationException引起的取消只會向下傳遞取消子協程;對於其他異常引起的取消既向上傳遞也向下傳遞,最終會使所有的協程被取消。