聊一聊Kotlin協程"低階"api

語言: CN / TW / HK

theme: cyanosis

聊一聊kotlin協程“低階”api

Kotlin協程已經出來很久了,相信大家都有不同程度的用上了,由於最近處理的需求有遇到協程相關,因此今天來聊一Kotlin協程的“低階”api,首先低階api並不是它真的很“低階”,而是kotlin協程庫中的基礎api,我們一般開發用的,其實都是通過低階api進行封裝的高階函式,本章會通過低階api的組合,實現一個自定義的async await 函式(下文也會介紹kotlin 高階api的async await),涉及的低階api有startCoroutineContinuationInterceptor

startCoroutine

我們知道,一個suspend關鍵字修飾的函式,只能在協程體中執行,伴隨著suspend 關鍵字,kotlin coroutine common庫(平臺無關)也提供出來一個api,用於直接通過suspend 修飾的函式直接啟動一個協程,它就是startCoroutine

@SinceKotlin("1.3") @Suppress("UNCHECKED_CAST") public fun <R, T> (suspend R.() -> T).startCoroutine( 作為Receiver receiver: R, 當前協程結束時的回撥 completion: Continuation<T> ) { createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit) } 可以看到,它的Receiver是(suspend R.() -> T),即是一個suspend修飾的函式,那麼這個有什麼作用呢?我們知道,在普通函式中無法調起suspend函式(因為普通函式沒有隱含的Continuation物件,這裡我們不在這章講,可以參考kotlin協程的資料)

image.png 但是普通函式是可以調起一個以suspend函式作為Receiver的函式(本質也是一個普通函式)

image.png 其中startCoroutine就是其中一個,本質就是我們直接從外部提供了一個Continuation,同時呼叫了resume方法,去進入到了協程的世界 ```

startCoroutine實現

createCoroutineUnintercepted(completion).intercepted().resume(Unit) ``` 這個原理我們就不細講下去原理,之前也有寫過相關的文章。通過這種呼叫,我們其實就可以實現在普通的函式環境,開啟一個協程環境(即帶有了Continuation),進而呼叫其他的suspend函式。

ContinuationInterceptor

我們都知道攔截器的概念,那麼kotlin協程也有,就是ContinuationInterceptor,它提供以AOP的方式,讓外部在resume(協程恢復)前後進行自定義的攔截操作,比如高階api中的Diapatcher就是。當然什麼是resume協程恢復呢,可能讀者有點懵,我們還是以上圖中出現的mySuspendFunc舉例子 ``` mySuspendFunc是一個suspned函式 ::mySuspendFunc.startCoroutine(object : Continuation { override val context: CoroutineContext get() = EmptyCoroutineContext

override fun resumeWith(result: Result<Unit>) {

}

}) 它其實等價於 val continuation = ::mySuspendFunc.createCoroutine(object :Continuation{ override val context: CoroutineContext get() = EmptyCoroutineContext

override fun resumeWith(result: Result<Unit>) {
    Log.e("hello","當前協程執行完成的回撥")
}

}) continuation.resume(Unit) ``` startCoroutine方法就相當於建立了一個Continuation物件,並呼叫了resume。建立Continuation可通過createCoroutine方法,返回一個Continuation,如果我們不呼叫resume方法,那麼它其實什麼也不會執行,只有呼叫了resume等執行方法之後,才會執行到後續的協程體(這個也是協程內部實現,感興趣可以看看之前文章)

而我們的攔截器,就相當於在continuation.resume前後,可以新增自己的邏輯。我們可以通過繼承ContinuationInterceptor,實現自己的攔截器邏輯,其中需要複寫的方法是interceptContinuation方法,用於返回一個自己定義的Continuation物件,而我們可以在這個Continuation的resumeWith方法裡面(當呼叫了resume之後,會執行到resumeWith方法),進行前後列印/其他自定義操作(比如切換執行緒)

