【Kotlin回顧】13.Kotlin協程—Job

語言: CN / TW / HK

theme: cyanosis


開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第13天,點選檢視活動詳情

前面在學習協程啟動方式的時候在launch的原始碼中有一個返回值是Jobasync的返回Deferred也是實現了Job,那麼而也就是說launchasync在建立一個協程的時候也會建立一個對應的Job物件。還提到過Job是協程的控制代碼,那麼Job到底是什麼?它有什麼用?

1.Job的生命週期

先看一下Job的原始碼,這裡只保留了跟標題相關的內容

``` public interface Job : CoroutineContext.Element {

// ------------ 狀態查詢API ------------

/**
* 當該Job處於活動狀態時,返回true——它已經開始,沒有完成,也沒有取消。
* 如果沒有取消或失敗,等待其子任務完成的Job仍被認為是活動的。
*/
public val isActive: Boolean

/**
* 當Job因任何原因完成時返回true。作業被取消或失敗並已完成其執行也被視為完成。
* Job只有在所有子任務完成後才算完成。
*/
public val isCompleted: Boolean

/**
*如果該作業因任何原因被取消,無論是通過顯式呼叫cancel,還是因為它失敗或其子或父作業被取消,
* 則返回true。在一般情況下,它並不意味著任務已經完成,因為它可能仍然在完成它正在做的事情,
* 並等待它的子任務完成。
*/
public val isCancelled: Boolean

// ------------ 操控狀態API ------------

/**
* 如果Job所在的協程還沒有被啟動那麼呼叫這個方法就會啟動協程
* 如果這個協程被啟動了返回true,如果已經啟動或者執行完畢了返回false
*/
public fun start(): Boolean

/**
* 取消此Job,可用於指定錯誤訊息或提供有關取消原因的其他詳細資訊
*/
public fun cancel(cause: CancellationException? = null)

/**
* 取消此Job
*/
public fun cancel(): Unit = cancel(null)

public fun cancel(cause: Throwable? = null): Boolean

// ------------ 等待狀態API ------------

/**
* 掛起協程,知道任務完成再恢復
*/
public suspend fun join()

// ------------ 完成狀態回撥API ------------

/**
* 註冊Job完成時同步呼叫的處理程式.
* 當Job已經完成時,將處理程式將立即呼叫Job的異常或取消原因或null
* 否則,該處理程式將在此Job完成時呼叫一次。
*/
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle

/**
* 註冊在取消或完成此Job時同步呼叫的處理程式。
* 當Job已經被取消並完成執行時,處理程式將立即呼叫Job的取消原因或null,
* 除非將invokeImmediately設定為false。否則,
* 當Job取消或完成時將呼叫一次handler。
*/
public fun invokeOnCompletion(
    onCancelling: Boolean = false,
    invokeImmediately: Boolean = true,
    handler: CompletionHandler): DisposableHandle

} ```

從原始碼中可以發現這幾個函式和變數跟Actviity或者Fragment非常像,所以我們可以總結出兩個結論:

  • Job可以監測協程的生命週期
  • Job可以操控協程

在例子中使用這幾個函式和變數再來校驗一下上面的結論:

``` fun main() = runBlocking { val job = launch { delay(1000L) } job.log() job.cancel() job.log() }

fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name}
================================ """.trimIndent() ) }

//輸出結果 //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //isActive:false //isCompleted:false //isCancelled:true //Thread:main @coroutine#1
//================================ ```

Job.log用了擴充套件函式,方便呼叫Job中的狀態監測返回值。

上面的程式碼通過launch建立了一個協程,接收了Job的返回值,這裡用這個job物件做了三件事:

  • 第一個job.log()launch的建立標誌著協程已經被啟動所以在第一個job.log()的日誌中isActive返回值是true;
  • job.cancel() 這裡呼叫了job的取消函式將協程任務取消;
  • 第二個job.log() 上面的程式碼將協程任務取消了,然後再次獲取協程狀態發現isActivte返回false,isCancelled返回true。

上面的程式碼也印證了前面提出的結論,還有一個函式start沒使用,再來呼叫它之後輸出的日誌:

``` fun main() = runBlocking { //變化1 val job = launch(start = CoroutineStart.LAZY) { delay(1000L) } job.log() //變化2 job.start() job.log() job.cancel() job.log() }

fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name}
================================ """.trimIndent() ) }

//輸出結果: //isActive:false //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //isActive:false //isCompleted:false //isCancelled:true //Thread:main @coroutine#1
//================================ ```

