CSP通信模型(Communicating Sequential Processes),是一种形式化语言,用于描述并发系统中的交互模式。CSP 允许根据独立运行的组件进程来描述系统,并且仅通过消息传递通信相互交互。



``` fun main() = runBlocking { val scope = CoroutineScope(Job()) // 1,创建管道 val channel = Channel()

scope.launch {
    // 2,在一个单独的协程当中发送管道消息
    repeat(3) {
        println("send: $it")


scope.launch {
    // 3,在一个单独的协程当中接收管道消息
    repeat(3) {
        val result = channel.receive()
        println("result $result")



//输出结果: //end //result 0 //send: 0 //result 1 //send: 1 //result 2 //send: 2 ```




``` public interface Channel : SendChannel, ReceiveChannel { ...

public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null

): Channel = when (capacity) { RENDEZVOUS -> { if (onBufferOverflow == BufferOverflow.SUSPEND) RendezvousChannel(onUndeliveredElement) // 一种有效的交会通道的实现 else ArrayChannel(1, onBufferOverflow, onUndeliveredElement) // 通过缓冲通道支持缓冲区溢出 } CONFLATED -> { require(onBufferOverflow == BufferOverflow.SUSPEND) { "CONFLATED capacity cannot be used with non-default onBufferOverflow" } ConflatedChannel(onUndeliveredElement) } UNLIMITED -> LinkedListChannel(onUndeliveredElement) // 忽略onBufferOverflow:它有缓冲区,但它永远不会溢出 BUFFERED -> ArrayChannel( // 使用默认容量与SUSPEND if (onBufferOverflow == BufferOverflow.SUSPEND) CHANNEL_DEFAULT_CAPACITY else 1, onBufferOverflow, onUndeliveredElement ) else -> { if (capacity == 1 && onBufferOverflow == BufferOverflow.DROP_OLDEST) ConflatedChannel(onUndeliveredElement) // 合并实现效率更高,但工作方式似乎相同 else ArrayChannel(capacity, onBufferOverflow, onUndeliveredElement) } } } ```


``` /* * 它是所有发送通道实现的基类。 / internal abstract class AbstractSendChannel( @JvmField protected val onUndeliveredElement: OnUndeliveredElement? ) : SendChannel {

protected val queue = LockFreeLinkedListHead()


internal abstract class AbstractChannel<E>(
onUndeliveredElement: OnUndeliveredElement<E>?

) : AbstractSendChannel(onUndeliveredElement), Channel {


} ```

AbstractSendChannel类中还有一个变量val queue = LockFreeLinkedListHead(),Channel的核心逻辑就是依靠它实现的。

``` public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() { public actual val isEmpty: Boolean get() = next === this }

public actual open class LockFreeLinkedListNode { private val _next = atomic(this) // Node | Removed | OpDescriptor private val _prev = atomic(this) // Node to the left (cannot be marked as removed) private val _removedRef = atomic(null) // lazily cached removed ref to this } ```

LockFreeLinkedListHead 其实继承自 LockFreeLinkedListNode,而 LockFreeLinkedListNode 则是实现 Channel 核心功能的关键数据结构。

看到LinkedList基本可以确定这是一个链表,而LockFreeLinkedListHead是一个循环双向链表,LockFreeLinkedListHead是一个哨兵节点。这个节点本身不会用于存储任何数据,它的 next 指针会指向整个链表的头节点,而它的 prev 指针会指向整个链表的尾节点。






可以说,正是因为 LockFreeLinkedList 这个数据结构,才能使用 Channel 实现 CSP 通信模型。



``` public final override suspend fun send(element: E) { //1 if (offerInternal(element) === OFFER_SUCCESS) return //2 return sendSuspend(element) }

/* * 尝试将元素添加到缓冲区或队列接收方 / protected open fun offerInternal(element: E): Any { while (true) { //3 val receive = takeFirstReceiveOrPeekClosed() ?: return OFFER_FAILED val token = receive.tryResumeReceive(element, null) if (token != null) { assert { token === RESUME_TOKEN } receive.completeResumeReceive(element) return receive.offerResult } } }

private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable [email protected] { cont -> [email protected] while (true) { if (isFullImpl) { //4 val send = if (onUndeliveredElement == null) SendElement(element, cont) else SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement) val enqueueResult = enqueueSend(send) when { enqueueResult == null -> { // enqueued successfully //5 cont.removeOnCancellation(send) [email protected] } enqueueResult is Closed<> -> { cont.helpCloseAndResumeWithSendException(element, enqueueResult) [email protected] } enqueueResult === ENQUEUE_FAILED -> {} // try to offer instead enqueueResult is Receive<> -> {} // try to offer instead else -> error("enqueueSend returned $enqueueResult") } } // hm... receiver is waiting or buffer is not full. try to offer val offerResult = offerInternal(element) when { offerResult === OFFER_SUCCESS -> { cont.resume(Unit) [email protected] } offerResult === OFFER_FAILED -> [email protected] offerResult is Closed<*> -> { cont.helpCloseAndResumeWithSendException(element, offerResult) [email protected] } else -> error("offerInternal returned $offerResult") } } } ```

  • 注释1:尝试向Channel发送数据,在有消费这的情况下if判断就是true,那么此时就会return,但是如果是第一次调用Channel就不会有消费者,所以offerInternal函数中想要从队列中取出消费者是不可能的(注释3),所以第一次调用就进入了sendSuspend函数。
  • 注释2:这个挂起函数是由高阶函数suspendCancellableCoroutineReusable实现的,这个高阶函数的作用就是暴露挂起函数的Continuation对象。
  • 注释4:这里的作用就是把发送的元素封装成SendElement对象,然后调用enqueueSend()方法将其添加到LockFreeLinkedList队列的末尾,添加成功则进入注释5开始执行。
  • 注释5:这里就是将SendElement从队列中删除。



