帶着問題分析Kotlin協程原理

語言: CN / TW / HK

theme: scrolls-light highlight: androidstudio


我報名參加金石計劃1期挑戰——瓜分10萬獎池,這是我的第1篇文章,點擊查看活動詳情


協程是一個較為複雜的東西,弄清協程的原理也不是簡簡單單的一篇文章就能講的清,這個過程中需要做的就是使用、看源碼、debug、總結、回顧。本節內容主要以弄清以下幾個問題為主:

  • 如何創建一個協程
  • 協程是如何被創建出來的
  • 啟動策略是什麼
  • 啟動策略完成了什麼工作
  • 協程是如何被啟動的
  • 協程啟動過程中的Dispatchers是什麼
  • 協程啟動過程中的Dispatchers做了什麼
  • Worker是什麼
  • Worker中的任務被找到後是如何執行的?
  • 協程創建的時候CoroutineScope是什麼
  • 協程的結構化中父子關係是怎樣建立的
  • 結構化建立後又是怎麼取消的
  • newCoroutineContext(context)做了什麼

1.如何創建一個協程

創建一個協程有三種方式 ``` fun main() { CoroutineScope(Job()).launch(Dispatchers.Default) { delay(1000L) println("Kotlin") }

println("Hello")
Thread.sleep(2000L)

}

//執行結果: //Hello //Kotlin

fun main() = runBlocking { val deferred = CoroutineScope(Job()).async(Dispatchers.Default) { println("Hello") delay(1000L) println("Kotlin") }

deferred.await()

}

//執行結果: //Hello //Kotlin

fun main() { runBlocking(Dispatchers.Default) {
println("launch started!")
delay(1000L)
println("Kotlin")
}

println("Hello")                    
Thread.sleep(2000L)          
println("Process end!")

}

//執行結果: //launch started! //Hello //Kotlin

```

三者區別如下:

  • launch:無法獲取執行結果,返回類型Job,不會阻塞;
  • async:可獲取執行結果,返回類型Deferred,調用await()會阻塞,不調用則不會阻塞但也無法獲取執行結果;
  • runBlocking:可獲取執行結果,阻塞當前線程的執行,多用於Demo、測試,官方推薦只用於連接線程與協程。

launch為例,看一下它的源碼 public fun CoroutineScope.launch( //① context: CoroutineContext = EmptyCoroutineContext, //② start: CoroutineStart = CoroutineStart.DEFAULT, //③ block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine } launch源碼有三個參數分別對其解釋一下:

  • context: 協程的上下文,用於提供協程啟動和運行時需要的信息,默認值是EmptyCoroutineContext,有默認值就可以不傳,但是也可以傳遞Kotlin提供的Dispatchers來指定協程運行在哪一個線程中
  • start: 協程的啟動模式;
  • block: 這個可以理解為協程的函數體,函數類型為suspend CoroutineScope.() -> Unit

通過對參數的説明,上面關於launch的創建也可以這麼寫: ``` fun coroutineTest() { val scope = CoroutineScope(Job())

val block: suspend CoroutineScope.() -> Unit = {
    println("Hello")
    delay(1000L)
    println("Kotlin")
}

scope.launch(block = block)

} 反編譯成Java代碼如下: public final class CoroutineDemoKt { public static final void main() { coroutineTest(); Thread.sleep(2000L); }

// $FF: synthetic method
public static void main(String[] var0) {
    main();
}

public static final void coroutineTest() {
    CoroutineScope scope = CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null));

    //Function2 是Kotlin 為 block 變量生成的靜態變量以及方法。
    //實現狀態機的邏輯
    Function2 block = (Function2)(new Function2((Continuation)null) {
        int label;

        @Nullable
        public final Object invokeSuspend(@NotNull Object $result) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            String var2;
            switch(this.label) {
                case 0:
                    ResultKt.throwOnFailure($result);
                    var2 = "Hello";
                    System.out.println(var2);
                    this.label = 1;
                    if (DelayKt.delay(1000L, this) == var3) {
                            return var3;
                    }
                    break;
                case 1:
                    ResultKt.throwOnFailure($result);
                    break;
                default:
                    throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            var2 = "Kotlin";
            System.out.println(var2);
            return Unit.INSTANCE;
        }

        @NotNull
        public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
        }

        public final Object invoke(Object var1, Object var2) {
                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
        }
    });
    BuildersKt.launch$default(scope, (CoroutineContext)null, (CoroutineStart)null, block, 3, (Object)null);
}

} ```

2.協程是怎麼被創建出來的

還是以launch的源碼為例進行分析 public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) //協程的創建看這裏 coroutine.start(start, coroutine, block) return coroutine } coroutine.start為協程的創建與啟動,這個start進入到了AbstractCoroutine,是一個抽象類,裏面的start方法專門用於啟動協程。 ``` public abstract class AbstractCoroutine( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean ) : JobSupport(active), Job, Continuation, CoroutineScope { ...

