Kotlin 協程 Select:看我如何多路複用

語言: CN / TW / HK

前言

協程通訊三劍客:Channel、Select、Flow,上篇已經分析了Channel的深水區,本篇將會重點分析Select的使用及原理。
通過本篇文章,你將瞭解到:

  1. Select 的引入
  2. Select 的使用
  3. Invoke函式 的妙用
  4. Select 的原理
  5. Select 注意事項

1. Select 的引入

多路資料的選擇

序列執行

如今的二維碼識別應用場景越來越廣了,早期應用比較廣泛的識別SDK如zxing、zbar,它們各有各的特點,也存在識別不出來的情況,為了將兩者優勢結合起來,我們想到的方法是同一份二維碼圖片分別給兩者進行識別。
如下:
```kotlin //從zxing 獲取二維碼資訊 suspend fun getQrcodeInfoFromZxing(bitmap: Bitmap?): String { //模擬耗時 delay(2000) return "I'm fish" }

//從zbar 獲取二維碼資訊
suspend fun getQrcodeInfoFromZbar(bitmap: Bitmap?): String {
    delay(1000)
    return "I'm fish"
}

fun testSelect() {
    runBlocking {
        var bitmap = null
        var starTime = System.currentTimeMillis()
        var qrcoe1 = getQrcodeInfoFromZxing(bitmap)
        var qrcode2 = getQrcodeInfoFromZbar(bitmap)
        println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
    }
}

``` 檢視列印,最後花費的時間:

qrcode1=I'm fish qrcode2=I'm fish useTime:3013 ms

當然這是序列的方式效率比較低,我們想到了用協程來優化它。

協程並行執行

如下: ```kotlin fun testSelect1() { var bitmap = null; var starTime = System.currentTimeMillis() var deferredZxing = GlobalScope.async { getQrcodeInfoFromZxing(bitmap) }

    var deferredZbar = GlobalScope.async {
        getQrcodeInfoFromZbar(bitmap)
    }

    runBlocking {
        //掛起等待識別結果
        var qrcoe1 = deferredZxing.await()
        //掛起等待識別結果
        var qrcode2 = deferredZbar.await()
        println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
    }
}

``` 檢視列印,最後花費的時間:

qrcode1=I'm fish qrcode2=I'm fish useTime:2084 ms

可以看出,花費時間明顯變少了。
與上個Demo 相比,雖然識別過程是放在協程裡並行執行的,但是在等待識別結果卻是序列的。我們引入兩個識別庫的初衷是哪個識別快就用哪個的結果,為了達成這個目的,傳統的方式是:

同時監聽並記錄識別結果的返回。

同時監聽多路結果

如下: ```kotlin fun testSelect2() { var bitmap = null; var starTime = System.currentTimeMillis() var deferredZxing = GlobalScope.async { getQrcodeInfoFromZxing(bitmap) }

    var deferredZbar = GlobalScope.async {
        getQrcodeInfoFromZbar(bitmap)
    }

    var isEnd = false
    var result: String? = null
    GlobalScope.launch {
        if (!isEnd) {
            //沒有結束,則繼續識別
            var resultTmp = deferredZxing.await()
            if (!isEnd) {
                //識別沒有結束,說明自己是第一個返回結果的
                result = resultTmp
                println("zxing recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                //標記識別結束
                isEnd = true
            }
        }
    }

    GlobalScope.launch {
        if (!isEnd) {
            var resultTmp = deferredZbar.await()
            if (!isEnd) {
                //識別沒有結束,說明自己是第一個返回結果的
                result = resultTmp
                println("zbar recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                isEnd = true
            }
        }
    }

    //檢測是否有結果返回
    runBlocking {
        while (!isEnd) {
            delay(1)
        }
        println("recognize result:$result")
    }
}

``` 通過檢測isEnd 標記來判斷是否有某個模組返回結果。
結果如下:

  • zbar recognize ok useTime:1070 ms
  • recognize result:I'm fish

由於模擬設定的zbar 解析速度快,因此每次都是採納的是zbar的結果,所花費的時間大幅減少了,該結果符合預期。

Select 閃亮登場

雖說上個Demo結果符合預期,但是多了很多額外的程式碼、多引入了其它協程,並且需要子模組對標記進行賦值(對"isEnd"進行賦值),沒有達到解耦的目的。我們希望子模組的任務是單一且閉環的,如果能在一個函式裡統一檢測結果的返回就好了。
Select 就是為了解決多路資料的選擇而生的。
來看看它是怎麼解決該問題的:
```kotlin fun testSelect3() { var bitmap = null; var starTime = System.currentTimeMillis() var deferredZxing = GlobalScope.async { getQrcodeInfoFromZxing(bitmap) } var deferredZbar = GlobalScope.async { getQrcodeInfoFromZbar(bitmap) } runBlocking { //通過select 監聽zxing、zbar 結果返回 var result = select { //監聽zxing deferredZxing.onAwait {value-> //value 為deferredZxing 識別的結果 "zxing result $value" }

            //監聽zbar
            deferredZbar.onAwait { value->
                "zbar result $value"
            }
        }

        //執行到此,說明已經有結果返回
        println("result from $result useTime:${System.currentTimeMillis() - starTime}")
    }
}

``` 結果如下:

