Android協程(Coroutines)系列-深入理解suspend(掛起函式)關鍵字

語言: CN / TW / HK

highlight: a11y-dark theme: smartblue


小知識,大挑戰!本文正在參與“   程式設計師必備小知識   ”創作活動

本文同時參與 「掘力星計劃」   ,贏取創作大禮包,挑戰創作激勵金

Kotlin 協程把suspend 修飾符引入到了我們 Android 開發者的日常開發中。您是否好奇它的底層工作原理呢?編譯器是如何轉換我們的程式碼,使其能夠掛起和恢復協程操作的呢?

📚 如果您是 Android 平臺上協程的初學者,請查閱上一篇文章: Android協程(Coroutines)系列-入門

# suspend

掛起函式,是指把協程程式碼掛起不繼續執行的函式,也叫協程被函式掛起了。協程中呼叫掛起函式時,協程所在的執行緒不會掛起也不會阻塞,但是協程被掛起了。也就是說,協程內掛起函式之後的程式碼停止執行了,直到掛起函式完成後恢復協程,協程才繼續執行後續的程式碼。所有掛起函式都會通過suspend修飾符修飾。

suspend是協程的關鍵字,每一個被suspend修飾的方法都必須在另一個suspend函式或者Coroutine協程程式中進行呼叫。

掛起函式(由suspend關鍵字修飾)的目的是用來掛起協程的執行等待非同步計算的結果,所以一個掛起函式通常有兩個要點:掛起非同步

這裡涉及到一種機制俗稱CPS(Continuation-Passing-Style:續體傳遞風格)。每一個suspend修飾的方法或者lambda表示式都會在程式碼呼叫的時候為其額外新增Continuation(續體)型別的引數。

Kotlin協程中使用了狀態機,編譯器會將協程體編譯成一個匿名內部類,每一個掛起函式的呼叫位置對應一個掛起點。

| 掛起函式 | 意義 | 解釋 | | -------- | ------- | ----------------- | | join | 掛起當前協程,直到等待的子協程執行完畢 | 通過當前協程返回的Job介面的join方法,可以單純的掛起當前協程,等待子協程完成後再恢復繼續執行 | | await | 掛起當前協程,直到等待的子協程返回結果 | 和join的區別是,它屬於Job介面的子介面Deferred的方法,可以等待子協程完成後,帶著返回值恢復當前協程 | | delay | 掛起當前協程,直到指定時間後恢復當前協程 | 單純掛起當前協程,指定時長後恢復協程執行 | | withContext() | 掛起外部協程,直到自己內部協程全部返回後,才會恢復外部的協程。 | 沒有建立新的協程,在指定協程上執行掛起程式碼塊,並掛起該協程直至程式碼塊執行完成並返回結果。類似async.await的效果

協程掛起流程詳解