/**
 * 用給定的block和start啟動協程
 * 最多調用一次
 */
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    start(block, receiver, this)
}

} ``AbstractCoroutine`中的start函數負責啟動協程,同時啟動是根據block和啟動策略決定的,那麼啟動策略是什麼? 以及啟動策略完成了什麼工作?

3.啟動策略是什麼

``` public enum class CoroutineStart {

/* * 根據上下文立即調度協程執行。 / DEFAULT

/* * 延遲啟動協程,只在需要時才啟動。 * 如果協程[Job]在它有機會開始執行之前被取消,那麼它根本不會開始執行 * 而是以一個異常結束。 / LAZY

/* * 以一種不可取消的方式,根據其上下文安排執行的協程; * 類似於[DEFAULT],但是協程在開始執行之前不能取消。 * 協程在掛起點的可取消性取決於掛起函數的具體實現細節,如[DEFAULT]。 / ATOMIC

/* * 立即執行協程,直到它在當前線程中的第一個掛起點; / UNDISPATCHED: } ```

4.啟動策略完成了什麼工作

協程在確定啟動策略之後就會開始執行它的任務,它的任務在invoke()函數中被分為不同的執行方式 /** * 用這個協程的啟動策略啟動相應的block作為協程 */ public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit = when (this) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit } 這裏直接對block.startCoroutine(completion)進行分析,block.startCoroutineCancellableblock.startCoroutineUndispatched也只是在startCoroutine的基礎上增加了一些額外的功能,前者表示啟動協程以後可以響應取消,後者表示協程啟動以後不會被分發。 /** * 啟動一個沒有接收器且結果類型為[T]的協程。 * 每次調用這個函數時,它都會創建並啟動一個新的、可掛起計算實例。 * 當協程以一個結果或一個異常完成時,將調用[completion]延續。 */ public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T) { createCoroutineUnintercepted(completion).intercepted().resume(Unit) }

先來看一下createCoroutineUnintercepted()做了哪些工作

//可以理解為一種聲明 // ↓ public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T> ): Continuation<Unit> expect的意思是期望、期盼,這裏可以理解為一種聲明,期望在具體的平台中實現。

進入到createCoroutineUnintercepted()的源碼中看到並沒有什麼實現,這主要是因為Kotlin是面向多個平台的具體的實現需要在特定平台中才能找到,這裏進入IntrinsicsJvm.kt中分析。 //IntrinsicsJvm#createCoroutineUnintercepted //actual代表的是createCoroutineUnintercepted在JVM平台上的具體實現 // ↓ public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { val probeCompletion = probeCoroutineCreated(completion) // 難點 // ↓ return if (this is BaseContinuationImpl) //會進入這裏執行 create(probeCompletion) else createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } } 這裏的actual就是在具體品台上的實現。

上面的代碼中有一個難點是【this】 這個this的含義如果只是在反編譯代碼或者源碼中去看很難發現它是什麼,這裏要通過源碼、字節碼、反編譯的Java代碼進行分析,這裏我以截圖進行展示 image.png com/example/coroutines/CoroutineDemoKt$coroutineTest$block$1是block具體的實現類。它繼承自kotlin/coroutines/jvm/internal/SuspendLambda

``` internal abstract class SuspendLambda( public override val arity: Int, completion: Continuation? ) : ContinuationImpl(completion), FunctionBase, SuspendFunction { constructor(arity: Int) : this(arity, null)

public override fun toString(): String =
    if (completion == null)
        Reflection.renderLambdaToString(this) // this is lambda
    else
        super.toString() // this is continuation

} ```

``` internal abstract class ContinuationImpl( completion: Continuation?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) {

constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

} ```

``` internal abstract class BaseContinuationImpl( public val completion: Continuation?) : Continuation, CoroutineStackFrame, Serializable {

public open fun create(completion: Continuation<*>): Continuation<Unit> {
    throw UnsupportedOperationException("create(Continuation) has not been overridden")
}

} `` **SuspendLambda是ContinuationImpl的子類,ContinuationImpl又是BaseContinuationImpl的子類,** 所以可以得到結論if (this is BaseContinuationImpl)的結果為**true,** 然後會進入到create(probeCompletion)`函數中

這個create()函數拋出了一個異常,意思就是方法沒有被重寫,潛台詞就是create()這個方法是要被重寫的,如果不重寫就會拋出異常。 那麼create()方法又是在哪裏重寫的呢。答案就在反編譯後的Java代碼中的create方法 //這段代碼來自launch創建的反編譯後的Java代碼 //create函數被重寫 public final Continuation create(@Nullable Object value, @NotNull Continuation completion) { Intrinsics.checkNotNullParameter(completion, "completion"); Function2 var3 = new <anonymous constructor>(completion); return var3; } 這行代碼,其實就對應着協程被創建的時刻。

分析完了startContinue()再來分析一下startCoroutineCancellable()做了什麼,因為協程默認的啟動策略是CoroutineStart.DEFAULT ``` //Cancellable#startCoroutineCancellable /* * 使用此函數以可取消的方式啟動協程,以便它可以在等待調度時被取消。 / public fun (suspend () -> T).startCoroutineCancellable(completion: Continuation): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) }

//Continuation#startCoroutine public fun (suspend () -> T).startCoroutine(completion: Continuation) { createCoroutineUnintercepted(completion).intercepted().resume(Unit) } `` 通過對比可以發現startCoroutineCancellable()startCoroutine()的內部並沒有太大區別,他們最終都會調用createCoroutineUnintercepted(),只不過前者在最後調用了resumeCancellableWith(),後者調用的是resume()`,這個稍後分析。