result from zbar result I'm fish useTime:1079

符合預期,同時可以看出:相比上個Demo,這樣寫簡潔了許多。

2. Select 的使用

除了可以監聽async的結果,Select 還可以監聽Channel的傳送方/接收方 資料,我們以監聽接收方資料為例:
```kotlin fun testSelect4() { runBlocking { var bitmap = null; var starTime = System.currentTimeMillis() var receiveChannelZxing = produce { //生產資料 var result = getQrcodeInfoFromZxing(bitmap) //傳送資料 send(result) }

        var receiveChannelZbar = produce {
            var result = getQrcodeInfoFromZbar(bitmap)
            send(result)
        }

        var result = select<String> {
            //監聽是否有資料傳送過來
            receiveChannelZxing.onReceive {
                value->"zxing result $value"
            }

            receiveChannelZbar.onReceive {
                    value->"zbar result $value"
            }
        }

        println("result from $result useTime:${System.currentTimeMillis() - starTime}")
    }
}

``` 結果如下:

result from zbar result I'm fish useTime:1028

不論是async還是Channel,Select 都可以監聽它們的資料,從而形成多路複用的效果。

![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/b911afe577a040a6a58faeda45762372~tplv-k3u1fbpfcp-zoom-1.image)

在監聽協程裡呼叫select 表示式,表示式{}內宣告需要監聽的協程的資料,對於select 來說有兩種場景:

  1. 沒有資料,則select 掛起協程並等待直到其它協程資料準備完成後再次恢復select 所在的協程。
  2. 有資料,則select 正常執行並返回獲取的資料。

3. Invoke函式 的妙用

在分析Select 原理之前,需要弄明白invoke函式的原理。
對於Kotlin 類來說,都可以重寫其invoke函式。
kotlin operator fun invoke():String { return "I'm fish" } 如上,重寫了SelectDemo裡的invoke函式,和普通成員函式一樣,我們可以通過物件呼叫它。 kotlin fun main(args: Array<String>) { var selectDemo = SelectDemo() var result = selectDemo.invoke() println("result:$result") } 當然,可以進一步簡化: kotlin fun main(args: Array<String>) { var selectDemo = SelectDemo() var result = selectDemo() println("result:$result") } 這裡涉及到了kotlin的語法糖:物件居然可以像函式一樣呼叫。
作為函式,invoke 當然也可以接收高階函式作為引數:
```kotlin operator fun invoke(block: (Int) -> String): String { return block(3) }

fun main(args: Array) { var selectDemo = SelectDemo() var result = selectDemo { age -> when (age) { 3 -> "I'm fish3" 4 -> "I'm fish4" else -> "error" } } println("result:$result") } ``` 因此,當看到物件作為函式呼叫時,實際上呼叫的是invoke函式,具體的邏輯需要檢視其invoke函式的實現。

4. Select 的原理

上篇分析過Channel,因此本篇趁熱打鐵,通過Select 監聽Channel資料的變化來分析其原理,為方便講解,我們先以監聽一個Channel的為例。
先從select 表示式本身入手。
```kotlin fun testSelect5() { runBlocking { var starTime = System.currentTimeMillis() var receiveChannelZxing = produce { //傳送資料 send("I'm fish") }

        //確保channel 資料已經send
        delay(1000)
        var result = select<String> {
            //監聽是否有資料傳送過來
            receiveChannelZxing.onReceive { value ->
                "zxing result $value"
            }
        }
        println("result from $result useTime:${System.currentTimeMillis() - starTime}")
    }
}

select 是掛起函式,因此協程執行到此有可能被掛起。

Select.kt

public suspend inline fun select(crossinline builder: SelectBuilder.() -> Unit): R { //... return suspendCoroutineUninterceptedOrReturn { uCont -> //傳入父協程體 val scope = SelectBuilderImpl(uCont) try { //執行builder builder(scope) } catch (e: Throwable) { scope.handleBuilderException(e) } //通過返回值判斷是否需要掛起協程 scope.getResult() } } ``` 重點看builder(scope),builder 是高階函式,實際上就是執行了select花括號裡的內容,而它裡面就是監聽資料是否返回。

