【Kotlin回顧】23.Kotlin協程—Channel流程

語言: CN / TW / HK

theme: cyanosis


開啟掘金成長之旅!這是我參與「掘金日新計劃 · 12 月更文挑戰」的第30天,點選檢視活動詳情

在分析Kotlin協程—協程的併發中用Actor也實現了併發安全,而Actor是本質是基於Channel管訊息實現的,所以Channel是一個執行緒安全的資料管道,根據Channel的這個安全特性它最常見的用法是建立CSP通訊模型。

CSP通訊模型(Communicating Sequential Processes),是一種形式化語言,用於描述併發系統中的互動模式。CSP 允許根據獨立執行的元件程序來描述系統,並且僅通過訊息傳遞通訊相互互動。

1.為什麼可以通過Channel實現CSP通訊模型?

分析Channel實現CSP通訊模型首先要從Channel的資料結構入手,先來看一段demo

``` fun main() = runBlocking { val scope = CoroutineScope(Job()) // 1,建立管道 val channel = Channel()

scope.launch {
    // 2,在一個單獨的協程當中傳送管道訊息
    repeat(3) {
        channel.send(it)
        println("send: $it")
    }

    channel.close()
}

scope.launch {
    // 3,在一個單獨的協程當中接收管道訊息
    repeat(3) {
        val result = channel.receive()
        println("result $result")
    }
}

println("end")
Thread.sleep(2000000L)

}

//輸出結果: //end //result 0 //send: 0 //result 1 //send: 1 //result 2 //send: 2 ```

程式碼分為三個部分,建立Channel、通過channel傳送資料,通過channel接收資料。

2.Channel的資料結構

在註釋1這裡建立了一個Channel。Channel是一個介面,同時它還實現了SendChannelReceiveChannel,但是註釋1中建立時呼叫的Channel其實是一個普通的頂層函式,它的作用是一個建構函式,所以首字母是大寫的,在前面的Kotlin協程—CoroutineScope是如何管理協程的也是提到過的。

``` public interface Channel : SendChannel, ReceiveChannel { ...

public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null

): Channel = when (capacity) { RENDEZVOUS -> { if (onBufferOverflow == BufferOverflow.SUSPEND) RendezvousChannel(onUndeliveredElement) // 一種有效的交會通道的實現 else ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // 通過緩衝通道支援緩衝區溢位 } CONFLATED -> { require(onBufferOverflow == BufferOverflow.SUSPEND) { "CONFLATED capacity cannot be used with non-default onBufferOverflow" } ConflatedChannel(onUndeliveredElement) } UNLIMITED -> LinkedListChannel(onUndeliveredElement) // 忽略onBufferOverflow:它有緩衝區,但它永遠不會溢位 BUFFERED -> ArrayChannel( // 使用預設容量與SUSPEND if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1, onBufferOverflow, onUndeliveredElement ) else -> { if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST) ConflatedChannel(onUndeliveredElement) // 合併實現效率更高,但工作方式似乎相同 else ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement) } } } ```

通過Channel函式中可以發現它的核心邏輯就是一個when表示式,根據傳入的引數建立不同的Channel例項:RendezvousChannelArrayChannelConflatedChannelLinkedListChannel,並且這幾個Channel的父類都是AbstractChannelAbstractChannel的父類又是AbstractSendChannel

``` /* * 它是所有傳送通道實現的基類。 / internal abstract class AbstractSendChannel( @JvmField protected val onUndeliveredElement: OnUndeliveredElement? ) : SendChannel {

protected val queue = LockFreeLinkedListHead()

...

//抽象的傳送/接收通道。它是所有通道實現的基類。
internal abstract class AbstractChannel<E>(
onUndeliveredElement: OnUndeliveredElement<E>?

) : AbstractSendChannel(onUndeliveredElement), Channel {

}

} ```

AbstractSendChannel類中還有一個變數val queue = LockFreeLinkedListHead(),Channel的核心邏輯就是依靠它實現的。

``` public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { public actual val isEmpty: Boolean get() = next === this }

public actual open class LockFreeLinkedListNode { private val _next = atomic(this) // Node | Removed | OpDescriptor private val _prev = atomic(this) // Node to the left (cannot be marked as removed) private val _removedRef = atomic(null) // lazily cached removed ref to this } ```