上面的程式碼增加了兩處修改:

  • 變化1:協程在創建出來的時候就已經被啟動,因此為了檢視呼叫Job.start()前的日誌需要加上懶啟動
  • 變化2:呼叫start函式啟動協程

從輸出結果來看沒有呼叫start函式前isActive返回true,呼叫後就返回了true,當使用懶啟動後在呼叫cancel函式與前面使用cancel函式輸出的日誌是一樣的,可以得知懶啟動後對協程的生命週期並沒有設麼影響(這可能是句廢話)。

現在還有最後一個變數沒有看isCompleted,在上面的程式碼中新增一個延時函式,等協程任務結束再列印日誌

``` fun main() = runBlocking { val job = launch(start = CoroutineStart.LAZY) { delay(1000L) } job.log() job.start() job.log() job.cancel() delay(2000L) //變化在這裡 job.log() }

fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name}
================================ """.trimIndent() ) }

//輸出結果: //isActive:false //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //isActive:false //isCompleted:true //isCancelled:true //Thread:main @coroutine#1
//================================ ```

從輸出結果中看到當呼叫isCancelisCompleted也返回了true,也就是說任務結束了。

上面的程式碼為了監測isCompleted的狀態加了一個延時函式delay,但是這種方式並不建議使用,因為這個時間他不是固定的,例如從後臺請求資料或者下載檔案,這種情況下的時間是完全無法預知的。

現在假設已經知道協程執行完畢需要delay(1000L)的時間,如果將協程內的delay時長設定的大於外部的delay時長,會帶來什麼問題?

``` fun main() = runBlocking { val job = launch(start = CoroutineStart.LAZY) { delay(4000L) } job.log() job.start() job.log() delay(1000L) job.log() println("Process end!") }

fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name}
================================ """.trimIndent() ) }

//輸出結果: //isActive:false //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //Process end! ```

由輸出結果可知isCompleted狀態是false,協程任務是否執行完畢不得而知。另外當println("Process end!")執行完畢後程序並沒有立即輸出Process finished with exit code 0,這是因為runBlocking 會一直阻塞,等到 job 任務執行完畢以後才真正退出。

那要如何解決這個問題?

``` //Job#join

/* * 掛起協程,知道任務完成再恢復 / public suspend fun join() ```

joinJob中的一個掛起函式,呼叫後會掛起當前程式的執行流程,等待job當中的協程任務執行完畢然後再恢復當前程式的執行流程。

join將任務掛起後再恢復,那要如何知道任務是否執行完畢了?invokeOnCompletion可以監聽任務執行的狀態

``` //Job#invokeOnCompletion /* * 註冊Job完成時同步呼叫的處理程式. * 當Job已經完成時,將處理程式將立即呼叫Job的異常或取消原因或null * 否則,該處理程式將在此Job完成時呼叫一次。 / public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle

//Job#invokeOnCompletion /* * 註冊在取消或完成此Job時同步呼叫的處理程式。 * 當Job已經被取消並完成執行時,處理程式將立即呼叫Job的取消原因或null, * 除非將invokeImmediately設定為false。否則, * 當Job取消或完成時將呼叫一次handler。 / public fun invokeOnCompletion( onCancelling: Boolean = false, invokeImmediately: Boolean = true, handler: CompletionHandler): DisposableHandle ```

joininvokeOnCompletion的使用如下:

``` fun main() = runBlocking { val job = launch(start = CoroutineStart.LAZY) { delay(4000L) } job.log() job.start() job.log() //新增 job.join() //新增 job.invokeOnCompletion { println("==========Task status==========") job.log() } println("Process end!") }

fun Job.log() { println( """ isActive:$isActive isCompleted:$isCompleted isCancelled:$isCancelled Thread:${Thread.currentThread().name}
================================ """.trimIndent() ) }

//輸出結果: //isActive:false //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //isActive:true //isCompleted:false //isCancelled:false //Thread:main @coroutine#1
//================================ //==========Task status========== //isActive:false //isCompleted:true //isCancelled:false //Thread:main @coroutine#1
//================================ //Process end! ```

可以看到加入joininvokeOnCompletion之後isCompleted的狀態就正確了,同時Process end!輸出後Process finished with exit code 0也會很快的輸出,這說明任務確實執行完畢了。

在講協程的啟動方式的時候提出一個觀點:launch的返回值Job代表的是協程的控制代碼。那麼Job是協程的控制代碼該怎麼理解?

控制代碼: 是指一箇中間媒介,可以操控一個東西。就類似於遙控器操作空調場景中遙控器就是控制代碼,開關控制燈具場景中開關就是控制代碼。