5.協程是如何被啟動的

協程的創建分析完成後再來分析一下協程是如何啟動的,再回過頭看一下createCoroutineUnintercepted()之後做了什麼 //Cancellable#startCoroutineCancellable /** * 使用此函數以可取消的方式啟動協程,以便它可以在等待調度時被取消。 */ public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { // 現在看這裏 // ↓ createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) } 進入到intercepted(),它也是需要找到對應平台上的具體實現,這裏還是以JVM平台進行分析 ``` //需要找到對應平台的具體實現 public expect fun Continuation.intercepted(): Continuation

//JVM平台的實現 //IntrinsicsJvm.kt#intercepted / * 使用[ContinuationInterceptor]攔截continuation。 */ public actual fun Continuation.intercepted(): Continuation = (this as? ContinuationImpl)?.intercepted() ?: this 在分析協程的創建過程中已經分析過上面的**this**代表的就是block變量,所以這裏的強轉是成立的,那麼這裏的`intercepted()`調用的就是`ContinuationImpl`對象中的函數 / * 命名掛起函數的狀態機擴展自這個類 */ internal abstract class ContinuationImpl( completion: Continuation?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor(completion: Continuation?) : this(completion, completion?.context)

public override val context: CoroutineContext
    get() = _context!!

@Transient
private var intercepted: Continuation<Any?>? = null

//重點看這裏
public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

} `` 首先intercepted()方法會判斷它的成員變量intercepted是否為空,如果為空則調用context[ContinuationInterceptor]獲取上下文當中的Dispatchers對象,這個Dispatchers`對象又是什麼呢?

6.協程啟動過程中的Dispatchers是什麼

這裏以launch的源碼為主進行分析 ``` fun main() { CoroutineScope(Job()).launch(Dispatchers.Default) { delay(1000L) println("Kotlin") }

println("Hello")
Thread.sleep(2000L)

}

public fun CoroutineScope.launch( // 傳入的Dispatchers.Default表示的就是這個context context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine } `` 傳入的Dispatchers.Default對應的是context參數,從源碼可知這個參數不是必傳的,因為它有默認值EmptyCoroutineContext`,Kotlin官方用它來替代了null,這是Kotlin空安全思維。

傳入Dispatchers.Default之後就是用它替代了EmptyCoroutineContext,那麼這裏的Dispatchers的定義跟CoroutineContext有什麼關係呢?看一下Dispatchers的源碼 ``` /* * 對[CoroutineDispatcher]的各種實現進行分組。 / public actual object Dispatchers {

/**
 * 用於CPU密集型任務的線程池,一般來説它內部的線程個數是與機器 CPU 核心數量保持一致的
 * 不過它有一個最小限制2,
 */
public actual val Default: CoroutineDispatcher = DefaultScheduler

/**
 * 主線程,在Android中才可以使用,主要用於UI的繪製,在普通JVM上無法使用
 */
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

/**
 * 不侷限於任何特定線程,會根據運行時的上下文環境決定
 */
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

/**
 * 用於執行IO密集型任務的線程池,它的數量會多一些,默認最大線程數量為64個
 * 具體的線程數量可以通過kotlinx.coroutines.io.parallelism配置
 * 它會和Default共享線程,當Default還有其他空閒線程時是可以被IO線程池複用。
 */
public val IO: CoroutineDispatcher = DefaultIoScheduler

} `Dispatchers`是一個單例對象,它裏面的幾個類型都是`CoroutineDispatcher`。 /* * 所有協程調度器實現擴展的基類。 / public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { }

/* * 標記攔截協程延續的協程上下文元素。 / public interface ContinuationInterceptor : CoroutineContext.Element { }

/* * CoroutineContext的一個元素。協程上下文的一個元素本身就是一個單例上下文。 / public interface Element : CoroutineContext { } ``CoroutineDispatcher本身又是CoroutineContext`,從上面的源碼就可以得出他們的關係可以這麼表示: image.png

7.協程啟動過程中的Dispatchers做了什麼

協程的運行是離不開線程的,Dispatchers的作用就是確定協程運行在哪個線程,默認是Default,然後它也可以運行在IO、Main等線程,它負責將任務調度的指定的現呈上,具體的分析後面再寫。

通過前分析在協程中默認的是Default線程池,因此這裏進入的就是Default線程池。 那麼我們回到intercepted()函數繼續進行分析,通過Debug進入到CoroutineDispatcher中的interceptContinuation()函數 ``` public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

/**
 * 返回一個封裝了提供的[continuation]的continuation,從而攔截所有的恢復。
 * 這個方法通常應該是異常安全的。
 * 從此方法拋出的異常可能會使使用此調度程序的協程處於不一致且難以調試的狀態。
 */
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)

} ``interceptContinuation()返回了一個DispatchedContinuation對象,其中的**this**就是默認的線程池Dispatchers.Default`。

然後通過DispatchedContinuation調用它的resumeCancellableWith()函數,這個函數前面分析過是從哪裏進入的,這裏不再説明。 ``` internal class DispatchedContinuation( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation ) : DispatchedTask(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation by continuation {

...

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

//我們內聯它來保存堆棧上的一個條目,在它顯示的情況下(無限制調度程序)
//它只在Continuation<T>.resumeCancellableWith中使用
inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
...

} `DispatchedContinuation`繼承了`DispatchedTask`: internal abstract class DispatchedTask( @JvmField public var resumeMode: Int ) : SchedulerTask() { }

internal actual typealias SchedulerTask = Task

internal abstract class Task( @JvmField var submissionTime: Long, @JvmField var taskContext: TaskContext ) : Runnable { constructor() : this(0, NonBlockingContext) inline val mode: Int get() = taskContext.taskMode // TASK_XXX } ``DispatchedTask繼承了SchedulerTask,同時SchedulerTask還是Task的別名,Task又實現了Runnable`接口,這意味着它可以被分發到Java的線程中去執行了。

同時可以得出一個結論:DispatchedContinuation是一個Runnable。

DispatchedContinuation還實現了Continuation接口,它還使用了類委託的語法將接口的具體實現交給了它的成員屬性continuation,那麼這裏對上面的結論進行補充:DispatchedContinuation不僅是一個Runnable,還是一個Continuation。

DispatchedContinuation分析完了進入它的resumeCancellableWith()函數分析:

inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) //① if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE //② dispatcher.dispatch(context, this) } else { //這裏就是Dispatchers.Unconfined情況,這個時候協程不會被分發到別的線程,只運行在當前線程中。 executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled(state)) { resumeUndispatchedWith(result) } } } } - 註釋①: dispatcher來自CoroutineDispatcher,isDispatchNeeded就是它的成員函數 ``` public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

