Kotlin協程之Dispatchers原理

語言: CN / TW / HK

theme: fancy

Kotlin協程不是什麼空中閣樓,Kotlin原始碼會被編譯成class位元組碼檔案,最終會執行到虛擬機器中。所以從本質上講,Kotlin和Java是類似的,都是可以編譯產生class的語言,但最終還是會受到虛擬機器的限制,它們的程式碼最終會在虛擬機器上的某個執行緒上被執行。

之前我們分析了launch的原理,但當時我們沒有去分析協程創建出來後是如何與執行緒產生關聯的,怎麼被分發到具體的執行緒上執行的,本篇文章就帶大家分析一下。

前置知識

要想搞懂Dispatchers,我們先來看一下Dispatchers、CoroutineDispatcher、ContinuationInterceptor、CoroutineContext之間的關係

```kotlin public actual object Dispatchers { @JvmStatic public actual val Default: CoroutineDispatcher = DefaultScheduler

@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

@JvmStatic
public val IO: CoroutineDispatcher = DefaultIoScheduler

}

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { }

public interface ContinuationInterceptor : CoroutineContext.Element {}

public interface Element : CoroutineContext {} ```

Dispatchers中存放的是協程排程器(它本身是一個單例),有我們平時常用的IO、Default、Main等。這些協程排程器都是CoroutineDispatcher的子類,這些協程排程器其實都是CoroutineContext

demo

我們先來看一個關於launch的demo:

kotlin fun main() { val coroutineScope = CoroutineScope(Job()) coroutineScope.launch { println("Thread : ${Thread.currentThread().name}") } Thread.sleep(5000L) }

在生成CoroutineScope時,demo中沒有傳入相關的協程排程器,也就是Dispatchers。那這個launch會執行到哪個執行緒之上?

執行試一下:

log Thread : DefaultDispatcher-worker-1

居然執行到了DefaultDispatcher-worker-1執行緒上,這看起來明顯是Dispatchers.Default協程排程器裡面的執行緒。我明明沒傳Dispatchers相關的context,居然會執行到子執行緒上。說明執行到default執行緒是launch預設的。

它是怎麼與default執行緒產生關聯的?開啟原始碼一探究竟:

```kotlin public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { //程式碼1 val newContext = newCoroutineContext(context)

//程式碼2
val coroutine = if (start.isLazy)
    LazyStandaloneCoroutine(newContext, block) else
    StandaloneCoroutine(newContext, active = true)

//程式碼3
coroutine.start(start, coroutine, block)
return coroutine

} ```

  1. 將傳入的CoroutineContext構造出新的context
  2. 啟動模式,判斷是否為懶載入,如果是懶載入則構建懶載入協程物件,否則就是標準的
  3. 啟動協程

我們重點關注程式碼1,這是與CoroutineContext相關的。