所以Job和協程的關係就類似於遙控器和空調,開關和燈具。Job可以監測協程的執行狀態也可以控制協程的執行狀態。那麼Job就和遙控器、開關一樣看做是一個控制代碼。

2.Deffered

launch直接建立了Jobasync通過Deffered間接建立了Job物件,但是它並沒有在 Job 的基礎上擴展出很多其他功能,而接收一個返回值是依靠 await()方法,那await方法是如何實現的?

``` fun main() = runBlocking { val deferred = async { logX("Coroutine start!") delay(1000L) logX("Coroutine end!") "Coroutine result!" } val result = deferred.await() println("Result = $result") logX("Process end!") }

fun logX(any: Any?) { println( """ ================================ $any Thread:${Thread.currentThread().name} ================================ """.trimIndent() ) }

//輸出結果: //Coroutine start! //Thread:main @coroutine#2 //================================ //================================ //Coroutine end! //Thread:main @coroutine#2 //================================ //Result = Coroutine result! //================================ //Process end! //Thread:main @coroutine#1 ```

從輸出結果來看,await方法可以獲取協程執行結果外,好像還會阻塞協程的執行流程,直到協程任務執行完畢。看一下await的原始碼

//Deferred#await public interface Deferred<out T> : Job { ... public suspend fun await(): T ... }

從原始碼來看await也是一個掛起函式,它跟join是一樣的,看似阻塞的過程其實是協程的掛起恢復能力。

所以,總的來說,Deferred 只是比 Job 多了一個 await()掛起函式而已,通過這個掛起函式,就可以等待協程執行完畢的同時,還可以直接拿到協程的執行結果。

3.Job與結構化併發

在其他地方看過這麼一句話:協程的優勢在於結構化併發, 這句話該如何理解?

這句話可以理解為帶有結構和層級的併發,用程式碼表現就像這樣:

``` fun main() = runBlocking { val parentJob: Job var childJob1: Job? = null var childJob2: Job? = null var childJob3: Job? = null

parentJob = launch {
    childJob1 = launch {
        delay(1000L)
    }

    childJob2 = launch {
        delay(3000L)
    }

    childJob3 = launch {
        delay(5000L)
    }
}

delay(500L)

parentJob.children.forEachIndexed { index, job ->
    when (index) {
        0 -> println("childJob1 === childJob1 is ${childJob1 === job}")
        1 -> println("childJob2 === childJob2 is ${childJob2 === job}")
        2 -> println("childJob3 === childJob3 is ${childJob3 === job}")
    }
}

parentJob.join()
logX("Process end!")

}

//輸出結果: //childJob1 === childJob1 is true //childJob2 === childJob2 is true //childJob3 === childJob3 is true //================================ //Process end! //Thread:main @coroutine#1 ```

上面的程式碼是父子層級,父Job使用launch啟動了協程同時它的內部還有三個Job,三個子Job是併發執行的,同時也是用過launch啟動的協程,呼叫了parentJob.join()那麼掛起的時間就是childJob3的時長—5秒,因為它要等待所有任務都執行完畢才會恢復執行,然後通過children.forEachIndexed進行遍歷並分別對比他們與三個子Job的引用是否相等“===”代表了引用相等,即是否是同一個物件)。圖示如下

前面講過,Job可以呼叫cancel方法取消執行,那麼當呼叫parentJob.cancel會有什麼樣的情況?

``` fun main() = runBlocking { val parentJob: Job var childJob1: Job? = null var childJob2: Job? = null var childJob3: Job? = null

parentJob = launch {
    childJob1 = launch {
        println("childJob1 start")
        delay(1000L)
        println("childJob1 end")
    }

    childJob2 = launch {
        println("childJob2 start")
        delay(3000L)
        println("childJob2 start")
    }

    childJob3 = launch {
        println("childJob3 start")
        delay(5000L)
        println("childJob3 start")
    }
}

delay(500L)

parentJob.cancel()
logX("Process end!")

}

//輸出結果: //childJob1 start //childJob2 start //childJob3 start //================================ //Process end! //Thread:main @coroutine#1 ```

parentJob.cancel呼叫後,每個子Job只是輸出了start,這就可以得出一個結論:父Job取消後子Job也會依次跟著取消。如果呼叫任何一個子Jobcancel則不會對父Job和其他子Job產生影響。

到這裡對於開頭的那句協程的優勢在於結構化併發就有更更好的理解了,這是Kotlin協程的第二大優勢。

4.launch和async的使用場景

  • launch: 主要用來發起一些不需要任何結果的耗時任務,這個任務在執行中可以改變它的執行狀態。
  • async: 主要用來發起一些需要結果的耗時任務,以及與掛起函式結合,優化併發。