LockFreeLinkedListHead 其實繼承自 LockFreeLinkedListNode,而 LockFreeLinkedListNode 則是實現 Channel 核心功能的關鍵資料結構。

看到LinkedList基本可以確定這是一個連結串列,而LockFreeLinkedListHead是一個迴圈雙向連結串列,LockFreeLinkedListHead是一個哨兵節點。這個節點本身不會用於儲存任何資料,它的 next 指標會指向整個連結串列的頭節點,而它的 prev 指標會指向整個連結串列的尾節點。

哨兵節點是為了簡化處理連結串列邊界條件而引入的附加連結串列節點,哨兵節點通常位於連結串列頭部它的值沒有任何意義,在一個有哨兵節點的連結串列中,從第二個節點開始才真正儲存有意義的資訊。

這裡先用兩張圖來分別標識空連結串列和有元素的連結串列的結構

連結串列為空的時候LockFreeLinkedListHead的next指標和prev指標都指向自身的,這就意味著這個節點是不會儲存資料的,也是不會被刪除的。

連結串列中有2個或兩個以上的元素時LockFreeLinkedListHead的next指標才是第一個節點,prev則指向尾結點。

尋常的迴圈雙向連結串列是可以在首尾新增元素的,同時也支援“正向遍歷、逆向遍歷”的。而Channel內部的資料結構只能在末尾新增,遍歷順序則是從隊頭開始,這樣的設計就讓他的行為變成了先進先出單向佇列的同時還實現了隊尾新增操作,只需要O(1)的時間複雜度。

可以說,正是因為 LockFreeLinkedList 這個資料結構,才能使用 Channel 實現 CSP 通訊模型。

3.傳送和接收的流程

前面的demo中啟動了兩個協程,在這兩個協程中傳送了三次資料和接收了三次資料,程式首先會執行send(),但是由於Channel預設情況下容量是0,所以send()首先會被掛起。

``` public final override suspend fun send(element: E) { //1 if (offerInternal(element) === OFFER_SUCCESS) return //2 return sendSuspend(element) }

/* * 嘗試將元素新增到緩衝區或佇列接收方 / protected open fun offerInternal(element: E): Any { while (true) { //3 val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED val token = receive.tryResumeReceive(element, null) if (token != null) { assert { token === RESUME_TOKEN } receive.completeResumeReceive(element) return receive.offerResult } } }

private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable [email protected] { cont -> [email protected] while (true) { if (isFullImpl) { //4 val send = if (onUndeliveredElement == null) SendElement(element, cont) else SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement) val enqueueResult = enqueueSend(send) when { enqueueResult == null -> { // enqueued successfully //5 cont.removeOnCancellation(send) [email protected] } enqueueResult is Closed<> -> { cont.helpCloseAndResumeWithSendException(element, enqueueResult) [email protected] } enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead enqueueResult is Receive<> -> {} // try to offer instead else -> error("enqueueSend returned $enqueueResult") } } // hm... receiver is waiting or buffer is not full. try to offer val offerResult = offerInternal(element) when { offerResult === OFFER_SUCCESS -> { cont.resume(Unit) [email protected] } offerResult === OFFER_FAILED -> [email protected] offerResult is Closed<*> -> { cont.helpCloseAndResumeWithSendException(element, offerResult) [email protected] } else -> error("offerInternal returned $offerResult") } } } ```

  • 註釋1:嘗試向Channel傳送資料,在有消費這的情況下if判斷就是true,那麼此時就會return,但是如果是第一次呼叫Channel就不會有消費者,所以offerInternal函式中想要從佇列中取出消費者是不可能的(註釋3),所以第一次呼叫就進入了sendSuspend函式。
  • 註釋2:這個掛起函式是由高階函式suspendCancellableCoroutineReusable實現的,這個高階函式的作用就是暴露掛起函式的Continuation物件。
  • 註釋4:這裡的作用就是把傳送的元素封裝成SendElement物件,然後呼叫enqueueSend()方法將其新增到LockFreeLinkedList佇列的末尾,新增成功則進入註釋5開始執行。
  • 註釋5:這裡就是將SendElement從佇列中刪除。

到這裡應該會有一個疑問:send()函式一直是被掛起的啊,它會在什麼時候恢復呢?