/**
 * 如果協程的執行應該使用[dispatch]方法執行,則返回' true '。
 * 大多數dispatchers的默認行為是返回' true '。
 */
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

} `isDispatchNeeded()`**默認返回true**,且在大多數情況下都是**true,** 但是也有個例外就是在它的子類中`Dispatchers.Unconfined`會將其重寫成 false。 internal object Unconfined : CoroutineDispatcher() { // 只有Unconfined會重寫成false override fun isDispatchNeeded(context: CoroutineContext): Boolean = false } ``` 因為默認是true,所以接下來會進入註釋②

  • 註釋②: 註釋②調用了CoroutineDispatcher中的dispatch()方法將block代碼塊調度到另一個線程上,這裏的線程池默認值是Dispatchers.Default所以任務被分發到Default線程池,第二個參數是Runnable,這裏傳入的是this,因為DispatchedContinuation間接的實現了Runnable接口。 ``` public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { /**

    • 在給定的[上下文]中,將一個可運行的block調度到另一個線程上。
    • 這個方法應該保證給定的[block]最終會被調用,否則系統可能會達到死鎖狀態並且永遠不會終止。 */ public abstract fun dispatch(context: CoroutineContext, block: Runnable) } 因為默認線程池是`Dispatchers.Default`,所以這裏的`dispatch()`其實調用的是`Dispatchers.Default.dispatch`,這裏的`Dispatchers.Default`的本質是一個單例對象**DefaultScheduler,** 它繼承了`SchedulerCoroutineDispatcher`: //繼承了SchedulerCoroutineDispatcher internal object DefaultScheduler : SchedulerCoroutineDispatcher( CORE_POOL_SIZE, MAX_POOL_SIZE, IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME ) { // 關閉調度程序,僅用於Dispatchers.shutdown() internal fun shutdown() { super.close() }

    //重寫 (Dispatchers.Default as ExecutorCoroutineDispatcher).close() override fun close() { throw UnsupportedOperationException("Dispatchers.Default cannot be closed") }

    override fun toString(): String = "Dispatchers.Default" }

internal open class SchedulerCoroutineDispatcher( private val corePoolSize: Int = CORE_POOL_SIZE, private val maxPoolSize: Int = MAX_POOL_SIZE, private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, private val schedulerName: String = "CoroutineScheduler", ) : ExecutorCoroutineDispatcher() {

private var coroutineScheduler = createScheduler()

override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

} `SchedulerCoroutineDispatcher`中實際調用`dispatch()`方法的實際是`coroutineScheduler`,所以`dispatcher.dispatch()`實際調用的是**coroutineScheduler.dispatch()** internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable {

//Executor接口中的方法被覆蓋
override fun execute(command: Runnable) = dispatch(command)

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    trackTask() 
    //將傳入的 Runnable 類型的 block(也就是 DispatchedContinuation),包裝成 Task。
    val task = createTask(block, taskContext)
    // 拿到當前的任務隊列, 嘗試將任務提交到本地隊列並根據結果進行操作
    //Worker其實是一個內部類,其實就是Java的Thread類
    val currentWorker = currentWorker()
    //將當前的 Task 添加到 Worker 線程的本地隊列,等待執行。
    val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
    if (notAdded != null) {
        if (!addToGlobalQueue(notAdded)) {
            // 全局隊列在關閉/關閉的最後一步關閉——不再接受任何任務
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    val skipUnpark = tailDispatch && currentWorker != null
    // 
    if (task.mode == TASK_NON_BLOCKING) {
        if (skipUnpark) return
        signalCpuWork()
    } else {
        //增加阻塞任務 
        signalBlockingWork(skipUnpark = skipUnpark)
    }
}

} ```

8.Worker是什麼?

``` internal inner class Worker private constructor() : Thread() { init { isDaemon = true //守護線程默認為true }

  private fun runWorker() {
    var rescanned = false
    while (!isTerminated && state != WorkerState.TERMINATED) {
        //在while循環中一直嘗試從隊列中找到任務
        val task = findTask(mayHaveLocalTasks)
        // 找到任務則進行下一步
        if (task != null) {
            rescanned = false
            minDelayUntilStealableTaskNs = 0L
            //執行任務
            executeTask(task)
            continue
        } else {
            mayHaveLocalTasks = false
        }

        if (minDelayUntilStealableTaskNs != 0L) {
            if (!rescanned) {
                rescanned = true
            } else {
                rescanned = false
                tryReleaseCpu(WorkerState.PARKING)
                interrupted()
                LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                minDelayUntilStealableTaskNs = 0L
            }
            continue
        }

        //沒有任務則停止執行,線程可能會關閉
        tryPark()
    }
    tryReleaseCpu(WorkerState.TERMINATED)
}

} ```

9.Worker中的任務被找到後是如何執行的?

``` internal inner class Worker private constructor() : Thread() { private fun executeTask(task: Task) { val taskMode = task.mode //當它找到一個任務時,這個工作者就會調用它 idleReset(taskMode) beforeTask(taskMode) runSafely(task) afterTask(taskMode) }

fun runSafely(task: Task) {
    try {
        task.run()
    } catch (e: Throwable) {
        val thread = Thread.currentThread()
        thread.uncaughtExceptionHandler.uncaughtException(thread, e)
    } finally {
        unTrackTask()
    }
}

}

internal abstract class Task( @JvmField var submissionTime: Long, @JvmField var taskContext: TaskContext ) : Runnable { constructor() : this(0, NonBlockingContext) inline val mode: Int get() = taskContext.taskMode // TASK_XXX } `` 最終進入到runSafely()`函數中,然後調用run方法,前面分析過,將DispatchedContinuation包裝成一個實現了Runnable接口的Task,所以這裏的task.run()本質上就是調用的Runnable.run(),到這裏任務就協程任務就真正的執行了。

那麼也就可以知道這裏的run() 函數其實調用的就是DispatchedContinuation父類DispatchedTask中的run()函數: ``` internal abstract class DispatchedTask( @JvmField public var resumeMode: Int ) : SchedulerTask() {

public final override fun run() {
    assert { resumeMode != MODE_UNINITIALIZED } 
    val taskContext = this.taskContext
    var fatalException: Throwable? = null
    try {
        val delegate = delegate as DispatchedContinuation<T>
        val continuation = delegate.continuation
        withContinuationContext(continuation, delegate.countOrElement) {
            val context = continuation.context
            val state = takeState() // 
            val exception = getExceptionalResult(state)
             // 檢查延續最初是否在異常情況下恢復。
             // 如果是這樣,它將主導取消,否則原始異常將被靜默地丟失。
            val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
            if (job != null && !job.isActive) {
                //①
                val cause = job.getCancellationException()
                cancelCompletedResult(state, cause)
                continuation.resumeWithStackTrace(cause)
            } else {
                if (exception != null) {
                    //②
                    continuation.resumeWithException(exception)
                } else {
                    //③
                    continuation.resume(getSuccessfulResult(state))
                }
            }
        }
    } catch (e: Throwable) {
        // 
        fatalException = e
    } finally {
        val result = runCatching { taskContext.afterTask() }
        handleFatalException(fatalException, result.exceptionOrNull())
    }
}

} `` - **註釋①:** 在代碼執行之前這裏會判斷當前協程是否被取消。如果被取消了就會調用continuation.resumeWithStackTrace(cause)將具體的原因傳出去; - **註釋②:** 判斷協程是否發生了異常如果已經發生異常就調用continuation.resumeWithException(exception)將異常傳遞出去; - **註釋③:** 如果前面運行沒有問題,就進入最後一步continuation.resume(getSuccessfulResult(state)`,此時協程正式被啟動並且執行launch當中傳入的block或者Lambda函數。

這裏其實就是協程與線程產生關聯的地方。

以上就是協程的創建、啟動的流程,但是還有幾個問題沒有弄明白: - 協程創建的時候CoroutineScope是什麼 - 協程的結構化中父子關係是怎樣建立的 - 結構化建立後又是怎麼取消的

接下來對這幾個問題進行解答:

10.協程創建的時候CoroutineScope是什麼

前面對於協程的三種創建方式中的launch、async的創建方式中都有CoroutineScope(Job()),現在先來分析下CoroutineScope做了什麼,先來看下launch和async的源碼。 ``` public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }

public fun CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyDeferredCoroutine(newContext, block) else DeferredCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }

async、launch的擴展接收者都是`CoroutineScope`,這就意味着他們等價於`CoroutineScope`的成員方法,如果要調用就必須先獲取到`CoroutineScope`的對象。 public interface CoroutineScope {

/**
 * 此作用域的上下文
 * Context被作用域封裝,用於實現作為作用域擴展的協程構建器
 * 不建議在普通代碼中訪問此屬性,除非訪問[Job]實例以獲得高級用法
 */
public val coroutineContext: CoroutineContext

} ``CoroutineScope是一個接口,這個接口所做的也只是對CoroutineContext做了一層封裝而已。CoroutineScope`最大的作用就是可以方便的批量的控制協程,例如結構化併發。

11.CoroutineScope與結構化併發

``` fun coroutineScopeTest() { val scope = CoroutineScope(Job()) scope.launch { launch { delay(1000000L) logX("ChildLaunch 1") } logX("Hello 1") delay(1000000L) logX("Launch 1") }

scope.launch {
    launch {
        delay(1000000L)
        logX("ChildLaunch 2")
    }
    logX("Hello 2")
    delay(1000000L)
    logX("Launch 2")
}

Thread.sleep(1000L)
scope.cancel()

}

//輸出結果: //================================ //Hello 2 //Thread:DefaultDispatcher-worker-2 //================================ //================================ //Hello 1 //Thread:DefaultDispatcher-worker-1 //================================ `` 上面的代碼實現了結構化,只是創建了CoroutineScope(Job())和利用launch啟動了幾個協程就實現了結構化,結構如圖所示,那麼它的**父子結構是如何建立的?** ![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e4d01ee13bda49109aa857d8fe77c391~tplv-k3u1fbpfcp-watermark.image?)CoroutineScope這裏要説明一下為什麼明明是是一個接口,可是在創建的時候卻可以以構造函數的方式使用。在Kotlin中的命名規則是以【駝峯法】為主的,在特殊情況下是可以打破這個規則的,CoroutineScope就是一個特殊的情況,它是一個**頂層函數**但它發揮的作用卻是**構造函數**,同樣的還有Job()`,它也是頂層函數,在Kotlin中當頂層函數被用作構造函數的時候首字母都是大寫的。

12.協程的結構化中父子關係是怎樣建立的

再來看一下CoroutineScope作為構造函數使用時的源碼: /** * 創建一個[CoroutineScope],包裝給定的協程[context]。 * * 如果給定的[context]不包含[Job]元素,則創建一個默認的' Job() '。 * * 這樣,任何子協程在這個範圍或[取消][協程]失敗。就像在[coroutineScope]塊中一樣, * 作用域本身會取消作用域的所有子作用域。 */ public fun CoroutineScope(context: CoroutineContext): CoroutineScope = ContextScope(if (context[Job] != null) context else context + Job()) 構造函數的CoroutineScope傳入一個參數,這個參數如果包含Job元素則直接使用,如果不包含Job則會創建一個新的Job,這就説明每一個coroutineScope對象中的 Context中必定會存在一個Job對象。而在創建一個CoroutineScope對象時這個Job()是一定要傳入的,因為CoroutineScope就是通過這個Job()對象管理協程的。 public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine } 上面的代碼是launch的源碼,分析一下LazyStandaloneCoroutineStandaloneCoroutine。 ``` private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean ) : AbstractCoroutine(parentContext, initParentJob = true, active = active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true } }

private class LazyStandaloneCoroutine( parentContext: CoroutineContext, block: suspend CoroutineScope.() -> Unit ) : StandaloneCoroutine(parentContext, active = false) { private val continuation = block.createCoroutineUnintercepted(this, this)

override fun onStart() {
    continuation.startCoroutineCancellable(this)
}

} `StandaloneCoroutine`是`AbstractCoroutine`子類,`AbstractCoroutine`是**協程的抽象類,** 裏面的參數`initParentJob = true`表示協程創建之後需要初始化協程的父子關係。`LazyStandaloneCoroutine`是`StandaloneCoroutine`的子類,`active=false`使命它是以懶加載的方式創建協程。 public abstract class AbstractCoroutine( parentContext: CoroutineContext, initParentJob: Boolean, active: Boolean ) : JobSupport(active), Job, Continuation, CoroutineScope { init { /* * 在上下文中的父協程和當前協程之間建立父子關係 * 如果父協程已經被取消他可能導致當前協程也被取消 * 如果協程從onCancelled或者onCancelling內部操作其狀態, * 那麼此時建立父子關係是危險的 / if (initParentJob) initParentJob(parentContext[Job]) } } ``AbstractCoroutine是一個抽象類他繼承了JobSupport,而JobSupportJob`的具體實現。