``` class ClassInterceptor() :ContinuationInterceptor { override val key = ContinuationInterceptor override fun interceptContinuation(continuation: Continuation): Continuation =MyContinuation(continuation)

} class MyContinuation(private val continuation: Continuation):Continuation by continuation{ override fun resumeWith(result: Result) { Log.e("hello","MyContinuation start ${result.getOrThrow()}") continuation.resumeWith(result)

    Log.e("hello","MyContinuation end ")
}

} ```

其中的key是ContinuationInterceptor,協程內部會在每次協程恢復的時候,通過coroutineContext取出key為ContinuationInterceptor的攔截器,進行攔截呼叫,當然這也是kotlin協程內部實現,這裡簡單提一下。

實戰

kotlin協程api中的 async await

我們來看一下kotlon Coroutine 的高階api async await用法 ``` CoroutineScope(Dispatchers.Main).launch { val block = async(Dispatchers.IO) { // 阻塞的事項

}
// 處理其他主執行緒的事務

// 此時必須需要async的結果時,則可通過await()進行獲取
val result =  block.await()

} ``` 我們可以通過async方法,在其他執行緒中處理其他阻塞事務,當主執行緒必須要用async的結果的時候,就可以通過await等待,這裡如果結果返回了,則直接獲取值,否則就等待async執行完成。這是Coroutine提供給我們的高階api,能夠將任務簡單分層而不需要過多的回撥處理。

通過startCoroutine與ContinuationInterceptor實現自定義的 async await

我們可以參考其他語言的async,或者Dart的非同步方法呼叫,都有類似這種方式進行執行緒呼叫 async { val result = await { suspend 函式 } 消費result } await在async作用域裡面,同時獲取到result後再進行消費,async可以直接在普通函式呼叫,而不需要在協程體內,下面我們來實現一下這個做法。

首先我們想要限定await函式只能在async的作用域才能使用,那麼首先我們就要定義出來一個Receiver,我們可以在Receiver裡面定義出自己想要暴露的方法 ``` interface AsyncScope { fun myFunc(){

}

} fun async( context: CoroutineContext = EmptyCoroutineContext, block: suspend AsyncScope.() -> Unit ) { // 這個有兩個作用 1.充當receiver 2.completion,接收回調 val completion = AsyncStub(context) block.startCoroutine(completion, completion) }

注意這個類,resumeWith 只會跟startCoroutine的這個協程繫結關係,跟await的協程沒有關係 class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation, AsyncScope { override fun resumeWith(result: Result) {

    // 這個是幹嘛的 == > 完成的回撥
    Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
}

} ``` 上面我們定義出來一個async函式,同時定義出來了一個AsyncStub的類,它有兩個用處,第一個是為了充當Receiver,用於規範後續的await函式只能在這個Receiver作用域中呼叫,第二個作用是startCoroutine函式必須要傳入一個引數completion,是為了收到當前協程結束的回撥resumeWith中可以得到當前協程體結束回撥的資訊

``` await方法裡面

suspend fun AsyncScope.await(block:() -> T) = suspendCoroutine { // 自定義的Receiver函式 myFunc()

Thread{
     切換執行緒執行await中的方法
    it.resumeWith(Result.success(block()))
}.start()

} ``` 在await中,其實是一個擴充套件函式,我們可以呼叫任何在AsyncScope中定義的方法,同時這裡我們模擬了一下執行緒切換的操作(Dispatcher的實現,這裡不採用Dispatcher就是想讓大家知道其實Dispatcher.IO也是這樣實現的),在子執行緒中呼叫it.resumeWith(Result.success(block())),用於返回所需要的資訊

