kotlin協程系列之基礎設施

語言: CN / TW / HK

前一篇文章介紹了kotlin協程的歷史及現狀,接下來就介紹一下kotlin協程吧。

kotlin協程分為標準庫提供的基礎設施以及官方協程庫兩部分,標準庫的基礎設施依照協程的思想提供基本的掛起、恢復介面,官方協程庫基於這些介面提供豐富實用的功能。

今天先介紹一下kotlin協程的基礎設施。為了讓大家對kotlin協程有更為直觀的認識,先展示一個demo:

```kotlin // 程式碼清單2-1 val continuation = suspend { println("simpleTest: In Coroutine") 5 }.createCoroutine(object : Continuation { override fun resumeWith(result: Result) { println("resumeWith: Continuation End: $result") }

override val context: CoroutineContext
get() = EmptyCoroutineContext

}) continuation.resume(Unit) ```

demo由三個部分組成:

  • 由suspend宣告的掛起函式
  • 呼叫createCoroutine方法建立Continuation例項
  • 通過continuation.resume啟動協程

suspend函式作為協程體,輸出一條日誌,並且返回5作為執行結果。協程體執行完成後,回撥Continuation的resumeWith方法,其內輸出協程的執行結果。

suspend函式

suspend函式能在不阻塞執行緒執行的情況下將協程掛起,適合用於將非同步程式碼轉為同步形式。在suspend函式中可以呼叫普通函式和suspend函式,然而在普通函式中卻不能呼叫suspend函式(由於編譯時存在CPS轉換,後續講協程實現時詳細說明)。

當suspend函式被真正掛起的時候,對應的呼叫處被乘坐掛起點

suspend函式內部通過呼叫suspendCoroutine方法實現掛起操作的,如果沒有呼叫suspendCoroutine,那麼suspend函式並不會真正的掛起,前面的demo在實際執行時就不會掛起。

suspendCoroutine的函式簽名如下:

kotlin // 程式碼清單2-2 suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T

它是個行內函數,其中block是引數為Continuation的回撥函式,在非同步任務完成時,早block內部呼叫continuation.resumeWith()或者呼叫continuation.resume()continuation.resumeWithException()恢復被掛起的協程。

下面看一個suspend函式的例子:

```kotlin // 程式碼清單2-3 suspend fun suspendFun1() { println("suspendFun1") }

suspend fun suspendFun2() = suspendCoroutine { thread { sleep(50) it.resume(65) } }

fun sleep(time:Long) { try { Thread.sleep(time) } catch (e: InterruptedException) { e.printStackTrace() } }

suspend { suspendFun1() val res2 = suspendFun2() println("suspendFun2 result $res2") res2 + 3 }.startCoroutine(object : Continuation { override val context: CoroutineContext get() = EmptyCoroutineContext

override fun resumeWith(result: Result<Int>) {
    println("continuationInterceptorTest resume $result")
}

}) ```

suspendFun1不會真正執行掛起邏輯,而suspendFun2通過suspendCoroutine建立一個真正的掛起函式,並在內部開啟子執行緒模擬耗時任務執行。所以在執行到suspendFun2時,當前協程先被掛起,等待模擬耗時操作完成後,通過continuation的resume方法恢復被掛起的協程。

構建協程

在程式碼清單2-1中,我們使用createCoroutinecontinuation.resume建立並執行協程,當然我們還可以使用startCoroutine直接建立並啟動協程:

```kotlin // 程式碼清單2-4 suspend { println("simpleTest: In Coroutine with start") 6 }.startCoroutine(object : Continuation { override val context: CoroutineContext get() = EmptyCoroutineContext

override fun resumeWith(result: Result<Int>) {
    println("resumeWith: Continuation with start End: $result")
    println()
}

}) ```

可以看出startCoroutine用法和createCoroutine類似,下面就看看它們的函式簽名吧:

```kotlin // 程式碼清單2-5 fun (suspend () -> T).createCoroutine( completion: Continuation ): Continuation

fun (suspend () -> T).startCoroutine( completion: Continuation ) ```