協程實現非同步的核心原理就是通過掛起函式實現協程體的掛起,還不阻塞協程體所在的執行緒。 ```kotlin fun testInMain() { Log.d("["+Thread.currentThread().name+"]testInMain start") var job = CoroutineScope(Dispatchers.Main).launch { //啟動協程job Log.d("[" + Thread.currentThread().name+"]job start") var job1 = async(Dispatchers.IO) { //啟動協程job1 Log.d("["+Thread.currentThread().name+"]job1 start") delay(3000) //掛起job1協程 3秒 Log.d("["+Thread.currentThread().name+"]job1 end ") "job1-Return" } //job1協程 續體執行完畢

    var job2 = async(Dispatchers.Default) {
        Log.d("["+Thread.currentThread().name+"]job2 start" )
        delay(1000) //掛起job2協程 1秒
        Log.d("["+Thread.currentThread().name+"]job2 end")
        "job2-Return"
    } //job2協程 續體執行完畢

    Log.d("["+Thread.currentThread().name+"]before job1 return")
    Log.d("["+Thread.currentThread().name+"]job1 result = " + job1.await()) //掛起job協程,等待job1返回結果;如果已有結果,不掛起,直接返回

    Log.d("["+Thread.currentThread().name+"]before job2 return")
    Log.d("["+Thread.currentThread().name+"]job2 result = " + job2.await()) //掛起job協程,等待job2返回結果;如果已有結果,不掛起,直接返回

    Log.d("["+Thread.currentThread().name+"]job end ")
} //job協程 續體執行完畢

Log.d("["+Thread.currentThread().name+"]testInMain end")

} //testInMain 示例程式碼的log輸出如下,我們需要重點關注Log輸出的次序,和時間間隔:kotlin 10:15:04.046 26079-26079/com.example.myapplication D/TC: [main]testInMain start 10:15:04.067 26079-26079/com.example.myapplication D/TC: [main]testInMain end 10:15:04.080 26079-26079/com.example.myapplication D/TC: [main]job start 10:15:04.083 26079-26079/com.example.myapplication D/TC: [main]before job1 return 10:15:04.086 26079-26683/com.example.myapplication D/TC: [DefaultDispatcher-worker-1]job1 start 10:15:04.087 26079-26684/com.example.myapplication D/TC: [DefaultDispatcher-worker-2]job2 start 10:15:05.090 26079-26683/com.example.myapplication D/TC: [DefaultDispatcher-worker-1]job2 end 10:15:05.095 26079-26079/com.example.myapplication D/TC: [main]button-2 onclick now 10:15:07.090 26079-26685/com.example.myapplication D/TC: [DefaultDispatcher-worker-3]job1 end 10:15:07.091 26079-26079/com.example.myapplication D/TC: [main]job1 result = job1-Return 10:15:07.091 26079-26079/com.example.myapplication D/TC: [main]before job2 return 10:15:07.091 26079-26079/com.example.myapplication D/TC: [main]job2 result = job2-Return 10:15:07.091 26079-26079/com.example.myapplication D/TC: [main]job end ```

  • 步驟一:在主執行緒呼叫TestInMain,直接列印“[main]testInMain start”的log
  • 步驟二:TestInMain方法繼續執行完畢,列印“[main]testInMain end”的log
  • 步驟三:job協程被主執行緒排程執行,列印“[main]job start”的log
  • 步驟四:job協程繼續執行,列印“[main]before job1 return”的log
  • 步驟五:job協程被job1.await掛起函式中斷執行,退出main執行緒,等待job1返回結果後再恢復執行
  • 步驟六:job1協程被非同步排程到work-1子執行緒執行,列印“[DefaultDispatcher-worker-1]job1 start”的log,接著被delay掛起函式中斷執行,退出work-1子執行緒,等待delay 3秒結束後再恢復執行
  • 步驟七:job2協程被非同步排程到work-2子執行緒執行,列印“[DefaultDispatcher-worker-2]job2 start”的log,接著被delay掛起函式中斷執行,退出work-2子執行緒,等待delay 1秒結束後再恢復執行
  • 步驟八:1秒鐘後(從04秒-05秒),job2協程被delay掛起函式非同步排程到[DefaultDispatcher-worker-1]子執行緒恢復執行,列印“[DefaultDispatcher-worker-1]job2 end”的log,job2續體結束執行,同時將結果儲存到job2協程的result欄位中。
  • 步驟九:main執行緒中button-2點選事件被處理,列印“[main]button-2 onclick now”的log
  • 步驟十:3秒鐘後(從04秒-07秒),job1協程被delay掛起函式非同步排程到[DefaultDispatcher-worker-3]子執行緒恢復執行,列印“[DefaultDispatcher-worker-3]job1 end”的log,job1續體結束執行,同時將結果儲存到job1協程的result欄位中。
  • 步驟十一:job1.await掛起函式得到結果,job協程被await掛起函式非同步排程到main執行緒恢復執行,列印“[main]job1 result = job1-Return”的log
  • 步驟十二:job協程繼續執行,列印“[main]before job2 return”的log
  • 步驟十三:job協程繼續呼叫job2.await掛起函式,此時job2協程已經有result結果,所有它不會中斷job協程的執行,而是直接返回結果,列印“[main]job2 result = job2-Return”的log
  • 步驟十四:job協程繼續執行,列印“[main]job end”的log,job續體結束執行。

微信圖片_20211025132142.jpg 從圖中,我們可以清晰的得到幾點結論:

  1. job協程內部,通過await 阻塞了後續程式碼的執行。job1和job2協程,通過delay阻塞了後續程式碼的執行。
  2. 協程job1,job2 啟動後,保持並行執行。job2 並沒有等待job1執行完才啟動執行和恢復,而是在各自執行緒並行執行。
  3. job的後續程式碼被await 阻塞後,並沒有阻塞main執行緒,main執行緒中其它模組的程式碼能同時被執行,並打印出"[main]button 2 onclick now"。
  4. job1 被delay阻塞後續程式碼執行時,並沒有阻塞所線上程[DefaultDispatcher-worker-1],job2中的後續程式碼被恢復到此[DefaultDispatcher-worker-1]子執行緒中執行。
  5. job1 和 job2 協程在恢復執行時,並不能確保在原執行緒中執行後續程式碼。如log所示,job2在DefaultDispatcher-worker-2中啟動和阻塞後,卻在DefaultDispatcher-worker-1中恢復了後續的程式碼執行。