init函數中根據initParentJob判斷是否建立父子關係,initParentJob的默認值是true因此if中的initParentJob()函數是一定會執行的,這裏的parentContext[Job]取出的的Job就是在launche創建時傳入的Job

initParentJob()JobSupport中的方法,因為AbstractCoroutine繼承自JobSupport,所以進入JobSupport分析這個方法。 ``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { final override val key: CoroutineContext.Key<*> get() = Job

/**
 * 初始化父類的Job
 * 在所有初始化之後最多調用一次
 */
protected fun initParentJob(parent: Job?) {
    assert { parentHandle == null }
    //①
    if (parent == null) {
        parentHandle = NonDisposableHandle
        return
    }
    //②
    parent.start() // 確保父協程已經啟動
    @Suppress("DEPRECATION")
    //③
    val handle = parent.attachChild(this)
    parentHandle = handle
    // 檢查註冊的狀態
    if (isCompleted) {
        handle.dispose()
        parentHandle = NonDisposableHandle 
    }
}

} `` 上面的源碼initParentJob`中添加了三處註釋,現在分別對這三處註釋進行分析:

  • if (parent == null): 這裏是對是否存在父Job的判斷,如果不存在則不再進行後面的工作,也就談不上建立父子關係了。因為在Demo中傳遞了Job()因此這裏的父Job是存在的,所以代碼可以繼續執行。
  • parent.start(): 這裏確保parent對應的Job啟動了;
  • parent.attachChild(this): 這裏就是將子Job添加到父Job中,使其成為parent的子Job這裏其實就是建立了父子關係。

用一句話來概括這個關係就是:每一個協程都有一個Job,每一個Job又有一個父Job和多個子Job,可以看做是一個樹狀結構。這個關係可以用下面這張圖表示: image.png

13.協程的結構化建立後又是怎麼取消的

結構化可以被創建的同時CoroutineScope還提供了可取消的函數,Demo中通過scope.cancel()取消了協程,它的流程又是怎樣的呢?先從scope.cancel中的cancel看起

/** * 取消這個scope,包含當前Job和子Job * 如果沒有Job,可拋出異常IllegalStateException */ public fun CoroutineScope.cancel(cause: CancellationException? = null) { val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this") job.cancel(cause) }

scope.cancel又是通過job.cancel取消的,這個cancel具體實現是在JobSupport

``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { ...

public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause ?: defaultCancellationException())
}

public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}

/**
 * 當cancelChild被調用的時候cause是Throwable或者ParentJob
 * 如果異常已經被處理則返回true,否則返回false
 */
internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        // 確保它正在完成,如果返回狀態是 cancelMakeCompleting 説明它已經完成
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
        //轉換到取消狀態,當完成時調用afterCompletion
        finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            afterCompletion(finalState)
            true
        }
    }
}

/**
 * 如果沒有需要協程體完成的任務返回true並立即進入完成狀態等待子類完成
 * 這裏代表的是當前Job是否有協程體需要執行
 */
internal open val onCancelComplete: Boolean get() = false

} ```