startCoroutinecreateCoroutine都是作為(suspend () -> T)函式型別的擴充套件函式定義的,因此在程式碼清單2-1和2-4中可以直接在suspend函式後直接呼叫,並且它們都接收一個Continuation型別例項作為回撥,當協程執行完成後,會呼叫completion的resumeWith函式,並將協程執行結果通過result引數傳遞。

標準庫還定義了帶Receiver的startCoroutinecreateCoroutine,用於將協程體的作用域設定成Receiver:

```kotlin // 程式碼清單2-6 fun (suspend R.() -> T).createCoroutine( receiver: R, completion: Continuation )

fun (suspend R.() -> T).startCoroutine( receiver: R, completion: Continuation ) ```

Continuation

startCoroutinecreateCoroutine建立協程時都用到Continuation物件,它用來記錄被掛起的協程在掛起點的狀態,其內儲存著掛起點之後要執行的程式碼。考慮如下序列生成器:

kotlin // 程式碼清單2-7 sequence { for (i in 1..10) yield(i * i) println("over") }

該序列生成器在for迴圈內呼叫yield生成新的序列,並將當前協程掛起,所以在yield內部一定會有Continuation記錄剩餘要執行的程式碼,並且有十個Continuation物件:第一次執行i=2和迴圈並掛起,第二次執行i=3並掛起,以此邏輯依次執行。當協程被建立,但還沒開始執行時,即呼叫了createCoroutine後未呼叫resume,此時存在一個初始的Continuation<Unit>表示所有的協程程式碼。

下面我們看看Continuation的定義:

kotlin // 程式碼清單2-8 interface Continuation<in T> { val context: CoroutineContext fun resumeWith(result: Result<T>) }

context表示當前協程的上下文,使用者可以根據需求自定義上下文,稍後會詳細介紹。在前面的demo中,我們都是直接使用EmptyCoroutineContext,這是一個自帶的上下文,無特殊需求時可以用它。

resumeWith函式是協程的完成回撥,不論協程執行成功或失敗,都通過此函式通知使用者。為了方便使用,kotlin提供了兩個擴充套件函式:

kotlin // 程式碼清單2-9 fun <T> Continuation<T>.resume(value: T) fun <T> Continuation<T>.resumeWithException(exception: Throwable)

resume作為成功通知,resumeWithException作為失敗通知。

上下文

協程上下文類似Set集合,用於保存於協程關聯的自定義資料:可以包含協程的執行緒策略、日誌資訊、協程安全和事務相關資訊、協程id及名字等等。可以將協程當做輕量級執行緒,那麼協程上下文就類似執行緒的ThreadLocal變數,不過ThreadLocal是可變的,而協程上下文是不可變的。

協程上下文在kotlin中用CoroutineContext表示,這是一個集合,下面是其在標準庫中的定義:

```kotlin // 程式碼清單2-10 interface CoroutineContext { operator fun get(key: Key): E? fun fold(initial: R, operation: (R, Element) -> R): R operator fun plus(context: CoroutineContext): CoroutineContext fun minusKey(key: Key<*>): CoroutineContext

interface Element : CoroutineContext {
    val key: Key<*>
}

interface Key<E : Element>

} ```

從定義中可以看出,CoroutineContext由Element組成,Element僅包含欄位key,明顯key是作為Element的索引。

Key介面通過泛型將Key與Element關聯起來,可以看做Key即為Element本身,這與Set類似。然而CoroutineContext過載了操作符[],即程式碼清單2-10中的get函式,它接收Key型別索引,而返回Element型別的元素,從這方面看,CoroutineContext又喝Map相似。

foldplusminusKey屬於集合操作,這裡就不詳細介紹了,請自行查閱文件。

總之,CoroutineContext是一種混合了Set和Map結構的新型集合。

EmptyCoroutineContext

EmptyCoroutineContext 是標準庫提供的一個不包含任何資料的CoroutineContext例項,在沒有特殊需求時可以使用此例項,其定義如下:

```kotlin // 程式碼清單2-11 public object EmptyCoroutineContext : CoroutineContext, Serializable { private const val serialVersionUID: Long = 0 private fun readResolve(): Any = EmptyCoroutineContext

public override fun <E : Element> get(key: Key<E>): E? = null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
public override fun plus(context: CoroutineContext): CoroutineContext = context
public override fun minusKey(key: Key<*>): CoroutineContext = this
public override fun hashCode(): Int = 0
public override fun toString(): String = "EmptyCoroutineContext"

} ```

實現

在實現自定義上下文時,不能直接實現CoroutineContext介面,標準庫提供了AbstractCoroutineContextElement,應該實現此類,其定義如下:

kotlin // 程式碼清單2-12 public abstract class AbstractCoroutineContextElement(public override val key: Key<*>) : Element

AbstractCoroutineContextElement內部通過重寫Element介面的key欄位來指定具體的Element型別。

下面看一個自定義上下文的demo:

```kotlin // 程式碼清單2-13 class CoroutineName(val name: String) : AbstractCoroutineContextElement(Key) { companion object Key : CoroutineContext.Key }

var coroutineContext: CoroutineContext = EmptyCoroutineContext coroutineContext += CoroutineName("c_test")

suspend { println("coroutineContextTest: In Coroutine ${coroutineContext[CoroutineName]?.name} with start") }.startCoroutine(object : Continuation { override val context: CoroutineContext get() = coroutineContext

override fun resumeWith(result: Result<Int>) {
    println("resumeWith: Continuation with start End: $result")
}

}) ```

CoroutineName中通過伴生物件志明Key型別,並且通過父類建構函式將其傳遞上去,以指定CoroutineName所對應的Element型別。

接下去建立CoroutineName例項,並加到EmptyCoroutineContext中,最後再協程體內部通過coroutineContext以及對應的Key(即伴生物件CoroutineName),即可獲的CoroutineName的例項。

攔截器

在android程式碼中,更新UI都要在主執行緒中執行,當發起網路請求或其他耗時操作都會切到子執行緒中執行,而suspend函式恢復執行依賴呼叫continuation.resumeWIth()所在的執行緒,這樣就要手動切換執行緒,容易引發bug。

ContinuationInterceptor提供了攔截並重新包裝Continuation例項的能力,通過重新包裝Continuation例項,就可以實現自定義的需求,比如每次都自動切換回主執行緒執行。

ContinuationInterceptor介面定義如下:

kotlin // 程式碼清單2-14 interface ContinuationInterceptor : CoroutineContext.Element { companion object Key : CoroutineContext.Key<ContinuationInterceptor> fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> fun releaseInterceptedContinuation(continuation: Continuation<*>) }

通過實現interceptContinuation方法重新包裝原始的continuation,以實現自定義的需求。下面我們看一個在協程恢復時新增日誌的攔截器示例:

```kotlin // 程式碼清單2-15 class LogInterceptor() : ContinuationInterceptor { override val key: CoroutineContext.Key<*> = ContinuationInterceptor override fun interceptContinuation(continuation: Continuation): Continuation = LogContinuation(continuation) }

class LogContinuation(private val continuation: Continuation) : Continuation by continuation { override fun resumeWith(result: Result) { println("before resumeWith: $result") continuation.resumeWith(result) println("after resumeWith") } }

fun continuationInterceptorTest() { suspend { suspendFun1() val res2 = suspendFun2() println("suspendFun2 result $res2") res2 + 3 }.startCoroutine(object : Continuation { override val context: CoroutineContext get() = LogInterceptor()

    override fun resumeWith(result: Result<Int>) {
        println("continuationInterceptorTest resume $result")
    }

})

}

suspend fun suspendFun1() { println("suspendFun1") }

suspend fun suspendFun2() = suspendCoroutine { thread { sleep(50) it.resume(65) } } ```

demo中首先定義了LogInterceptorLogContinuation兩個類。LogContinuation接受Continuation作為建構函式引數,通過委託方式實現Continuation介面,在resumeWith中新增日誌以記錄協程的執行記錄。

continuationInterceptorTest中有suspendFun1suspendFun2兩個掛起函式,由於suspendFun1並沒有真正掛起,所以在執行suspendFun1時沒有LogContinuation的日誌,suspendFun2則是在呼叫it.resume(65)時會列印相關的日誌。

參考連結