通過上面定的方法,我們可以實現 async { val result = await { suspend 函式 } 消費result } 這種呼叫方式,但是這裡引來了一個問題,因為我們在await函式中實際將操作切換到了子執行緒,我們想要將消費result的動作切換至主執行緒怎麼辦呢?又或者是加入我們希望獲取結果前做一些調整怎麼辦呢?別急,我們這裡預留了一個CoroutineContext函式,我們可以在外部傳入一個CoroutineContext public interface ContinuationInterceptor : CoroutineContext.Element 而CoroutineContext.Element又是繼承於CoroutineContext CoroutineContext.Element:CoroutineContext 而我們的攔截器,正是CoroutineContext的子類,我們把上文的ClassInterceptor修改一下 ```

class ClassInterceptor() : ContinuationInterceptor { override val key = ContinuationInterceptor override fun interceptContinuation(continuation: Continuation): Continuation = MyContinuation(continuation)

}

class MyContinuation(private val continuation: Continuation) : Continuation by continuation { private val handler = Handler(Looper.getMainLooper()) override fun resumeWith(result: Result) { Log.e("hello", "MyContinuation start ${result.getOrThrow()}")

    handler.post {
        continuation.resumeWith(Result.success(自定義內容))
    }
    Log.e("hello", "MyContinuation end ")
}

} 同時把async預設引數CoroutineContext實現一下即可 fun async( context: CoroutineContext = ClassInterceptor(), block: suspend AsyncScope.() -> Unit ) { // 這個有兩個作用 1.充當receiver 2.completion,接收回調 val completion = AsyncStub(context) block.startCoroutine(completion, completion) } ```

此後我們就可以直接通過,完美實現了一個類js協程的呼叫,同時具備了自動切換執行緒的能力 async { val result = await { test() } Log.e("hello", "result is $result ${Looper.myLooper() == Looper.getMainLooper()}") } 結果 E start E MyContinuation start kotlin.Unit E MyContinuation end E end E 執行阻塞函式 test 1923 E MyContinuation start 自定義內容數值 E MyContinuation end E result is 自定義內容的數值 true E AsyncStub resumeWith 2 kotlin.Unit

最後,這裡需要注意的是,為什麼攔截器回調了兩次,因為我們async的時候開啟了一個協程,同時await的時候也開啟了一個,因此是兩個。AsyncStub只回調了一次,是因為AsyncStub被當作complete引數傳入了async開啟的協程block.startCoroutine,因此只是async中的協程結束才會被回撥。

image.png

本章程式碼

```

class ClassInterceptor() : ContinuationInterceptor { override val key = ContinuationInterceptor override fun interceptContinuation(continuation: Continuation): Continuation = MyContinuation(continuation)

}

class MyContinuation(private val continuation: Continuation) : Continuation by continuation { private val handler = Handler(Looper.getMainLooper()) override fun resumeWith(result: Result) { Log.e("hello", "MyContinuation start ${result.getOrThrow()}")

    handler.post {
        continuation.resumeWith(Result.success(6 as T))
    }
    Log.e("hello", "MyContinuation end ")
}

} ```

``` interface AsyncScope { fun myFunc(){

}

} fun async( context: CoroutineContext = ClassInterceptor(), block: suspend AsyncScope.() -> Unit ) { // 這個有兩個作用 1.充當receiver 2.completion,接收回調 val completion = AsyncStub(context) block.startCoroutine(completion, completion) }

class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) : Continuation, AsyncScope { override fun resumeWith(result: Result) {

    // 這個是幹嘛的 == > 完成的回撥
    Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
}

}

suspend fun AsyncScope.await(block:() -> T) = suspendCoroutine { myFunc()

Thread{
    it.resumeWith(Result.success(block()))
}.start()

} 模擬阻塞 fun test(): Int { Thread.sleep(5000) Log.e("hello", "執行阻塞函式 test ${Thread.currentThread().id}") return 5 } async { val result = await { test() } Log.e("hello", "result is $result ${Looper.myLooper() == Looper.getMainLooper()}") } ```

最後

我們通過協程的低階api,實現了一個與官方庫不同版本的async await,同時也希望通過對低階api的設計,也能對Coroutine官方庫的高階api的實現有一定的瞭解。

本文正在參加「金石計劃 . 瓜分6萬現金大獎」