``` public final override suspend fun receive(): E { // 1 val result = pollInternal()

if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// 2
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)


protected open fun pollInternal(): Any? { while (true) { //3 val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED val token = send.tryResumeSend(null) if (token != null) { assert { token === RESUME_TOKEN } //4 send.completeResumeSend() return send.pollResult }



private suspend fun receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable [email protected] { cont ->

val receive = if (onUndeliveredElement == null)
    ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
    ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
while (true) {
    if (enqueueReceive(receive)) {
        removeReceiveOnCancel(cont, receive)
        [email protected]
    // hm... something is not right. try to poll
    val result = pollInternal()
    if (result is Closed<*>) {
        [email protected]
    if (result !== POLL_FAILED) {
        cont.resume(receive.resumeValue(result as E), receive.resumeOnCancellationFun(result as E))
        [email protected]

} ```

  • 注释1:尝试从 LockFreeLinkedList 队列当中找出是否有正在被挂起的发送方;
  • 注释2:当 LockFreeLinkedList 队列当中没有正在挂起的发送方时,它会执行 receiveSuspend(),而 receiveSuspend() 也同样会被挂起,这个逻辑跟前面的sendSuspend()是类似的,先封装成receiveElement对象并添加队列的末尾,如果添加成功的话这个receiveSuspend()就会继续被挂起,这也就意味着receive()也会被挂起,而它的恢复时机则是在offerInternal()中;
  • 注释3:在send()流程中这一步是将发送的元素封装成SendElement对象,那么在receive()中这一步是取出封装好的SendElement对象,然后调用注释4;
  • 注释4:这里其实调用的是

override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)


``` override fun completeResume(token: Any) { assert { token === RESUME_TOKEN } dispatchResume(resumeMode) }

private fun dispatchResume(mode: Int) { if (tryResume()) return dispatch(mode) }

//DispatchedTask internal fun DispatchedTask.dispatch(mode: Int) { assert { mode != MODE_UNINITIALIZED } // invalid mode value for this method val delegate = this.delegate val undispatched = mode == MODE_UNDISPATCHED if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) { // dispatch directly using this instance's Runnable implementation val dispatcher = delegate.dispatcher val context = delegate.context if (dispatcher.isDispatchNeeded(context)) { dispatcher.dispatch(context, this) } else { resumeUnconfined() } } else { // delegate is coming from 3rd-party interceptor implementation (and does not support cancellation) // or undispatched mode was requested resume(delegate, undispatched) } } ```

最终调用dispatch(mode),而 dispatch(mode) 其实就是 DispatchedTask 的 dispatch(),而这个dispatch()就是协程体当中的代码在线程执行的时机,最终,它会执行在 Java 的 Executor 之上。至此,之前被挂起的 send() 方法,就算是恢复了。


  • Channel是一个线程安全的管道,常见用法是实现CSP通信模型,它之所以可以用这种方式来实现CSP通信模型主要还是因为底层的数据结构是LockFreeLinkedList
  • LockFreeLinkedList是一个双向循环的链表但是在Channel源码中会被当做先进先出的单向队列,只是在队列末尾插入节点,且只会正向遍历;
  • Channel的发送分为两种情况,一种是当前队列中已经存在发送方,这时候send()会恢复 Receive 节点的执行,并将数据发送给对方;另一种情况是当前队列中不存在发送方,就是首次发送,则会挂起并将元素封装成SendElement后添加到LockFreeLinkedList队列的末尾,等待被receive()恢复执行;
  • Channel的接收分为两种情况,一种是当前队列中已经存在被挂起的发送方,这时候receive()会恢复Send节点的执行,并且取出Send节点带过来的数据 ; 另一种情况是当前队列中不存在被挂起的发送方,那么这时候receive()就会被挂起,同时会封装元素为ReceiveElement并添加到队列的末尾,等待被下一个send()恢复执行。