Kotlin 協程的取消機制超詳細解讀

語言: CN / TW / HK

在 Java 語言中提供了線程中斷的能力,但並不是所有的線程都可以中斷的,因為 interrupt 方法並不是真正的終止線程,而是將一個標誌位標記為中斷狀態,當運行到下一次中斷標誌位檢查時,才能觸發終止線程。

但無論如何,終止線程是一個糟糕的方案,因為在線程的銷燬和重建,是要消耗系統資源的,造成了不必要的開銷。Kotlin 協程提供了更優雅的取消機制,這也是協程比較核心的功能之一。

協程的狀態

在瞭解取消機制之前我們需要知道一些關於 Job 狀態的內容:

| State | isActive(是否活躍) | isCompleted(是否完成) | isCancelled(是否取消) | | ---------------- | -------------- | ----------------- | ----------------- | | New (可選初始狀態) | false | false | false | | Active (默認初始狀態) | true | false | false | | Completing (短暫態) | true | false | false | | Cancelling (短暫態) | false | false | true | | Cancelled (完成態) | false | true | true | | Completed (完成態) | false | true | false |

可以看出,在完成和取消的過程中,會經過一個短暫的進行中的狀態,然後才變成已完成/已取消。

在這裏只關注一下取消相關的狀態:

  • Cancelling

    拋出異常的 Job 會導致其進入 Cancelling 狀態,也可以使用 cancel 方法來隨時取消 Job 使其立即轉換為 Cancelling 狀態。

  • Cancelled

    當它遞歸取消子項,並等待所有的子項都取消後,該 Job 會進入 Cancelled 狀態。

取消協程的用法

協程在代碼中抽象的類型是 Job , 下面是一個官方的代碼示例,用來展示如何取消協程的執行:

suspend fun main(): Unit = coroutineScope {     val job = launch {         repeat(1000) { i ->             println("job: I'm sleeping $i ...")             delay(500L)         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancel() // cancels the job     job.join() // waits for job's completion      println("main: Now I can quit.") }

它的輸出是:

job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! main: Now I can quit.

一旦 mian 方法中調用了 job.cancel() ,我們就看不到其他協程的任何輸出,因為它已被取消了。

協程取消的有效性

協程代碼必須通過與掛起函數的配合才能被取消。kotlinx.coroutines 中所有掛起函數(帶有 suspend 關鍵字函數)都是可以被取消的。suspend 函數會檢查協程是否需要取消並在取消時拋出 CancellationException

但是,如果協程在運行過程中沒有掛起點,則不能取消協程,如下例所示:

suspend fun main(): Unit = coroutineScope {     val startTime = System.currentTimeMillis()     val job = launch(Dispatchers.Default) {         var nextPrintTime = startTime         var i = 0         while (i < 5) { // computation loop, just wastes CPU             // print a message twice a second             if (System.currentTimeMillis() >= nextPrintTime) {                 println("job: I'm sleeping ${i++} ...")                 nextPrintTime += 500L             }         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancelAndJoin() // cancels the job and waits for its completion     println("main: Now I can quit.") }

在這個 job 中,並沒有執行任何 suspend 函數,所以在執行過程中並沒有對協程是否需要取消進行檢查,自然也就無法觸發取消。

同樣的問題也可以在通過 捕獲 CancellationException 並且不拋出的情況下 觀察到:

suspend fun main(): Unit = coroutineScope {     val job = launch(Dispatchers.Default) {         repeat(5) { i ->             try {                 // print a message twice a second                 println("job: I'm sleeping $i ...")                 delay(500)             } catch (e: Exception) {                 // log the exception                 println(e)             }         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancelAndJoin() // cancels the job and waits for its completion     println("main: Now I can quit.") }

打印結果是:

job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9 job: I'm sleeping 3 ... kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9 job: I'm sleeping 4 ... kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@614acfe9 main: Now I can quit.

從打印結果來看,循環 5 次全部執行了,好像取消並沒有起到作用。但實際上不是這樣的,為了便於觀察加上時間戳:

1665217217682: job: I'm sleeping 0 ... 1665217218196: job: I'm sleeping 1 ... 1665217218697: job: I'm sleeping 2 ... 1665217218996: main: I'm tired of waiting! 1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d 1665217219000: job: I'm sleeping 3 ... 1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d 1665217219000: job: I'm sleeping 4 ... 1665217219000: kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@3a1efc0d 1665217219001: main: Now I can quit.

加上時間可以看出,拋出第一次異常後的兩次循環和異常捕獲都是在同一瞬間完成的。這説明了捕獲到異常後,仍然會執行代碼,但是所有的 delay 方法都沒有生效,即該 Job 的所有子 Job 都失效了。但該 Job 仍在繼續循環打印。原因是,父 Job 會等所有子 Job 處理結束後才能完成取消。

而如果我們不使用 try-catch 呢?

suspend fun main(): Unit = coroutineScope {     val job = launch(Dispatchers.Default) {         repeat(5) { i ->             // print a message twice a second             println("job: I'm sleeping $i ...")             delay(500)         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancelAndJoin() // cancels the job and waits for its completion     println("main: Now I can quit.") }

打印結果:

job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! main: Now I can quit.

很順利的取消了,這是因為協程拋出 Exception 直接終止了。

注意協程拋出 CancellationException 並不會導致 App Crash 。

使用 try-catch 來捕獲 CancellationException 時需要注意,在掛起函數前的代碼邏輯仍會多次執行,從而導致這部分代碼彷彿沒有被取消一樣。

如何寫出可以取消的代碼

有兩種方法可以使代碼是可取消的。第一種方法是定期調用掛起函數,檢查是否取消,就是上面的例子中的方法;另一個是顯式檢查取消狀態:

suspend fun main(): Unit = coroutineScope {     val startTime = System.currentTimeMillis()     val job = launch(Dispatchers.Default) {         var nextPrintTime = startTime         var i = 0         while (isActive) { // cancellable computation loop             // print a message twice a second             if (System.currentTimeMillis() >= nextPrintTime) {                 println("job: I'm sleeping ${i++} ...")                 nextPrintTime += 500L             }         }     }     delay(1300L) // delay a bit     println("main: I'm tired of waiting!")     job.cancelAndJoin() // cancels the job and waits for its completion     println("main: Now I can quit.") }

將上面的循環 5 次通過使用 while (isActive) 進行替換,實現顯示檢查取消的代碼。isActive 是通過 CoroutineScope 對象在協程內部可用的擴展屬性。

在 finally 中釋放資源

在前面的例子中我們使用 try-catch 捕獲 CancellationException 發現會產生父協程等待所有子協程完成後才能完成,所以建議不用 try-catch 而是 try{…} finally{…} ,讓父協程在被取消時正常執行終結操作:

val job = launch {     try {         repeat(1000) { i ->             println("job: I'm sleeping $i ...")             delay(500L)         }     } finally {         println("job: I'm running finally")     } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.")

join 和 cancelAndJoin 都要等待所有終結操作完成,所以上面的例子產生了以下輸出:

job: I'm sleeping 0 ... job: I'm sleeping 1 ... job: I'm sleeping 2 ... main: I'm tired of waiting! job: I'm running finally main: Now I can quit.

使用不可取消的 block

如果在在上面的示例的 finally 代碼塊中使用 suspend 函數,會導致拋出 CancellationException 。

因為運行這些代碼的協程已經被取消了。通常情況下這不會有任何問題,然而,在極少數情況下,如果你需要在 finally 中使用一個掛起函數,你可以通過使用 withContext(NonCancellable) { ... }

val job = launch {     try {         repeat(1000) { i ->             println("job: I'm sleeping $i ...")             delay(500L)         }     } finally {         withContext(NonCancellable) {             println("job: I'm running finally")             delay(1000L)             println("job: And I've just delayed for 1 sec because I'm non-cancellable")         }     } } delay(1300L) // delay a bit println("main: I'm tired of waiting!") job.cancelAndJoin() // cancels the job and waits for its completion println("main: Now I can quit.")

CancellationException

在上面的內容中,我們知道協程的取消是通過拋出 CancellationException 來進行的,神奇的是拋出 Exception 並沒有導致應用程序 Crash 。

CancellationException 的真實實現是 j.u.c. 中的 CancellationException :

public actual typealias CancellationException = java.util.concurrent.CancellationException

如果協程的 Job 被取消,則由可取消的掛起函數拋出 CancellationException 。它表示協程的正常取消。在默認的 CoroutineExceptionHandler 下,它不會打印到控制枱/日誌。

上面引用了這個類的註釋,看來處理拋出異常的邏輯在 CoroutineExceptionHandler 中:

``` public interface CoroutineExceptionHandler : CoroutineContext.Element {     /*      * Key for [CoroutineExceptionHandler] instance in the coroutine context.      /     public companion object Key : CoroutineContext.Key

/*      * Handles uncaught [exception] in the given [context]. It is invoked      * if coroutine has an uncaught exception.      /     public fun handleException(context: CoroutineContext, exception: Throwable) } ```

通常,未捕獲的 Exception 只能由使用協程構建器的根協程產生。所有子協程都將異常的處理委託給他們的父協程,父協程也委託給它自身的父協程,直到委託給根協程處理。所以在子協程中的 CoroutineExceptionHandler 永遠不會被使用。

使用 SupervisorJob 運行的協程不會將異常傳遞給它們的父協程,SupervisorJob 被視為根協程。

使用 async 創建的協程總是捕獲它的所有異常通過結果 Deferred 對象回調出去,因此它不能導致未捕獲的異常。

CoroutineExceptionHandler 用於記錄異常、顯示某種類型的錯誤消息、終止和/或重新啟動應用程序。

如果需要在代碼的特定部分處理異常,建議在協程中的相應代碼周圍使用 try-catch。通過這種方式,您可以阻止異常協程的完成(異常現在被捕獲),重試操作,和/或採取其他任意操作。 這也就是我們前面論證的在協程中使用 try-catch 導致的取消失效。

默認情況下,如果協程沒有配置用於處理異常的 Handler ,未捕獲的異常將按以下方式處理:

  • 如果 exception 是 CancellationException ,那麼它將被忽略(因為這是取消正在運行的協程的假定機制)。

  • 其他情況:

    • 如果上下文中有一個 Job,那麼調用 job.cancel()
    • 否則,通過 ServiceLoader 找到的 CoroutineExceptionHandler 的所有實例並調用當前線程的 Thread.uncaughtExceptionHandler 來處理異常。

超時取消

取消協程執行的最合適的應用場景是它的執行時間超過了規定的最大時間時自動取消任務。在 Kotlin 協程庫中提供了 withTimeout 方法來實現這個功能:

withTimeout(1300L) {     repeat(1000) { i ->         println("I'm sleeping $i ...")         delay(500L)     } }

執行結果:

I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

TimeoutCancellationException 是 CancellationException 的子類,TimeoutCancellationException 通過 withTimeout 函數拋出。

在本例中,我們在main函數中使用了withTimeout ,運行過程中會導致 Crash 。

有兩種解決辦法,就是使用 try{…} catch (e: TimeoutCancellationException){…} 代碼塊;另一種辦法是使用在超時的情況下不是拋出異常而是返回 null 的 withTimeoutOrNull 函數:

val result = withTimeoutOrNull(1300L) {     repeat(1000) { i ->         println("I'm sleeping $i ...")         delay(500L)     }     "Done" // will get cancelled before it produces this result } println("Result is $result")

打印結果:

I'm sleeping 0 ... I'm sleeping 1 ... I'm sleeping 2 ... Result is null

異步的超時和資源

withTimeout 中的超時事件相對於在其塊中運行的代碼是異步的,並且可能在任何時間發生,甚至在從超時塊內部返回之前。如果你在塊內部打開或獲取一些資源,需要關閉或釋放到塊外部。

例如,在這裏,我們用 Resource 類模擬一個可關閉資源,它只是通過對獲得的計數器遞增,並對該計數器從其關閉函數遞減來跟蹤創建次數。讓我們用小超時運行大量的協程,嘗試在一段延遲後從withTimeout塊內部獲取這個資源,並從外部釋放它。

``` var acquired = 0

class Resource {     init { acquired++ } // Acquire the resource     fun close() { acquired-- } // Release the resource }

fun main() {     runBlocking {         repeat(100_000) { // Launch 100K coroutines             launch {                  val resource = withTimeout(60) { // Timeout of 60 ms                     delay(50) // Delay for 50 ms                     Resource() // Acquire a resource and return it from withTimeout block                      }                 resource.close() // Release the resource             }         }     }     // Outside of runBlocking all coroutines have completed     println(acquired) // Print the number of resources still acquired } ```

如果運行上面的代碼,您將看到它並不總是打印 0,儘管它可能取決於您的機器的時間,在本例中您可能需要調整超時以實際看到非零值。

要解決這個問題,可以在變量中存儲對資源的引用,而不是從withTimeout塊返回它。

fun main() {     runBlocking {         repeat(100_000) { // Launch 100K coroutines             launch {                 var resource: Resource? = null // Not acquired yet                 try {                     withTimeout(60) { // Timeout of 60 ms                         delay(50) // Delay for 50 ms                         resource = Resource() // Store a resource to the variable if acquired                     }                     // We can do something else with the resource here                 } finally {                     resource?.close() // Release the resource if it was acquired                 }             }         }     } // Outside of runBlocking all coroutines have completed     println(acquired) // Print the number of resources still acquired }

這樣這個例子總是輸出0。資源不會泄漏。

取消檢查的底層原理

在探索協程取消的有效性時,我們知道協程代碼必須通過與掛起函數的配合才能被取消。

kotlinx.coroutines 中所有掛起函數(帶有 suspend 關鍵字函數)都是可以被取消的。suspend 函數會檢查協程是否需要取消並在取消時拋出 CancellationException 。

關於協程的取消機制,很明顯和 suspend 關鍵字有關。為了測試 suspend 關鍵字的作用,實現下面的代碼:

class Solution {     suspend fun func(): String {         return "測試 suspend 關鍵字"     } }

作為對照組,另一個是不加 suspend 關鍵字的 func 方法:

class Solution {     fun func(): String {         return "測試 suspend 關鍵字"     } }

兩者反編譯成 Java :

``` // 普通的方法 public final class Solution {     public static final int $stable = LiveLiterals$SolutionKt.INSTANCE.Int$class-Solution();

@NotNull     public final String func() {         return LiveLiterals$SolutionKt.INSTANCE.String$fun-func$class-Solution();     } }

// 帶有 suspend 關鍵字的方法 public final class Solution {     public static final int $stable = LiveLiterals$SolutionKt.INSTANCE.Int$class-Solution();

@Nullable     public final Object func(@NotNull Continuation<? super String> $completion) {         return LiveLiterals$SolutionKt.INSTANCE.String$fun-func$class-Solution();     } } ```

suspend 關鍵字修飾的方法反編譯後默認生成了帶有 Continuation 參數的方法。説明 suspend 關鍵字的玄機在 Continuation 類中。

Continuation 是 Kotlin 協程的核心思想 Continuation-Passing Style 的實現。原理參考簡述協程的底層實現原理

通過在普通函數的參數中增加一個 Continuation 參數,這個 continuation 的性質類似於一個 lambda 對象,將方法的返回值類型傳遞到這個 lambda 代碼塊中。

什麼意思呢?就是本來這個方法的返回類型直接 return 出來的:

val a: String = func() print(a)

而經過 suspend 修飾,代碼變成了這個樣子:

func { a ->     print(a) }

Kotlin 協程就是通過這樣的包裝,將比如 launch 方法,實際上是 launch 最後一個參數接收的是 lambda 參數。也就是把外部邏輯傳遞給函數內部執行。

回過頭來再來理解 suspend 關鍵字,我們知道帶有 suspend 關鍵字的方法會對協程的取消進行檢查,從而取消協程的執行。從這個能力上來看,我理解他應該會自動生成類似下面的邏輯代碼:

生成的函數 {     if(!當前協程.isActive) {         throw CancellationException()     }     // ... 這裏是函數真實邏輯 }

suspend 修飾的函數,會自動生成一個掛起點,來檢查協程是否應該被掛起。

顯然 Continuation 中聲明的函數也證實了掛起的功能:

``` public interface Continuation {     /*      * The context of the coroutine that corresponds to this continuation.      /     public val context: CoroutineContext

/*      * 恢復相應協程的執行,將成功或失敗的結果作為最後一個掛起點的返回值傳遞。      /     public fun resumeWith(result: Result) } ```

協程本質上是產生了一個 switch 語句,每個掛起點之間的邏輯都是一個 case 分支的邏輯。參考 協程是如何實現的 中的例子:

```         Function1 lambda = (Function1)(new Function1((Continuation)null) {             int label;

@Nullable             public final Object invokeSuspend(@NotNull Object $result) {                 byte text;                 @BlockTag1: {                     Object result;                     @BlockTag2: {                         result = IntrinsicsKt.getCOROUTINE_SUSPENDED();                         switch(this.label) {                             case 0:                                 ResultKt.throwOnFailure($result);                                 this.label = 1;                                 if (SuspendTestKt.dummy(this) == result) {                                     return result;                                 }                                 break;                             case 1:                                 ResultKt.throwOnFailure($result);                                 break;                             case 2:                                 ResultKt.throwOnFailure($result);                                 break @BlockTag2;                             case 3:                                 ResultKt.throwOnFailure($result);                                 break @BlockTag1;                             default:                                 throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");                         }

text = 1;                         System.out.println(text);                         this.label = 2;                         if (SuspendTestKt.dummy(this) == result) {                             return result;                         }                     }

text = 2;                     System.out.println(text);                     this.label = 3;                     if (SuspendTestKt.dummy(this) == result) {                         return result;                     }                 }                 text = 3;                 System.out.println(text);                 return Unit.INSTANCE;             }

@NotNull             public final Continuation create(@NotNull Continuation completion) {                 Intrinsics.checkNotNullParameter(completion, "completion");                 Function1 funcation = new (completion);                 return funcation;             }

public final Object invoke(Object object) {                 return (()this.create((Continuation)object)).invokeSuspend(Unit.INSTANCE);             }         }); ```

可以看出,在每個分支都會執行一次 ResultKt.throwOnFailure($result);,從名字上就知道,這就是檢查是否需要取消並拋出異常的代碼所在:

@PublishedApi @SinceKotlin("1.3") internal fun Result<*>.throwOnFailure() {     if (value is Result.Failure) throw value.exception }

這裏的 Result 類是一個包裝類,它將成功的結果封裝為類型 T 的值,或將失敗的結果封裝為帶有任意Throwable異常的值。

```         @Suppress("INAPPLICABLE_JVM_NAME")         @InlineOnly         @JvmName("success")         public inline fun  success(value: T): Result =             Result(value)

/*          * Returns an instance that encapsulates the given [Throwable] [exception] as failure.          /         @Suppress("INAPPLICABLE_JVM_NAME")         @InlineOnly         @JvmName("failure")         public inline fun  failure(exception: Throwable): Result =             Result(createFailure(exception)) ```

成功和失敗的方法類型是不一樣的,證實了這一點,success 方法接收類型為 T 的參數;failure 接收 Throwable 類型的參數。

到這裏 suspend 方法掛起的原理就明瞭了:在協程的狀態機中,通過掛起點會分割出不同的狀態,對每一個狀態,會先進行掛起結果的檢查。 這會導致以下結果:

  • 協程的取消機制是通過掛起函數的掛起點檢查來進行取消檢查的。證實了為什麼如果沒有 suspend 函數(本質是掛起點),協程的取消就不會生效。
  • 協程的取消機制是需要函數合作的,就是通過 suspend 函數來增加取消檢查的時機。
  • 父協程會執行完所有的子協程(掛起函數),因為代碼的本質是一個循環執行 switch 語句,當一個子協程(或掛起函數)執行結束,會繼續執行到下一個分支。但是最後一個掛起點後續的代碼並不會被執行,因為最後一個掛起點檢查到失敗,不會繼續跳到最後的 label 分支。