job.cancel()最終調用的是JobSupport中的cancelImpl()。這裏它分為兩種情況,判斷依據是onCancelComplete,代表的就是當前Job是否有協程體需要執行,如果沒有則返回true。這裏的Job是自己創建的且沒有需要執行的協程代碼因此返回結果是true,所以就執行cancelMakeCompleting()表達式。

``` private fun cancelMakeCompleting(cause: Any?): Any? { loopOnState { state -> ... val finalState = tryMakeCompleting(state, proposedUpdate) if (finalState !== COMPLETING_RETRY) return finalState } }

private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? { ... return tryMakeCompletingSlowPath(state, proposedUpdate) }

private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? { //獲取狀態列表或提升為列表以正確操作子列表 val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY ... notifyRootCause?.let { notifyCancelling(list, it) } ... return finalizeFinishingState(finishing, proposedUpdate) } ```

進入cancelMakeCompleting()後經過多次流轉最終會調用tryMakeCompletingSlowPath()中的notifyCancelling(),在這個函數中才是執行子Job和父Job取消的最終流程

``` private fun notifyCancelling(list: NodeList, cause: Throwable) { //首先取消子Job onCancelling(cause) //通知子Job notifyHandlers(list, cause) // 之後取消父Job cancelParent(cause) // 試探性取消——如果沒有parent也沒關係 }

private inline fun notifyHandlers(list: NodeList, cause: Throwable?) { var exception: Throwable? = null list.forEach { node -> try { node.invoke(cause) } catch (ex: Throwable) { exception?.apply { addSuppressedThrowable(ex) } ?: run { exception = CompletionHandlerException("Exception in completion handler $node for $this", ex) } } } exception?.let { handleOnCompletionException(it) } } ```