答案就是接收資料的時候:receive()

``` public final override suspend fun receive(): E { // 1 val result = pollInternal()

@Suppress("UNCHECKED_CAST")
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// 2
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)

}

protected open fun pollInternal(): Any? { while (true) { //3 val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED val token = send.tryResumeSend(null) if (token != null) { assert { token === RESUME_TOKEN } //4 send.completeResumeSend() return send.pollResult }

    send.undeliveredElement()
}

}

private suspend fun receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable [email protected] { cont ->

val receive = if (onUndeliveredElement == null)
    ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
    ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
while (true) {
    if (enqueueReceive(receive)) {
        removeReceiveOnCancel(cont, receive)
        [email protected]
    }
    // hm... something is not right. try to poll
    val result = pollInternal()
    if (result is Closed<*>) {
        receive.resumeReceiveClosed(result)
        [email protected]
    }
    if (result !== POLL_FAILED) {
        cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E))
        [email protected]
    }
}

} ```

  • 註釋1:嘗試從 LockFreeLinkedList 隊列當中找出是否有正在被掛起的傳送方;
  • 註釋2:當 LockFreeLinkedList 隊列當中沒有正在掛起的傳送方時,它會執行 receiveSuspend(),而 receiveSuspend() 也同樣會被掛起,這個邏輯跟前面的sendSuspend()是類似的,先封裝成receiveElement物件並新增佇列的末尾,如果新增成功的話這個receiveSuspend()就會繼續被掛起,這也就意味著receive()也會被掛起,而它的恢復時機則是在offerInternal()中;
  • 註釋3:在send()流程中這一步是將傳送的元素封裝成SendElement物件,那麼在receive()中這一步是取出封裝好的SendElement物件,然後呼叫註釋4;
  • 註釋4:這裡其實呼叫的是

override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)

這裡的cont其實是continuation,前面聊過,它是一個狀態機,cont.completeResume(RESUME_TOKEN)會回撥CancellableContinuationImpl中的completeResume

``` override fun completeResume(token: Any) { assert { token === RESUME_TOKEN } dispatchResume(resumeMode) }

private fun dispatchResume(mode: Int) { if (tryResume()) return dispatch(mode) }

//DispatchedTask internal fun DispatchedTask.dispatch(mode: Int) { assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method val delegate = this.delegate val undispatched = mode == MODE_UNDISPATCHED if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) { // dispatch directly using this instance's Runnable implementation val dispatcher = delegate.dispatcher val context = delegate.context if (dispatcher.isDispatchNeeded(context)) { dispatcher.dispatch(context, this) } else { resumeUnconfined() } } else { // delegate is coming from 3rd-party interceptor implementation (and does not support cancellation) // or undispatched mode was requested resume(delegate, undispatched) } } ```

最終呼叫dispatch(mode),而 dispatch(mode) 其實就是 DispatchedTask 的 dispatch(),而這個dispatch()就是協程體當中的程式碼線上程執行的時機,最終,它會執行在 Java 的 Executor 之上。至此,之前被掛起的 send() 方法,就算是恢復了。

4.總結:

  • Channel是一個執行緒安全的管道,常見用法是實現CSP通訊模型,它之所以可以用這種方式來實現CSP通訊模型主要還是因為底層的資料結構是LockFreeLinkedList
  • LockFreeLinkedList是一個雙向迴圈的連結串列但是在Channel原始碼中會被當做先進先出的單向佇列,只是在佇列末尾插入節點,且只會正向遍歷;
  • Channel的傳送分為兩種情況,一種是當前佇列中已經存在傳送方,這時候send()會恢復 Receive 節點的執行,並將資料傳送給對方;另一種情況是當前佇列中不存在傳送方,就是首次傳送,則會掛起並將元素封裝成SendElement後新增到LockFreeLinkedList佇列的末尾,等待被receive()恢復執行;
  • Channel的接收分為兩種情況,一種是當前佇列中已經存在被掛起的傳送方,這時候receive()會恢復Send節點的執行,並且取出Send節點帶過來的資料 ; 另一種情況是當前佇列中不存在被掛起的傳送方,那麼這時候receive()就會被掛起,同時會封裝元素為ReceiveElement並新增到佇列的末尾,等待被下一個send()恢復執行。