kotlin public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { //從父協程那裡繼承過來的context+這次的context val combined = coroutineContext.foldCopiesForChildCoroutine() + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined //combined可以簡單的把它看成是一個map,它是CoroutineContext型別的 //如果當前context不等於Dispatchers.Default,而且從map裡面取ContinuationInterceptor(用於攔截之後分發執行緒的)值為空,說明沒有傳入協程應該在哪個執行緒上執行的相關引數 return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }

呼叫launch的時候,我們沒有傳入context,預設引數是EmptyCoroutineContext。這裡的combined,它其實是CoroutineContext型別的,可以簡單的看成是map(其實不是,只是類似)。通過combined[ContinuationInterceptor]可以將傳入的執行緒排程相關的引數給取出來,這裡如果取出來為空,是給該context添加了一個Dispatchers.Default,然後把新的context返回出去了。所以launch預設情況下,會走到default執行緒去執行。

補充一點:CoroutineContext能夠通過+連線是因為它內部有個public operator fun plus函式。能夠通過combined[ContinuationInterceptor]這種方式訪問元素是因為有個public operator fun get函式。

``kotlin public interface CoroutineContext { /** * Returns the element with the given [key] from this context ornull`. */ public operator fun get(key: Key): E?

 /**
 * Returns a context containing elements from this context and elements from  other [context].
 * The elements from this context with the same key as in the other one are dropped.
 */
public operator fun plus(context: CoroutineContext): CoroutineContext {
    ......
}

} ```

startCoroutineCancellable

上面我們分析了launch預設情況下,context中會增加Dispatchers.Default的這個協程排程器,到時launch的Lambda會在default執行緒上執行,其中具體流程是怎麼樣的,我們分析一下。

在之前的文章 Kotlin協程之launch原理 中我們分析過,launch預設情況下會最終執行到startCoroutineCancellable函式。

```kotlin public fun (suspend () -> T).startCoroutineCancellable(completion: Continuation): Unit = runSafely(completion) { //構建ContinuationImpl createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) }

public actual fun (suspend () -> T).createCoroutineUnintercepted( completion: Continuation ): Continuation { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) //走這裡 create(probeCompletion) else createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1, Any?>).invoke(it) } } ```

Kotlin協程之launch原理 文章中,咱們分析過create(probeCompletion)這裡創建出來的是launch的那個Lambda,編譯器會產生一個匿名內部類,它繼承自SuspendLambda,而SuspendLambda是繼承自ContinuationImpl。所以 createCoroutineUnintercepted(completion)一開始構建出來的是一個ContinuationImpl,接下來需要去看它的intercepted()函式。

```kotlin internal abstract class ContinuationImpl( completion: Continuation?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor(completion: Continuation?) : this(completion, completion?.context)

public override val context: CoroutineContext
    get() = _context!!

@Transient
private var intercepted: Continuation<Any?>? = null

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

} ```

第一次走到intercepted()函式時,intercepted肯定是為null的,還沒初始化。此時會通過context[ContinuationInterceptor]取出Dispatcher物件,然後呼叫該Dispatcher物件的interceptContinuation()函式。這個Dispatcher物件在demo這裡其實就是Dispatchers.Default。

kotlin public actual object Dispatchers { @JvmStatic public actual val Default: CoroutineDispatcher = DefaultScheduler }

可以看到,Dispatchers.Default是一個CoroutineDispatcher物件,interceptContinuation()函式就在CoroutineDispatcher中。

```kotlin public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { public final override fun interceptContinuation(continuation: Continuation): Continuation = DispatchedContinuation(this, continuation) }

public fun (suspend () -> T).startCoroutineCancellable(completion: Continuation): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) } ```

這個方法非常簡單,就是新建並且返回了一個DispatchedContinuation物件,將this和continuation給傳入進去。這裡的this是Dispatchers.Default。

所以,最終我們發現走完startCoroutineCancellable的前2步之後,也就是走完intercepted()之後,建立的是DispatchedContinuation物件,最後是呼叫的DispatchedContinuation的resumeCancellableWith函式。最後這步比較關鍵,這是真正將協程的具體執行邏輯放到執行緒上執行的部分。

```kotlin internal class DispatchedContinuation( //這裡傳入的dispatcher在demo中是Dispatchers.Default @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation ) : DispatchedTask(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation by continuation {

inline fun resumeCancellableWith(
    result: Result<T>,
    noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
    val state = result.toState(onCancellation)
    //程式碼1
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        //程式碼2
        dispatcher.dispatch(context, this)
    } else {
        //程式碼3
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled(state)) {
                resumeUndispatchedWith(result)
            }
        }
    }
}

}

internal abstract class DispatchedTask( @JvmField public var resumeMode: Int ) : SchedulerTask() { ...... }

internal actual typealias SchedulerTask = Task

internal abstract class Task( @JvmField var submissionTime: Long, @JvmField var taskContext: TaskContext ) : Runnable { ...... }

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

public abstract fun dispatch(context: CoroutineContext, block: Runnable)

public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

}

```

從DispatchedContinuation的繼承結構來看,它既是一個Continuation(通過委託給傳入的continuation引數),也是一個Runnable。

  • 首先看程式碼1:這個dispatcher在demo中其實是Dispatchers.Default ,然後呼叫它的isDispatchNeeded(),這個函式定義在CoroutineDispatcher中,預設就是返回true,只有Dispatchers.Unconfined返回false
  • 程式碼2:呼叫Dispatchers.Default的dispatch函式,將context和自己(DispatchedContinuation,也就是Runnable)傳過去了
  • 程式碼3:對應Dispatchers.Unconfined的情況,它的isDispatchNeeded()返回false

現在我們要分析程式碼2之後的執行邏輯,也就是將context和Runnable傳入到dispatch函式之後是怎麼執行的。按道理,看到Runnable,那可能這個與執行緒執行相關,應該離我們想要的答案不遠了。回到Dispatchers,我們發現Dispatchers.Default是DefaultScheduler型別的,那我們就去DefaultScheduler中或者其父類中去找dispatch函式。

```kotlin public actual object Dispatchers { @JvmStatic public actual val Default: CoroutineDispatcher = DefaultScheduler }

internal object DefaultScheduler : SchedulerCoroutineDispatcher( CORE_POOL_SIZE, MAX_POOL_SIZE, IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME ) { ...... }

internal open class SchedulerCoroutineDispatcher( private val corePoolSize: Int = CORE_POOL_SIZE, private val maxPoolSize: Int = MAX_POOL_SIZE, private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, private val schedulerName: String = "CoroutineScheduler", ) : ExecutorCoroutineDispatcher() {

private var coroutineScheduler = createScheduler()

private fun createScheduler() =
    CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

 override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

}

```

最後發現dispatch函式在其父類SchedulerCoroutineDispatcher中,在這裡構建了一個CoroutineScheduler,直接呼叫了CoroutineScheduler物件的dispatch,然後將Runnable(也就是上面的DispatchedContinuation物件)傳入。

```kotlin internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { override fun execute(command: Runnable) = dispatch(command)

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
    trackTask() // this is needed for virtual time support
    //程式碼1:構建Task,Task實現了Runnable介面
    val task = createTask(block, taskContext)
    //程式碼2:取當前執行緒轉為Worker物件,Worker是一個繼承自Thread的類
    val currentWorker = currentWorker()
    //程式碼3:嘗試將Task提交到本地佇列並根據結果執行相應的操作
    val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
    if (notAdded != null) {
        //程式碼4:notAdded不為null,則再將notAdded(Task)新增到全域性佇列中
        if (!addToGlobalQueue(notAdded)) {
            throw RejectedExecutionException("$schedulerName was terminated")
        }
    }
    val skipUnpark = tailDispatch && currentWorker != null
    // Checking 'task' instead of 'notAdded' is completely okay
    if (task.mode == TASK_NON_BLOCKING) {
        if (skipUnpark) return
        //程式碼5: 建立Worker並開始執行該執行緒
        signalCpuWork()
    } else {
        // Increment blocking tasks anyway
        signalBlockingWork(skipUnpark = skipUnpark)
    }
}

private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

internal inner class Worker private constructor() : Thread() {
    .....
}

} ```

觀察發現,原來CoroutineScheduler類實現了java.util.concurrent.Executor介面,同時實現了它的execute方法,這個方法也會呼叫dispatch()。

  • 程式碼1:首先是通過Runnable構建了一個Task,這個Task其實也是實現了Runnable介面,只是把傳入的Runnable包裝了一下
  • 程式碼2:將當前執行緒取出來轉換成Worker,當然第一次時,這個轉換不會成功,這個Worker是繼承自Thread的一個類
  • 程式碼3:將task提交到本地佇列中,這個本地佇列待會兒會在Worker這個執行緒執行時取出Task,並執行Task
  • 程式碼4:如果task提交到本地佇列的過程中沒有成功,那麼會新增到全域性佇列中,待會兒也會被Worker取出來Task並執行
  • 程式碼5:建立Worker執行緒,並開始執行

開始執行Worker執行緒之後,我們需要看一下這個執行緒的run方法執行的是啥,也就是它的具體執行邏輯。

```kotlin internal inner class Worker private constructor() : Thread() { override fun run() = runWorker() private fun runWorker() { var rescanned = false while (!isTerminated && state != WorkerState.TERMINATED) { //程式碼1 val task = findTask(mayHaveLocalTasks) if (task != null) { rescanned = false minDelayUntilStealableTaskNs = 0L //程式碼2 executeTask(task) continue } else { mayHaveLocalTasks = false } if (minDelayUntilStealableTaskNs != 0L) { if (!rescanned) { rescanned = true } else { rescanned = false tryReleaseCpu(WorkerState.PARKING) interrupted() LockSupport.parkNanos(minDelayUntilStealableTaskNs) minDelayUntilStealableTaskNs = 0L } continue } tryPark() } tryReleaseCpu(WorkerState.TERMINATED) }

fun findTask(scanLocalQueue: Boolean): Task? {
    if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
    // If we can't acquire a CPU permit -- attempt to find blocking task
    val task = if (scanLocalQueue) {
        localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
    } else {
        globalBlockingQueue.removeFirstOrNull()
    }
    return task ?: trySteal(blockingOnly = true)
}

private fun executeTask(task: Task) {
    val taskMode = task.mode
    idleReset(taskMode)
    beforeTask(taskMode)
    runSafely(task)
    afterTask(taskMode)
}

fun runSafely(task: Task) {
    try {
        task.run()
    } catch (e: Throwable) {
        val thread = Thread.currentThread()
        thread.uncaughtExceptionHandler.uncaughtException(thread, e)
    } finally {
        unTrackTask()
    }
}

} ```

run方法直接呼叫的runWorker(),在裡面是一個while迴圈,不斷從佇列中取Task來執行。

  • 程式碼1:從本地佇列或者全域性佇列中取出Task
  • 程式碼2:執行這個task,最終其實就是呼叫這個Runnable的run方法。

也就是說,在Worker這個執行緒中,執行了這個Runnable的run方法。還記得這個Runnable是誰麼?它就是上面我們看過的DispatchedContinuation,這裡的run方法執行的就是協程任務,那這塊具體的run方法的實現邏輯,我們應該到DispatchedContinuation中去找。

```kotlin

internal class DispatchedContinuation( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation ) : DispatchedTask(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation by continuation { ...... }

internal abstract class DispatchedTask( @JvmField public var resumeMode: Int ) : SchedulerTask() { public final override fun run() { assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching val taskContext = this.taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation val continuation = delegate.continuation withContinuationContext(continuation, delegate.countOrElement) { val context = continuation.context val state = takeState() // NOTE: Must take state in any case, even if cancelled val exception = getExceptionalResult(state) / * Check whether continuation was originally resumed with an exception. * If so, it dominates cancellation, otherwise the original exception * will be silently lost. / val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null

            //非空,且未處於active狀態
            if (job != null && !job.isActive) {
                //開始之前,協程已經被取消,將具體的Exception傳出去
                val cause = job.getCancellationException()
                cancelCompletedResult(state, cause)
                continuation.resumeWithStackTrace(cause)
            } else {
                //有異常,傳遞異常
                if (exception != null) {
                    continuation.resumeWithException(exception)
                } else {
                    //程式碼1
                    continuation.resume(getSuccessfulResult(state))
                }
            }
        }
    } catch (e: Throwable) {
        // This instead of runCatching to have nicer stacktrace and debug experience
        fatalException = e
    } finally {
        val result = runCatching { taskContext.afterTask() }
        handleFatalException(fatalException, result.exceptionOrNull())
    }
}

} ```

我們主要看一下程式碼1處,呼叫了resume開啟協程。前面沒有異常,才開始啟動協程,這裡才是真正的開始啟動協程,開始執行launch傳入的Lambda表示式。這個時候,協程的邏輯是在Worker這個執行緒上執行的了,切到某個執行緒上執行的邏輯已經完成了。

ps: rusume會走到BaseContinuationImpl的rusumeWith,然後走到launch傳入的Lambda匿名內部類的invokeSuspend方法,開始執行狀態機邏輯。前面的文章 Kotlin協程createCoroutine和startCoroutine原理 我們分析過這裡,這裡就只是簡單提一下。

到這裡,Dispatchers的執行流程就算完了,前後都串起來了。

小結

Dispatchers是協程框架中與執行緒互動的關鍵。底層會有不同的執行緒池,Dispatchers.Default、IO,協程任務來了的時候會封裝成一個個的Runnable,丟到執行緒中執行,這些Runnable的run方法中執行的其實就是continuation.resume,也就是launch的Lambda生成的SuspendLambda匿名內部類,也就是開啟協程狀態機,開始協程的真正執行。