notifyHandlers()中的流程就是遍歷當前Job的子Job,並將取消的cause傳遞過去,這裏的invoke()最終會調用 ChildHandleNodeinvoke()方法

``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { ...

internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job get() = job
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}

} ```

childJob.parentCancelled(job)的調用最終調用的是JobSupport中的parentCanceled()函數,然後又回到了cancelImpl()中,也就是 Job 取消的入口函數。這實際上就相當於在做遞歸調用

Job取消完成後接着就是取消父Job了,進入到cancelParent()函數中

``` /* * 取消Job時調用的方法,以便可能將取消傳播到父類。 * 如果父協程負責處理異常,則返回' true ',否則返回' false '。 / private fun cancelParent(cause: Throwable): Boolean { // Is scoped coroutine -- don't propagate, will be rethrown if (isScopedCoroutine) return true

/* 
* CancellationException被認為是“正常的”,當子協程產生它時父協程通常不會被取消。
* 這允許父協程取消它的子協程(通常情況下),而本身不會被取消,
* 除非子協程在其完成期間崩潰併產生其他異常。
*/
val isCancellation = cause is CancellationException
val parent = parentHandle

if (parent === null || parent === NonDisposableHandle) {
    return isCancellation
}

// 責任鏈模式
return parent.childCancelled(cause) || isCancellation

}

