[Kotlin Review] 23. Kotlin Coroutines: The Channel Flow


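The earlier article on concurrency in Kotlin coroutines used an Actor to achieve concurrency safety. An Actor is essentially built on Channel message passing, so a Channel is a thread-safe data pipe. Because of this safety property, its most common use is building a CSP communication model.

CSP (Communicating Sequential Processes) is a formal language for describing patterns of interaction in concurrent systems. CSP describes a system in terms of independently running component processes that interact with each other solely through message passing.

1. Why can Channel be used to implement the CSP communication model?

To analyse how Channel implements the CSP model, start with Channel's data structure. First, a demo: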

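```
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val scope = CoroutineScope(Job())
    // 1. Create the channel
    val channel = Channel<Int>()

    scope.launch {
        // 2. Send messages through the channel from a separate coroutine
        repeat(3) {
            channel.send(it)
            println("send: $it")
        }

        channel.close()
    }

    scope.launch {
        // 3. Receive messages from the channel in another separate coroutine
        repeat(3) {
            val result = channel.receive()
            println("result $result")
        }
    }

    println("end")
    // Keep the process alive: the two coroutines are not children of runBlocking
    Thread.sleep(2000000L)
}

// Output reported by the author (the interleaving of the two coroutines may vary between runs):
// end
// result 0
// send: 0
// result 1
// send: 1
// result 2
// send: 2
```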

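The code has three parts: creating the Channel, sending data through the channel, and receiving data from the channel.

2. Channel's data structure

Comment 1 creates a Channel. Channel is an interface that also extends SendChannel and ReceiveChannel. The Channel called at comment 1, however, is an ordinary top-level function that plays the role of a constructor, which is why its name starts with a capital letter. The same pattern was mentioned in the earlier article on how CoroutineScope manages coroutines.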
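As a complement to the demo, here is a minimal sketch of the same three parts written so that the receiver simply iterates until the channel is closed. It assumes the `kotlinx-coroutines-core` dependency is available; the function name `collectAll` is made up for this illustration. Because the sender is launched as a child of `runBlocking`, no `Thread.sleep` is needed to keep the process alive.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends 0, 1, 2 and returns everything the receiver saw.
fun collectAll(): List<Int> = runBlocking {
    val channel = Channel<Int>() // rendezvous channel, capacity 0

    // Sender: a child coroutine of runBlocking
    launch {
        repeat(3) { channel.send(it) }
        channel.close() // ends the receiver's for loop
    }

    // Receiver: iterating a channel suspends on each element and stops at close()
    val received = mutableListOf<Int>()
    for (value in channel) received += value
    received
}

fun main() {
    println(collectAll()) // [0, 1, 2]
}
```

Iterating with `for` avoids hard-coding the number of `receive()` calls, which matters once the sender's element count is not known in advance.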

```
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    ...
}

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                RendezvousChannel(onUndeliveredElement) // an efficient rendezvous channel implementation
            else
                ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // buffer overflow is supported through a buffered channel
        }
        CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedChannel(onUndeliveredElement)
        }
        UNLIMITED -> LinkedListChannel(onUndeliveredElement) // ignores onBufferOverflow: it has a buffer, but it never overflows
        BUFFERED -> ArrayChannel( // uses the default capacity with SUSPEND
            if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1,
            onBufferOverflow, onUndeliveredElement
        )
        else -> {
            if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST)
                ConflatedChannel(onUndeliveredElement) // the conflated implementation is more efficient but appears to work the same way
            else
                ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement)
        }
    }
```
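The effect of the `capacity` argument can be observed without reading the internals. The sketch below uses only the public, non-suspending `trySend` and `tryReceive` calls; the helper name `probe` is made up for this illustration. The internal class names in the listing above belong to the library version the article examines and may differ in other versions, but the observable behaviour shown here is part of the documented contract.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: tries to send 1..count without suspending
// and reports which attempts succeeded.
fun probe(channel: Channel<Int>, count: Int): List<Boolean> =
    (1..count).map { channel.trySend(it).isSuccess }

fun main() {
    // RENDEZVOUS (default): no buffer, so trySend fails unless a receiver is waiting
    println(probe(Channel<Int>(), 2))                  // [false, false]

    // Fixed capacity 2: the third element does not fit
    println(probe(Channel<Int>(2), 3))                 // [true, true, false]

    // UNLIMITED: the buffer never overflows
    println(probe(Channel<Int>(Channel.UNLIMITED), 3)) // [true, true, true]

    // CONFLATED: every send succeeds, but only the latest element is kept
    val conflated = Channel<Int>(Channel.CONFLATED)
    println(probe(conflated, 3))                       // [true, true, true]
    println(conflated.tryReceive().getOrNull())        // 3
}
```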

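The core of the Channel function is a single when expression that creates a different Channel instance depending on the arguments: RendezvousChannel, ArrayChannel, ConflatedChannel, or LinkedListChannel. All of these extend AbstractChannel, whose parent class in turn is AbstractSendChannel.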

```
/**
 * The base class for all send channel implementations.
 */
internal abstract class AbstractSendChannel<E>(
    @JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
) : SendChannel<E> {

    protected val queue = LockFreeLinkedListHead()

    ...
}

// Abstract send/receive channel. It is the base class for all channel implementations.
internal abstract class AbstractChannel<E>(
    onUndeliveredElement: OnUndeliveredElement<E>?
) : AbstractSendChannel<E>(onUndeliveredElement), Channel<E> {

}
```

AbstractSendChannel also holds a field, val queue = LockFreeLinkedListHead(), and Channel's core logic is built on it.

```
public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
    public actual val isEmpty: Boolean get() = next === this
}

public actual open class LockFreeLinkedListNode {
    private val _next = atomic<Any>(this) // Node | Removed | OpDescriptor
    private val _prev = atomic(this) // Node to the left (cannot be marked as removed)
    private val _removedRef = atomic<Removed?>(null) // lazily cached removed ref to this
}
```

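LockFreeLinkedListHead extends LockFreeLinkedListNode, and LockFreeLinkedListNode is the key data structure behind Channel's core functionality.

The name LinkedList makes it fairly clear that this is a linked list. More precisely, it is a circular doubly linked list, and LockFreeLinkedListHead is its sentinel node. The sentinel itself never stores data; its next pointer refers to the first node of the list and its prev pointer refers to the last node.

A sentinel node is an extra node introduced to simplify the handling of boundary conditions in a linked list. It usually sits at the head of the list and its value carries no meaning; in a list with a sentinel, meaningful data starts at the second node.

Two cases are worth picturing: the empty list and the list that holds elements.

When the list is empty, both the next and prev pointers of LockFreeLinkedListHead point to the head itself. The head therefore never stores data and is never removed.

When the list holds elements, the next pointer of LockFreeLinkedListHead refers to the first node and prev refers to the tail node (with a single element, both refer to that one node).

An ordinary circular doubly linked list allows insertion at both ends and supports both forward and backward traversal. The structure inside Channel only appends at the tail and only traverses from the head, so it behaves as a first-in, first-out one-way queue, while appending at the tail still takes only O(1) time.

It is fair to say that the LockFreeLinkedList data structure is what makes it possible to implement the CSP communication model with Channel.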
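To make the sentinel layout concrete, here is a deliberately simplified sketch of a circular doubly linked list used as a FIFO queue. It is not lock-free and not thread-safe, and it is not the library's code: the names `Node` and `SentinelQueue` are invented for this illustration. It only shows why an empty list is detected with `head.next === head` and why appending at the tail is O(1).

```kotlin
// Illustration only: plain (non-atomic) pointers, single-threaded use.
class Node<T>(val value: T? = null) {
    var next: Node<T> = this // a fresh node points to itself in both directions
    var prev: Node<T> = this
}

class SentinelQueue<T> {
    // The sentinel never stores data: next is the first node, prev is the tail.
    private val head = Node<T>()

    val isEmpty: Boolean get() = head.next === head

    // Append at the tail in O(1): the tail is always reachable through head.prev.
    fun addLast(value: T) {
        val node = Node(value)
        val tail = head.prev
        node.prev = tail
        node.next = head
        tail.next = node
        head.prev = node
    }

    // Remove from the front, which gives first-in, first-out order.
    fun removeFirstOrNull(): T? {
        val first = head.next
        if (first === head) return null // only the sentinel is left
        head.next = first.next
        first.next.prev = head
        return first.value
    }
}

fun main() {
    val queue = SentinelQueue<String>()
    println(queue.isEmpty)             // true
    queue.addLast("a")
    queue.addLast("b")
    println(queue.removeFirstOrNull()) // a
    println(queue.removeFirstOrNull()) // b
    println(queue.isEmpty)             // true
}
```

Because the sentinel is always present, neither `addLast` nor `removeFirstOrNull` needs a special branch for the first or last element; the real implementation additionally performs these pointer updates with atomic operations.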

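3. The send and receive flow

The demo starts two coroutines that send three values and receive three values. The program first executes send(), but because a Channel has a capacity of 0 by default, send() is suspended at first.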

```
public final override suspend fun send(element: E) {
    //1
    if (offerInternal(element) === OFFER_SUCCESS) return
    //2
    return sendSuspend(element)
}

/**
 * Tries to add the element to the buffer or hand it to a queued receiver.
 */
protected open fun offerInternal(element: E): Any {
    while (true) {
        //3
        val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED
        val token = receive.tryResumeReceive(element, null)
        if (token != null) {
            assert { token === RESUME_TOKEN }
            receive.completeResumeReceive(element)
            return receive.offerResult
        }
    }
}

private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
    loop@ while (true) {
        if (isFullImpl) {
            //4
            val send = if (onUndeliveredElement == null)
                SendElement(element, cont) else
                SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
            val enqueueResult = enqueueSend(send)
            when {
                enqueueResult == null -> { // enqueued successfully
                    //5
                    cont.removeOnCancellation(send)
                    return@sc
                }
                enqueueResult is Closed<*> -> {
                    cont.helpCloseAndResumeWithSendException(element, enqueueResult)
                    return@sc
                }
                enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead
                enqueueResult is Receive<*> -> {} // try to offer instead
                else -> error("enqueueSend returned $enqueueResult")
            }
        }
        // hm... receiver is waiting or buffer is not full. try to offer
        val offerResult = offerInternal(element)
        when {
            offerResult === OFFER_SUCCESS -> {
                cont.resume(Unit)
                return@sc
            }
            offerResult === OFFER_FAILED -> continue@loop
            offerResult is Closed<*> -> {
                cont.helpCloseAndResumeWithSendException(element, offerResult)
                return@sc
            }
            else -> error("offerInternal returned $offerResult")
        }
    }
}
```

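  • Comment 1: try to hand the element to the Channel. If a consumer is already waiting, the if condition is true and send() returns right away. On the first call there is no consumer yet, so offerInternal cannot take a receiver from the queue (comment 3) and returns OFFER_FAILED, and execution falls through to sendSuspend.
  • Comment 2: this suspending function is implemented with the higher-order function suspendCancellableCoroutineReusable, whose job is to expose the Continuation object of the suspending function.
  • Comment 4: the element being sent is wrapped in a SendElement object, and enqueueSend() appends it to the tail of the LockFreeLinkedList queue. If that succeeds, execution continues at comment 5.
  • Comment 5: this registers a cancellation handler so that the SendElement is removed from the queue if the coroutine is cancelled while waiting. The lambda then returns and the coroutine stays suspended.

This raises a question: send() is now suspended, so when does it resume?

The answer: when the data is received, in receive().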
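The suspension described in the comments above can be observed from the outside with public API only. The sketch below is an illustration under stated assumptions: the helper name `sendCompletesWithin` is invented, and the 100 ms timeout is an arbitrary value chosen for the demo. On a rendezvous channel with no receiver, send() is still suspended when the timeout fires; on a channel with one free buffer slot, send() returns immediately.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: returns true if send() completed within the timeout,
// or null if it was still suspended when the timeout cancelled it.
suspend fun sendCompletesWithin(channel: Channel<Int>, millis: Long): Boolean? =
    withTimeoutOrNull(millis) {
        channel.send(1)
        true
    }

fun main() = runBlocking {
    // Capacity 0 and nobody receiving: send() parks itself and waits
    println(sendCompletesWithin(Channel<Int>(), 100))  // null

    // Capacity 1 with an empty buffer: the element is buffered, no suspension
    println(sendCompletesWithin(Channel<Int>(1), 100)) // true
}
```

The first case also exercises the cancellation path: when the timeout cancels the waiting coroutine, the pending send is withdrawn from the channel rather than left behind.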

```
public final override suspend fun receive(): E {
    // 1
    val result = pollInternal()

    @Suppress("UNCHECKED_CAST")
    if (result !== POLL_FAILED && result !is Closed<*>) return result as E
    // 2
    return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
}

protected open fun pollInternal(): Any? {
    while (true) {
        //3
        val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
        val token = send.tryResumeSend(null)
        if (token != null) {
            assert { token === RESUME_TOKEN }
            //4
            send.completeResumeSend()
            return send.pollResult
        }

        send.undeliveredElement()
    }
}

private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
    val receive = if (onUndeliveredElement == null)
        ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
        ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
    while (true) {
        if (enqueueReceive(receive)) {
            removeReceiveOnCancel(cont, receive)
            return@sc
        }
        // hm... something is not right. try to poll
        val result = pollInternal()
        if (result is Closed<*>) {
            receive.resumeReceiveClosed(result)
            return@sc
        }
        if (result !== POLL_FAILED) {
            cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E))
            return@sc
        }
    }
}
```

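  • Comment 1: look in the LockFreeLinkedList queue for a sender that is currently suspended.
  • Comment 2: when the queue holds no suspended sender, receiveSuspend() runs, and it suspends as well. The logic mirrors sendSuspend(): the receiver is wrapped in a ReceiveElement object and appended to the tail of the queue. If the append succeeds, receiveSuspend() stays suspended, which means receive() is suspended too; it is resumed later in offerInternal().
  • Comment 3: in the send() flow, the element was wrapped in a SendElement object; in receive(), this step takes that SendElement back out of the queue and then moves on to comment 4.
  • Comment 4: what is actually called here is: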

override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)

Here cont is the continuation which, as discussed earlier, is a state machine. cont.completeResume(RESUME_TOKEN) calls into completeResume in CancellableContinuationImpl:

```
override fun completeResume(token: Any) {
    assert { token === RESUME_TOKEN }
    dispatchResume(resumeMode)
}

private fun dispatchResume(mode: Int) {
    if (tryResume()) return
    dispatch(mode)
}

// DispatchedTask
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
    assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method
    val delegate = this.delegate
    val undispatched = mode == MODE_UNDISPATCHED
    if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
        // dispatch directly using this instance's Runnable implementation
        val dispatcher = delegate.dispatcher
        val context = delegate.context
        if (dispatcher.isDispatchNeeded(context)) {
            dispatcher.dispatch(context, this)
        } else {
            resumeUnconfined()
        }
    } else {
        // delegate is coming from 3rd-party interceptor implementation (and does not support cancellation)
        // or undispatched mode was requested
        resume(delegate, undispatched)
    }
}
```

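The call ends in dispatch(mode), which is DispatchedTask's dispatch(). This dispatch() is the point at which the code of the coroutine body is scheduled onto a thread; on the JVM it ultimately runs on top of a Java Executor. With that, the previously suspended send() has resumed.

4. Summary:

  • Channel is a thread-safe pipe whose common use is implementing the CSP communication model. It can do so mainly because its underlying data structure is LockFreeLinkedList.
  • LockFreeLinkedList is a circular doubly linked list, but the Channel source treats it as a first-in, first-out one-way queue: nodes are only inserted at the tail and the list is only traversed forwards.
  • Sending has two cases. If a suspended receiver is already in the queue, send() resumes that Receive node and hands the data over. If there is no suspended receiver, for example on the very first send, send() suspends: the element is wrapped in a SendElement, appended to the tail of the LockFreeLinkedList queue, and waits to be resumed by receive().
  • Receiving has two cases. If a suspended sender is already in the queue, receive() resumes that Send node and takes the data it carries. If there is no suspended sender, receive() suspends: it is wrapped in a ReceiveElement, appended to the tail of the queue, and waits to be resumed by the next send().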
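The hand-off between a suspended send() and a later receive() can be traced with a small experiment. This is a sketch that assumes a single-threaded `runBlocking` event loop, which makes the order of events deterministic; the function name `sendThenReceive` and the log strings are invented for the illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records the order in which the two sides make progress.
fun sendThenReceive(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val channel = Channel<Int>() // rendezvous channel

    val sender = launch {
        events += "before send"
        channel.send(1) // suspends: no receiver is waiting yet
        events += "after send"
    }

    yield() // let the sender run until it suspends inside send()
    events += "sender suspended"

    // receive() finds the waiting sender, takes its element and schedules its resumption
    val value = channel.receive()
    events += "received $value"

    sender.join() // the resumed sender now runs to completion
    events
}

fun main() {
    sendThenReceive().forEach(::println)
    // before send
    // sender suspended
    // received 1
    // after send
}
```

Note that "after send" appears only after receive() has returned: receive() does not run the sender inline, it dispatches the sender's continuation, which then executes when the event loop gets to it.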
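The four cases in the summary can be condensed into a toy model. The sketch below is not the library's implementation: `TinyRendezvous` and `WaitingSend` are invented names, it uses two plain `ArrayDeque`s instead of one lock-free list, and it is safe only when all coroutines run on a single thread (as inside `runBlocking`). It relies on the public `suspendCancellableCoroutine` to capture the continuation, where the library uses an internal reusable variant.

```kotlin
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume

// Toy model, single-threaded use only (assumption: no concurrent access).
class TinyRendezvous<E> {
    private class WaitingSend<E>(val element: E, val cont: CancellableContinuation<Unit>)

    private val senders = ArrayDeque<WaitingSend<E>>()
    private val receivers = ArrayDeque<CancellableContinuation<E>>()

    suspend fun send(element: E) {
        val receiver = receivers.removeFirstOrNull()
        if (receiver != null) {
            receiver.resume(element) // case 1: a receiver is waiting, hand over and return
            return
        }
        suspendCancellableCoroutine<Unit> { cont -> // case 2: park until a receiver arrives
            val node = WaitingSend(element, cont)
            senders.addLast(node)
            cont.invokeOnCancellation { senders.remove(node) }
        }
    }

    suspend fun receive(): E {
        val sender = senders.removeFirstOrNull()
        if (sender != null) {
            sender.cont.resume(Unit) // case 1: a sender is waiting, resume it and take its element
            return sender.element
        }
        return suspendCancellableCoroutine { cont -> // case 2: park until a sender arrives
            receivers.addLast(cont)
            cont.invokeOnCancellation { receivers.remove(cont) }
        }
    }
}

// Hypothetical driver: logs the order of events for two elements.
fun demo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val channel = TinyRendezvous<Int>()
    val sender = launch {
        repeat(2) {
            channel.send(it)
            log += "sent $it"
        }
    }
    repeat(2) { log += "received ${channel.receive()}" }
    sender.join()
    log
}

fun main() {
    println(demo()) // [sent 0, received 0, received 1, sent 1]
}
```

In this run the first element goes through the "receiver already waiting" path and the second through the "sender already waiting" path, so all four branches are exercised.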