receiveChannelZxing.onReceive
剛開始看的時候勢必以為onReceive是個函式,然而它是ReceiveChannel 裡的成員變數:
```kotlin

Channel.kt

public val onReceive: SelectClause1<E>

通過上一節的分析可知,關鍵是要找到SelectClause1 的invoke的實現。kotlin

Select.kt

public interface SelectBuilder { //block 有個入參 //聲明瞭SelectClause1的擴充套件函式invoke public operator fun SelectClause1.invoke(block: suspend (Q) -> R) }

override fun SelectClause1.invoke(block: suspend (Q) -> R) { //SelectBuilderImpl 實現了 SelectClause1 的invoke函式 registerSelectClause1([email protected], block) } 再看onReceive 的賦值:kotlin

AbstractChannel.kt

final override val onReceive: SelectClause1 get() = object : SelectClause1 { @Suppress("UNCHECKED_CAST") override fun registerSelectClause1(select: SelectInstance, block: suspend (E) -> R) { registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R) } } ``` 因此,簡單總結呼叫棧如下:

當呼叫receiveChannelZxing.onReceive{},實際上呼叫了SelectClause1.invoke(),而它裡面又呼叫了SelectClause1.registerSelectClause1(),最終呼叫了AbstractChannel.registerSelectReceiveMode。

AbstractChannel. registerSelectReceiveMode
```kotlin

AbstractChannel.kt

private fun registerSelectReceiveMode(select: SelectInstance, receiveMode: Int, block: suspend (Any?) -> R) { while (true) { //如果已經有結果了,則直接返回------->① if (select.isSelected) return if (isEmptyImpl) { //沒有傳送者在等待,則入隊等待,並返回 ------->② if (enqueueReceiveSelect(select, block, receiveMode)) return } else { //直接取出值------->③ val pollResult = pollSelectInternal(select) when { pollResult === ALREADY_SELECTED -> return pollResult === POLL_FAILED -> {} // retry pollResult === RETRY_ATOMIC -> {} // retry //呼叫block------->④ else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult) } } } } ``` 分為4個點,接著來一一分析。

select 同時監聽多個值,若是有1個符合要求的資料返回了,那麼該isSelected 標記為true,當檢測到該標記為true時直接退出。
結合之前的Demo,zbar 已經識別出結果了,當select 檢測zxing的結果時直接返回。


```kotlin

AbstractChannel.kt

private fun enqueueReceiveSelect( select: SelectInstance, block: suspend (Any?) -> R, receiveMode: Int ): Boolean { //構造為Node元素 val node = AbstractChannel.ReceiveSelect(this, select, block, receiveMode) //新增到Channel佇列裡 val result = enqueueReceive(node) if (result) select.disposeOnSelect(node) return result } 當select 時,發現Channel裡沒有資料,說明Channel還沒有開始send,因此構造了Node(ReceiveSelect)加入到Channel queue裡。當send資料時,會查詢queue裡是否有接收者等待,若有則呼叫Node(ReceiveSelect.completeResumeReceive):kotlin

AbstractChannel.kt

    override fun completeResumeReceive(value: E) {
        block.startCoroutineCancellable(
            if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
            select.completion,
            resumeOnCancellationFun(value)
        )
    }

``` block 被排程執行,最後會恢復select 協程的執行。


取出資料,並嘗試恢復send協程。


在③的基礎上,拿到資料後,直接執行block(此時並沒有切換執行緒進行排程)。

小結一下select 原理:

![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/50cdd29ecc2b463189832ba92745734d~tplv-k3u1fbpfcp-zoom-1.image)

可以看出:

select 本身執行並不耗時,若最終沒有資料返回則掛起等待,若是有資料返回則不會掛起協程。

我們從頭再捋一下select 配合Channel 的原理:

![image.png](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/030ceaf746034422b3151d6ed45168de~tplv-k3u1fbpfcp-zoom-1.image)

雖然以Channel為例講解了select 原理,實際上async等結合select 原理大致差不多,重點都是利用了協程的掛起/恢復做文章。

5. Select 注意事項

如果select有多個數據同時到達,select 預設會選擇第一個資料,若想要隨機選擇資料,可做如下處理:
kotlin var result = selectUnbiased<String> { //監聽是否有資料傳送過來 receiveChannelZxing.onReceive { value -> "zxing result $value" } }

想要知道select 還可以監聽哪些資料,可檢視該資料是否實現了SelectClauseX(X 表示0、1、2)。

以上即為Select 的原理及其使用,下篇將會進入協程的精華部分:Flow的運用,該部分內容較多,可能會分幾篇分析,敬請期待。

本文基於Kotlin 1.5.3,文中完整Demo請點選

您若喜歡,請點贊、關注、收藏,您的鼓勵是我前進的動力

持續更新中,和我一起步步為營系統、深入學習Android/Kotlin