/* * 在這個方法中,父類決定是否取消自己(例如在重大故障上)以及是否處理子類的異常。 * 如果異常被處理,則返回' true ',否則返回' false '(調用者負責處理異常) / public open fun childCancelled(cause: Throwable): Boolean { if (cause is CancellationException) return true return cancelImpl(cause) && handlesException } ```

cancelParent的返回結果使用了責任鏈模式, 如果返回【true】表示父協程處理了異常,返回【false】則表示父協程沒有處理異常。

當異常是CancellationException時如果是子協程產生的父協程不會取消,或者説父協程會忽略子協程的取消異常,如果是其他異常父協程就會響應子協程的取消了。

14.newCoroutineContext(context)

launch源碼中第一行代碼做了什麼目前還不得而知,這裏分析一下做了什麼事 public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { // 就是這一行 val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) coroutine.start(start, coroutine, block) return coroutine }

/** * 為新的協程創建上下文。當沒有指定其他調度程序或[ContinuationInterceptor]時, * 它會設置[Dispatchers.Default],並添加對調試工具的可選支持(當打開時)。 */ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { //① val combined = coroutineContext.foldCopiesForChildCoroutine() + context //② val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined //③ return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug } - 註釋①:這行代碼首先調用了coroutineContext,這是因為newCoroutineContext()CoroutineScope的擴展函數,CoroutineScopeCoroutineContext進行了封裝,所以newCoroutineContext()函數中可直接訪問CoroutineScopecoroutineContextfoldCopiesForChildCoroutine()函數返回子協程要繼承的[CoroutineContext];然後跟傳入的context參數進行合併。這行代碼就是讓子協程可以繼承父協程的上下文元素。 - 註釋②:它的作用是在調試模式下,為我們的協程對象增加唯一的 ID,這個ID就是在調試協程程序時出現的日誌如:Thread:DefaultDispatcher-worker-1 @coroutine#1中的@coroutine#1,其中的1就是這個ID。 - 註釋③:如果合併後的combined沒有指定調度程序就默認使用Dispatcher.Default

通過上面的分析可以得出newCoroutineContext函數確定了默認使用的線程池是Dispatcher.Default那麼這裏為什麼會默認使用Default線程池而不是Main呢? 因為Kotlin並不是只針對Android開發的,它支持多個平台Main線程池僅在UI相關的平台中才會用到,而協程是不能脱離線程運行的,所以這裏默認使用Default線程池。

15.總結:

1. 協程是如何被創建的

協程在確認自己的啟動策略後進入到createCoroutineUnintercepted函數中創建了協程的Continuation實例,Continuation的實現類是ContinuationImpl,它繼承自BaseContinuationImpl,在BaseContinuationImpl中調用了它的create()方法,而這個create()方法需要重寫才可以實現否則會拋出異常,那麼這個create()的重寫就是反編譯後的Java代碼中create()函數。

2. 協程是如何被啟動的

協程通過createCoroutineUnintercepted函數創建後緊接着就會調用它的intercepted()方法,將其封裝成DispatchedContinuation對象,DispatchedContinuationRunnable的子類,DispatchedContinuation會持有CoroutineDispatcher以及前面創建的Continuation對象,DispatchedContinuation調用內部的resumeCancellableWith()方法,然後進入到resumeCancellableWith()中的dispatched.dispatch(),這裏會將協程的Continuation包裝成Task並添加到Worker的本地任務隊列等待執行。而這裏的Worker本質上是Java中的Thread,在這一步協程完成了線程的切換,任務添加到Worker的本地任務隊列後就會通過run()方法啟動任務,這裏調用的是task.run(),這裏的run最終是調用的DispatchedContinuation的父類DispatchedTask中的run()方法,在這個run方法中如果前面沒有異常最終會調用continuation.resume(),然後就開始執行執行協程體中的代碼了也就是反編譯代碼中的invokeSuspend(),這裏開始了協程狀態機流程,這樣協程就被啟動了。

3. CoroutineScope與結構化的關係

CoroutineScope是一個接口,這個接口所做的也只是對CoroutineContext做了一層封裝而已。CoroutineScope最大的作用就是可以方便的批量的控制協程,CoroutineScope在創建它的實例的時候是需要傳入Job()對象的,因為CoroutineScope就是通過這個Job()對象管理協程的。協程的結構化關係也就因此而產生。 協程的結構化關係是一種父子關係,父子關係可以看做是一個N叉樹的結構,用一句話來概括這個關係就是:每一個協程都有一個Job,每一個Job又有一個父Job和多個子Job,可以看做是一個樹狀結構。 父子關係的建立是通過AbstractCoroutine中的initParentJob()進行的,而AbstractCoroutineJobSuppert的子類,建立父子關係的過程就是首先確定是否有父類如果沒有則不建立父子關係,如果有父類則需要確保父Job已經被啟動,然後通過attachChild()函數將子Job添加到父Job中,這樣就完成了父子關係的建立。

4. 父子關係建立後如何取消結構化的運行

因為是一個樹結構因此協程的取消以及異常的傳播都是按照這個結構進行傳遞。當取消Job時都會通知自己的父Job和子Job,取消子Job最終是以遞歸的方式傳遞給每一個Job。協程在向上取消父Job時通過責任鏈模式一步一步的傳遞到最頂層的協程,同時如果子Job產生CancellationException異常時父Job會將其忽略,如果是其他異常父Job則會響應這個異常。對於CancellationException引起的取消只會向下傳遞取消子協程;對於其他異常引起的取消既向上傳遞也向下傳遞,最終會使所有的協程被取消。