所以可以看出,協程的掛起,並不會阻塞協程所在的執行緒,而只是中斷了協程後面的程式碼執行。然後等待掛起函式完成後,恢復協程的後續程式碼執行。這就是協程掛起最最基本的關鍵點。

協程掛起的實現原理

上節中的示例程式碼,經過反編譯後的核心程式碼如下: ```kotlin //TestCoroutin.decompiled.java public final void testInMain() { Log.d("cjf---", var10001.append("testInMain start").toString());

Job job = BuildersKt.launch$default( CoroutineScopeKt.CoroutineScope((CoroutineContext)Dispatchers.getMain()),  (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result) {
    //單獨拆分到下面,需要詳細講解
}

    public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {/......./}

    public final Object invoke(Object var1, Object var2){/......./}

}), 3, (Object)null);

Log.d("cjf---", var10001.append("testInMain end ").toString());

}

//job協程的SuspendLambda續體,其invokeSuspend方法程式碼 public final Object invokeSuspend(@NotNull Object $result) { ... ... label17: { Object var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch(this.label) { case 0: Log.d(var10001.append("job start ").toString()); Deferred job1 = BuildersKt.async$default(/......./); job2 = BuildersKt.async$defaultdefault(/......./); Log.d(var10001.append("before job1 return").toString()); var6 = var10001.append("job1 result ="); this.L$0 = job2; this.L$1 = var5; this.L$2 = var6; this.label = 1; var10000 = job1.await(this); if (var10000 == var8) { return var8; } break; case 1: var6 = (StringBuilder)this.L$2; var5 = (String)this.L$1; job2 = (Deferred)this.L$0; ResultKt.throwOnFailure($result); var10000 = $result; break; case 2: var6 = (StringBuilder)this.L$1; var5 = (String)this.L$0; ResultKt.throwOnFailure($result); var10000 = $result; break label17; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); }

    var7 = var10000;
    Log.d(var5, var6.append((String)var7).toString());
    Log.d(var10001.append("before wait job2 return").toString());
    var6 = var10001.append("job2 result = ");
    this.L$0 = var5;
    this.L$1 = var6;
    this.L$2 = null;
    this.label = 2;
    var10000 = job2.await(this);
    if (var10000 == var8) {
        return var8;
    }
} //end of label17

Log.d(var5, var6.append((String)var7).toString());
Log.d("cjf---", var10001.append("job end ").toString());
return Unit.INSTANCE;

} //end of invokeSuspend ``` 反編譯後的主要區別在job協程,其Lambda程式碼塊轉換成了Function2 實現。

我們藉助APK反編譯工具,可以看到執行程式碼中,Function2 實際上被SuspendLambda 類繼承實現。

微信圖片_20211025132929.jpg

SuspendLambda實現類的關鍵邏輯在invokeSuspend方法中,而invokeSuspend方法中採用了CPS(Continuation-Passing-Style) 續體傳遞風格

續體傳遞風格會將job協程的Lambda程式碼塊,通過label標籤和switch分割成多個程式碼塊。程式碼塊分割的點,就是協程中呼叫suspend掛起函式的地方。

分支程式碼呼叫到await掛起函式時,如果返回了COROUTINE_SUSPENDED,就退出invokeSuspend,進入掛起狀態。

我們用流程圖來描述上面示例程式碼,轉換後的續體傳遞風格程式碼,如下:

微信圖片_20211025132944.jpg

我們可以看到,整個示例程式碼,被分割成了5個程式碼塊。其中case1 程式碼塊主要負責為label17 程式碼塊進行引數轉換;case2 程式碼塊主要負責為最外層程式碼塊進行引數轉換;所以相當於2個await掛起函式,將lambda程式碼塊分割成了3個實際執行的程式碼塊。

而且job1.await和job2.await會根據掛起函式的返回值進行不同處理,如果返回掛起,則進行協程掛起,當前協程退出執行;如果返回其它值,則協程繼續後續程式碼塊的執行。

編譯器在編譯期間,會對所有suspend修飾的函式呼叫處進行續體傳遞風格變換, Continuation可以稱之為協程續體,它提供了協程恢復的基本方法:resumeWith。Continuation續體宣告很簡單: ```kotlin public interface Continuation { /* * The context of the coroutine that corresponds to this continuation. / public val context: CoroutineContext

/**
 * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
 * return value of the last suspension point.
 */
public fun resumeWith(result: Result<T>)

} 其具體實現在SuspendLambda的父類BaseContinuationImpl中:kotlin //class BaseContinuationImpl 中 fun resumeWith 內部核心程式碼 while (true) { probeCoroutineResumed(current) with(current) { val completion = completion!! val outcome: Result = //協程返回了結果,說明協程執行完畢 try { val outcome = invokeSuspend(param)//執行協程的續體程式碼塊 if (outcome === COROUTINE_SUSPENDED) return //掛起函式返回掛起標誌,退出後續程式碼執行 Result.success(outcome) //沒有返回掛起標誌,將返回值outcome封裝為Result返給外層outcome } catch (exception: Throwable) { Result.failure(exception)//將異常Result返給外層outcome } releaseIntercepted() // 釋放當前協程的攔截器 if (completion is BaseContinuationImpl) {//如果上一層續體是一個單純的續體,則將結果作為上一層續體的恢復引數,進行上一層續體的恢復 current = completion param = outcome } else {//上一層續體是一個協程,則呼叫協程的恢復函式,進行上一層的協程恢復 completion.resumeWith(outcome) return } } } ``` 如果invokeSuspend函式返回中斷標誌時,會直接從函式中返回,等待後續繼續被恢復執行。

如果invokeSuspend函式返回的是結果,且上一層續體不是單純的續體而是協程體,它會呼叫引數completion的resumeWith函式,恢復上一層協程的invokeSuspend程式碼的執行。

協程被resumeWith恢復後,會繼續呼叫invokeSuspend函式,根據label值執行下一個case分支程式碼塊。按照這個恢復流程,直到所有invokeSuspend程式碼執行完,返回非COROUTINE_SUSPENDED的結果,協程就執行結束。

我們繼續看job續體在invokeSuspend中呼叫到job1.await函式時,await是怎麼實現返回掛起標誌,和後續恢復job協程的。核心程式碼可以在awaitSuspend中檢視:

```kotlin // JobSupport.kt中 awaitSuspend方法 private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont -> val cont = AwaitContinuation(uCont.intercepted(), this) cont.disposeOnCancellation(invokeOnCompletion( ResumeAwaitOnCompletion(this, cont).asHandler)) cont.getResult() }

// JobSupport.kt中 invokeOnCompletion方法 public final override fun invokeOnCompletion(...):DisposableHandle { var nodeCache: JobNode<*>? = null loopOnState { state -> when (state) { is Empty -> { // 沒有completion handlers,直接建立Node放入state val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (_state.compareAndSet(state, node)) return nod } is Incomplete -> {// 有completion handlers,加入到node list列表 val list = state.list val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } if (!addLastAtomic(state, list, node)) [email protected] / } else -> { // 已經完成,不需要加入結果監聽Node if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause) return NonDisposableHandle } } } }

// AbstractCoroutine.kt 中 resumeWith方法 // 通知state node,進行恢復 public final override fun resumeWith(result: Result) { // makeCompletingOnce 大致實現是修改協程狀態,如果需要的話還會將結果返回給呼叫者協程,並恢復呼叫者協程 makeCompletingOnce(result.toState(), defaultResumeMode) } ``` 可以看出,job1.await()首先會通過getResult()去獲取job1的結果,如果有結果則直接返回結果,否則立即返回中斷標誌,這樣就實現了await掛起點掛起job協程了。await()掛起函式恢復job協程的流程是,將job 協程封裝為 ResumeAwaitOnCompletion,並將其再次封裝成handler 節點,新增job1協程的 state.list。

等job1協程完成後,會通知 handler 節點呼叫job協程的 resumeWith(result) 方法,從而恢復 job協程await 掛起點之後的程式碼塊的執行。

我們再次結合示例程式碼, 來梳理這個掛起和恢復流程:

微信圖片_20211025145009.jpg

note:綠色底色,表示在主執行緒執行;紅色字型,表示呼叫掛起函式;

可以看到整個過程:

  • job協程沒有阻塞呼叫者TestInMain,job協程會被post到主執行緒執行;
  • 子協程job1,job2會同時排程到不同子執行緒中執行,會並行執行;
  • job協程通過job1,和job2 的 await掛起函式等待非同步結果。等待非同步結果的時候,job協程也沒有阻塞主執行緒。

通過續體傳遞風格的invokeSuspend程式碼,和續體之間形成的resumewith恢復鏈,協程得以實現掛起和恢復的核心流程。