kotlin协程中的线程切换

语言: CN / TW / HK

theme: juejin

前言

在了解了kotlin协程的基本原理之后我们接下来就需要关注下协程的线程切换。我们这篇文章就深入源码的角度来分析一波协程中的线程切换。

CoroutineContext

要了解Kotlin的线程切换,那我们首先必须要先了解协程的CoroutineContext这个东西。

我们都知道,每一个挂起函数在最后编译转换的时候都会变成一个携带CoroutineContext参数的函数,这也是为什么非suspend函数不可以调用suspend函数的原因。那么CoroutineContext究竟是干啥的呢?

CoroutineContext顾名思义就是协程的上下文,这个上下文是跟协程绑定的关系。每启动一个协程,就对应一个CoroutineContext。他规定了当前这个要启动的协程的运行环境。我们看下代码中CoroutineContext的定义

``` public interface CoroutineContext { public operator fun get(key: Key): E? //重载加号操作符 public operator fun plus(context: CoroutineContext): CoroutineContext = if (context === EmptyCoroutineContext) this else context.fold(this) { acc, element -> val removed = acc.minusKey(element.key) if (removed === EmptyCoroutineContext) element else { val interceptor = removed[ContinuationInterceptor] //如果拦截器为空就直接组合新和老的上下文返回 if (interceptor == null) CombinedContext(removed, element) else { //如果有拦截器,那就取出拦截器放在新组合的上下文右边。也就是优先访问到 val left = removed.minusKey(ContinuationInterceptor) if (left === EmptyCoroutineContext) CombinedContext( element, interceptor ) else CombinedContext(CombinedContext(left, element), interceptor) } } }

public interface Key<E : Element>
public interface Element : CoroutineContext {
    public val key: Key<*>

    public override operator fun <E : Element> get(key: Key<E>): E? =
        if (this.key == key) this as E else null
}

} ``` 可以看到CoroutineContext只是一个接口,本质上他的数据结构是一个数组形式。他的实现类和间接实现类有很多

WX20230206-113333@2x.png

CoroutineContextElement、ContinuationInterceptor、CoroutineDispatcher等等。而这里每一个子类其实就代表协程上下文的一种能力。例如: - Job:控制协程的生命周期,例如唤起或者取消。 - CoroutineDispatcher:将工作分派到适当的线程。 - CoroutineName:协程的名称,可用于调试。 - CoroutineExceptionHandler:处理未捕获的异常。

我们可以参考Kotlin协程:协程上下文与上下文元素这篇文章深入了解下CoroutineContext,本文还是顺着线程切换思路继续往下看。

withContext

上小节我们提到CoroutineDispatcher本质也是一个CoroutineContext。他用来分发任务到具体线程上。那他具体又是怎么分发的呢?我们以下边一个例子展开分析下。

suspend fun withContextTest() { withContext(Dispatchers.IO) { println("==========!!!!!io============== ${Thread.currentThread().name}") } } 这里一个很常见的线程切换写法。调用withContext函数,然后传一个Dispatchers.IO,然后用个协程启动下就OK了。这样我们withContext里的代码块就能在IO线程里执行了。

运行结果如下,一个名叫DefaultDispatcher-worker-1 @coroutine#1的线程执行了我们这次的任务。

WX20230206-143430@2x.png

所以我们顺藤摸瓜看下withContext函数的定义:

``` public suspend fun withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T {

return suspendCoroutineUninterceptedOrReturn [email protected] { uCont ->
    val oldContext = uCont.context
    //用新的上下文和老的上下合并下一个最终上下文。新上下文的配置会覆盖替换掉老的上下文配置。
    val newContext = oldContext.newCoroutineContext(context)
    newContext.ensureActive()
    //最终新的上下文跟老的完全一致,调用非startUndispatchedOrReturn分发逻辑
    if (newContext === oldContext) {
        val coroutine = ScopeCoroutine(newContext, uCont)
        [email protected] coroutine.startUndispatchedOrReturn(coroutine, block)
    }
    //走到这步那就是两个上下文不相同了,但是拦截器是相同的
    if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
        val coroutine = UndispatchedCoroutine(newContext, uCont)
        withCoroutineContext(newContext, null) {
            //依旧走了不分发逻辑,我们没有拦截器可以先不考虑这个
            [email protected] coroutine.startUndispatchedOrReturn(coroutine, block)
        }
    }
    // 最终策略,使用DispatchedCoroutine分发任务。
    val coroutine = DispatchedCoroutine(newContext, uCont)
    block.startCoroutineCancellable(coroutine, coroutine)
    coroutine.getResult()
}

} ```

首先我们先看withContext这个方法的签名,第一个参数是CoroutineContext,协程上下文。Dispatchers.IO就是传递给了CoroutineContext这个参数。也是说Dispatchers.IO本质上也是CoroutineContext。

当我们使用Dispatchers.IO切换线程的时候,最终是由DispatchedCoroutine组件了一个新的上下文进行任务分发。那我们继续看DispatchedCoroutine处理逻辑。

DispatchedCoroutine

我们直接定位DispatchedCoroutine的startCoroutineCancellable这个方法。它是一个扩展函数。用runSafely语法糖包装了下。 internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable( receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit)? = null ) = runSafely(completion) { createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation) } 这个函数主要做了两步: 1. 创建一个非拦截器的上下文,然后调用拦截方法。怪怪的,但是它就是这样。 2. 这个上下文调用resumeCancellableWith方法。

我们继续跟踪resumeCancellableWith方法。

inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) //如果判断任务需要分发 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE //那就调用dispatcher进行分发 dispatcher.dispatch(context, this) } else { executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled(state)) { resumeUndispatchedWith(result) } } } } 到这就很清晰了,用dispatcher校验下是否需要分发。如果需要的就去调用dispatch,如果不用则执行resumeUndispatchedWith恢复挂起点。

那这个dispatcher这个全局变量又是啥?

internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation { @JvmField @Suppress("PropertyName") } 他是DispatchedContinuation的一个构造参数,也就是我们上边分析的withContext函数里的newContext。而newContext实际上就是我们例子里传递的Dispatchers.IO这个东西。

val coroutine = DispatchedCoroutine(newContext, uCont)

到这里我们梳理下逻辑应该是这样的: withContext线程分发.png

所以基于以上分析我们可以总结以下几点: - Dispatchers.Main和Dispatchers.IO本质也是CoroutineContext,并且他们负责实际的线程切换操作。 - withContext函数会对比新旧两个上下文的差异,只有不一致的时候才会走重新分发逻辑。所以并不是调用一次withContext就做一次上下文切换。

Dispatchers.Main

首先还是我们上边的例子,我们只把Dispatchers.IO换成Dispatchers.Main,然后把代码放到普通单元测试类里,代码就是这样。

``` suspend fun withContextTest() { withContext(Dispatchers.Main) { println("==========!!!!!main============== ${Thread.currentThread().name}") } }

@Test fun startWithContext() { runBlocking{ withContextTest() } } 然后执行下你就会发现代码会报错了,报错信息: Exception in thread "Test worker" java.lang.IllegalStateException: Module with the Main dispatcher had failed to initialize. For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used at kotlinx.coroutines.internal.MissingMainCoroutineDispatcher.missing(MainDispatchers.kt:118) at kotlinx.coroutines.internal.MissingMainCoroutineDispatcher.isDispatchNeeded(MainDispatchers.kt:96) at kotlinx.coroutines.internal.DispatchedContinuationKt.resumeCancellableWith(DispatchedContinuation.kt:319) at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30) at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable$default(Cancellable.kt:25) at kotlinx.coroutines.CoroutineStart.invoke(CoroutineStart.kt:110) at kotlinx.coroutines.AbstractCoroutine.start(AbstractCoroutine.kt:126) at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:56) at kotlinx.coroutines.BuildersKt.launch(Unknown Source) at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:47) at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source) at com.wuba.coroutinedemo.CoroutineDispatchDemo.addition_isCorrect(CoroutineDispatchDemo.kt:27) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176) at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:133) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71) at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69) at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74) Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [CoroutineId(1), "coroutine#1":StandaloneCoroutine{Cancelling}@5f77d0f9, Dispatchers.Main[missing, cause=java.lang.RuntimeException: Method getMainLooper in android.os.Looper not mocked. See http://g.co/androidstudio/not-mocked for details.]] ``` 可以看到实际上是由MissingMainCoroutineDispatcher这个分发器分发了主线程任务,并且报了一个主线程没有初始化的任务。

在这里贴这个报错信息有两个目的。第一个大家可以根据这个报错堆栈复习下上一节讲的分发逻辑,这个报错堆栈很清晰反应了整个分发流程。第二个就是引出我们的Main线程。

我们先看下Main的定义。 ``` public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

```

可以看出main线程是需要初始化加载的,毕竟每个平台的的主线程是不一样的。比如安卓中主线程就是MainLooper。这也是上边报错堆栈的原因。在单元测试模块中没有指定配置主线程,所以最终指定了MissingMainCoroutineDispatcher来报错。

我们分析下loadMainDispatcher这个函数 ``` private fun loadMainDispatcher(): MainCoroutineDispatcher { return try { val factories = if (FAST_SERVICE_LOADER_ENABLED) { //理解就是个后门可以快速初始化一个测试级的主线程 FastServiceLoader.loadMainDispatcherFactory() } else { /MainDispatcherFactory是个接口,反射加载MainDispatcherFactory实现类 ServiceLoader.load( MainDispatcherFactory::class.java, MainDispatcherFactory::class.java.classLoader ).iterator().asSequence().toList() } @Suppress("ConstantConditionIf") //通过上边那个工厂创建实际的分发器 factories.maxByOrNull { it.loadPriority }?.tryCreateDispatcher(factories) ?: createMissingDispatcher() } catch (e: Throwable) { // 这就是我们报错信息提到的MissingMainCoroutineDispatcher createMissingDispatcher(e) } }

```

我们再看下MainDispatcherFactory的实现类都有哪些。

image.png

我们一眼就看到了AndroidDispatcherFactory,对他就是在安卓平台上实际的主线程分发器。赶紧点开看下实现。

``` internal class AndroidDispatcherFactory : MainDispatcherFactory {

override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
    //是不是很亲切很耳熟
    val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
    return HandlerContext(mainLooper.asHandler(async = true))
}

} ``` 奥,实际的返回者又是HandlerContext,并且给他传递了Looper.getMainLooper()。然后再翻下HandlerContext的继承关系,没错是CoroutineContext。上节我们分析分发逻辑的时候说最终分发的是dispatch方法。那我们就看下HandlerContext的dispatch方法。其实不用看我们也知道怎么回事。

override fun dispatch(context: CoroutineContext, block: Runnable) { if (!handler.post(block)) { cancelOnRejection(context, block) } } 很简单调用looper的post方法扔到主线程里。但是这里有个兜底逻辑,就是如果扔主线程失败了会兜底使用Dispatchers.IO.dispatch(context, block)进行分发。

Dispatchers.IO

接下来我们就来分析下Dispatchers.IO。有了之前分析Dispatchers.Main的经验我们很快就找到了相关的定义。 public val IO: CoroutineDispatcher = DefaultIoScheduler 可以看到Dispatchers.IO就是DefaultIoScheduler。

在老的版本上Dispatchers.IO其实是和Dispatchers.Default一样使用的是DefaultScheduler。默认最大线程数为64。新版优化成了DefaultIoScheduler,支持扩展这个最大线程数。详细可以看下新版的DefaultIoScheduler注释。

我们来看下DefaultIoScheduler的定义: ``` //看方法名字和继承类,大概就可以知道这是一个线程池,不过这个线程池跟Java线程池没啥关系,完全是协程自己实现的一套 internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor { //一个没有限制的IO调度器,调至调度方法初始化了一个调度器? private val default = UnlimitedIoScheduler.limitedParallelism( //读取默认配置 systemProp( IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS) ) )

override val executor: Executor
    get() = this
//线程池执行任务的方法
override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
    return UnlimitedIoScheduler.limitedParallelism(parallelism)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
    //分发调度任务
    default.dispatch(context, block)
}

} ``` 可以看到UnlimitedIoScheduler.limitedParallelism这个方法创建了一个调度器,然后由这个调度器来执行dispatch方法。而这个方法最终返回的是LimitedDispatcher。

public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher { parallelism.checkParallelism() return LimitedDispatcher(this, parallelism) }

看到这你是不是会想当然的认为最终dispatch分发的是LimitedDispatcher这个东西?如果你这样想就陷入了圈套中。

我们再放出LimitedDispatcher的源码:

``` //注意看参数 internal class LimitedDispatcher( private val dispatcher: CoroutineDispatcher, private val parallelism: Int ) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
    parallelism.checkParallelism()
    if (parallelism >= this.parallelism) return this
    return super.limitedParallelism(parallelism)
}
//核心任务执行
override fun run() {
    var fairnessCounter = 0
    while (true) {
        val task = queue.removeFirstOrNull()
        if (task != null) {
            try {
                task.run()
            } catch (e: Throwable) {
                handleCoroutineException(EmptyCoroutineContext, e)
            }
            if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) {
                dispatcher.dispatch(this, this)
                return
            }
            continue
        }

        synchronized(workerAllocationLock) {
            --runningWorkers
            if (queue.size == 0) return
            ++runningWorkers
            fairnessCounter = 0
        }
    }
}
//任务分发
override fun dispatch(context: CoroutineContext, block: Runnable) {
    dispatchInternal(block) {
        //注意看这,看dispatcher哪来的!!
        dispatcher.dispatch(this, this)
    }
}


private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) {

    if (addAndTryDispatching(block)) return
    if (!tryAllocateWorker()) return
    dispatch()
}

private fun tryAllocateWorker(): Boolean {
    synchronized(workerAllocationLock) {
        if (runningWorkers >= parallelism) return false
        ++runningWorkers
        return true
    }
}

private fun addAndTryDispatching(block: Runnable): Boolean {
    queue.addLast(block)
    return runningWorkers >= parallelism
}

} ```

首先我们还是看dispatch方法,他调用了dispatchInternal做后续的分发逻辑。

dispatchInternal主要逻辑: 1. 把任务加到LockFreeTaskQueue队列里,判断下正在执行的任务数量是否已经大于约定的限制数量。如果大于,那证明已经没有可用的空闲线程去执行当前任务了。所以只用返回就好。 2. 如果小于,那证明还有可用空闲线程来执行当前的这个任务。那么就调用tryAllocateWorker申请资源。注意这里只是同步方法改变下计数,并非真正的去申请线程池资源。 3. 最后调用dispatch()方法,也就是override fun dispatch(context: CoroutineContext, block: Runnable) 这个方法里的 dispatcher.dispatch(this, this) 这一行。 4. 最后就是调用dispatcher.dispatch(this, this)

  • 第4条这句很容易让人陷入误解。其实它并不是递归调用LimitedDispatcher的dispatch方法。
  • 这里的dispatcher是LimitedDispatcher构造方法里传来的CoroutineDispatcher。
  • 是limitedParallelism方法LimitedDispatcher(this, parallelism) 的this。
  • 也是DefaultIoScheduler里的default变量赋值语句里的UnlimitedIoScheduler.limitedParallelism
  • 也就是说实际上是UnlimitedIoScheduler的dispatch方法在起作用。

我们再看下UnlimitedIoScheduler的dispatch方法:

``` private object UnlimitedIoScheduler : CoroutineDispatcher() {

@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
    DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
    DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
}

} ``` 他调用的是DefaultScheduler的dispatchWithContext分发任务。我们再看下DefaultScheduler的定义。

``` //看参数,又是核心池大小,最大池子大小的大概也能猜出这是个线程池。但是这个类方法里就shutdown和close两个函数。所以核心实现在SchedulerCoroutineDispatcher里。 internal object DefaultScheduler : SchedulerCoroutineDispatcher( CORE_POOL_SIZE, MAX_POOL_SIZE, IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME ) { internal fun shutdown() { super.close() } override fun close() { throw UnsupportedOperationException("Dispatchers.Default cannot be closed") }

} ``` 看这阵仗大家似乎也应该猜到什么了。对,这就是个线程池。哎,问题来了,为什么不直接复用java的线程池?要自己实现呢?我们先把这个问题放下继续分析源码。最后回过头来再思考这个问题。

DefaultScheduler类里就两个函数,所以核心逻辑肯定在父类SchedulerCoroutineDispatcher里。所以我们继续看SchedulerCoroutineDispatcher这个类

``` internal open class SchedulerCoroutineDispatcher( private val corePoolSize: Int = CORE_POOL_SIZE, private val maxPoolSize: Int = MAX_POOL_SIZE, private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, private val schedulerName: String = "CoroutineScheduler", ) : ExecutorCoroutineDispatcher() {

override val executor: Executor
    get() = coroutineScheduler

private var coroutineScheduler = createScheduler()

private fun createScheduler() =
    //奥,SchedulerCoroutineDispatcher也不是实际的线程池,CoroutineScheduler才是。
    CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

} ``` dispatch在用coroutineScheduler进行分发,coroutineScheduler又是CoroutineScheduler。

到这里我们我们要先歇息歇息了,我们总结下Dispatchers.IO,其实就三点: - Dispatchers.IO也是一个CoroutineContext,在老版本对应的是DefaultScheduler,新版本是DefaultIOScheduler。 - 新版本DefaultIOScheduler相对于DefaultScheduler增加了最大线程数量的扩展。本质上还是使用DefaultScheduler做分发。 - DefaultScheduler的本质其实是CoroutineScheduler,他是一个自定义的线程池。我们的Dispatchers.IO本质是交给了CoroutineScheduler去执行调度任务了。

我们可以以一个更简单的图来描述下他们的关系。

Dispatchers_IO.png

CoroutineScheduler

接下来就是我们IO线程池的核心部分,CoroutineScheduler。可能我分析的有些地方不够透彻,大家可以先看一遍我的分析文章然后自行去源码里分析分析这个类。也可以直接跳过我的分析直接自己动手丰衣足食。

``` internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { //线程池的全局队列,CpuQueue可以理解为核心线程任务。 @JvmField val globalCpuQueue = GlobalQueue() //第二条任务队列,BlockingQueue用来存放优先级较低的任务。就是核心线程把CpuQueue任务做完之后才会调度到这里。 @JvmField val globalBlockingQueue = GlobalQueue() //添加任务队列 private fun addToGlobalQueue(task: Task): Boolean { return if (task.isBlocking) { globalBlockingQueue.addLast(task) } else { globalCpuQueue.addLast(task) } } override fun execute(command: Runnable) = dispatch(command)

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { trackTask() //创建任务 val task = createTask(block, taskContext) //判断现在是否已经在一个Worker线程中,如果在的话那就可以进行复用了,算是一个小优化。 val currentWorker = currentWorker() //将任务添加到到Worker线程自己的任务队列里。注意不是上边的全局队列 val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) //如果添加失败,那就添加到分发器的全局队列里。 if (notAdded != null) { if (!addToGlobalQueue(notAdded)) { throw RejectedExecutionException("$schedulerName was terminated") } } //如果是尾调模式,并且当前是worker线程,也就是说任务被添加到了复用线程任务里了 val skipUnpark = tailDispatch && currentWorker != null if (task.mode == TASK_NON_BLOCKING) { //如果是核心线程就等待当前线程执行完毕,不在唤起或者创建新的线程。以期望任务可以在这个线程中按序执行完毕。就是说不要在启动非核心线程来抢占这个核心任务。 if (skipUnpark) return signalCpuWork() } else { //非核心任务执行逻辑,其实大体跟核心任务逻辑相同 signalBlockingWork(skipUnpark = skipUnpark) } }

}

```

我们可以再关注下signalBlockingWork方法的定义:

private fun signalBlockingWork(skipUnpark: Boolean) { //一个状态值的获取 val stateSnapshot = incrementBlockingTasks() //刚才我们提到的尾调,直接返回 if (skipUnpark) return //从线程池唤起一个线程 if (tryUnpark()) return //唤起失败,那就准备创建一个线程 if (tryCreateWorker(stateSnapshot)) return //创建失败了,那在尝试唤起一遍,万一这时候线程池又有线程了呢 tryUnpark() }

以上就是协程自定义线程池的大概逻辑。我们可以只关注两点内容: - 这个自定义线程池有两个全局任务队列,一个核心线程任务,一个非核心线程任务。 - 优先复用已有的线程任务,如果有就会把任务加到已有的work任务的本地队列里。否则会重新唤起或者创建线程。

比如有个协程任务连续两次调withContext(Dispatchers.IO)切换子线程分发任务,那么第二个withContext(Dispatchers.IO)就在第一个子线程中继续分发执行,而非重新创建线程任务。

Worker

接下来我们就要分析真正负责干活的线程任务Worker。 ``` //看继承类,哦是个线程。既然是线程我们就要关注run方法 internal inner class Worker private constructor() : Thread() {

    inline val scheduler get() = [email protected]
    //本地任务队列,也就是每个线程的任务表。
    @JvmField
    val localQueue: WorkQueue = WorkQueue()
    //关键方法
    override fun run() = runWorker()
    //核心任务
    private fun runWorker() {
        var rescanned = false
        while (!isTerminated && state != WorkerState.TERMINATED) {
            //找活干!
            val task = findTask(mayHaveLocalTasks)
            //找到活了
            if (task != null) {
                rescanned = false
                minDelayUntilStealableTaskNs = 0L
                executeTask(task)
                //continue下继续找活干
                continue
            } else {
                mayHaveLocalTasks = false
            }
            //活都干完了,先别走。在延迟一会,重新continue下。万一这时候又有活来了呢?
            if (minDelayUntilStealableTaskNs != 0L) {
                if (!rescanned) {
                    rescanned = true
                } else {
                    rescanned = false
                    tryReleaseCpu(WorkerState.PARKING)
                    interrupted()
                    LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                    minDelayUntilStealableTaskNs = 0L
                }
                continue
            }
            //活真的干完了,也没新活来,那这个线程就可以被回收了。收工!
            tryPark()
        }
        //释放资源,其实就是改标记位
        tryReleaseCpu(WorkerState.TERMINATED)
    }

    //找活干
    fun findTask(scanLocalQueue: Boolean): Task? {
        //获取标记位,获取成功后就开始找任务。如果允许扫描本地队列,那就先扫描本地队列。如果不允许扫描本地队列就去全局队列里查找。
        if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
        //没有获取到cpu令牌,还是从本地全局队列里去查询。
        val task = if (scanLocalQueue) {
            localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
        } else {
            globalBlockingQueue.removeFirstOrNull()
        }
        //唉,有意思的来了。如果以上都没查询到任务,那就尝试偷取一个任务(Steal=偷)
        return task ?: trySteal(blockingOnly = true)
    }
}

那这Work线程又去哪偷任务去呢?我们来看下trySteal方法的定义: private fun trySteal(blockingOnly: Boolean): Task? { assert { localQueue.size == 0 } val created = createdWorkers // 看下当前有几个线程呢,小于两个那就是只有一个。奥那不就是我自己么,那还偷啥,不偷了。 if (created < 2) { return null }

var currentIndex = nextInt(created)
var minDelay = Long.MAX_VALUE
//有多少个线程我就重复多少次
repeat(created) {
    ++currentIndex
    if (currentIndex > created) currentIndex = 1
    val worker = workers[currentIndex]
    //取出来线程,并且这个线程不是我自己
    if (worker !== null && worker !== this) {
        assert { localQueue.size == 0 }
        //从别的Work线程任务里去偷他的本地任务。
        val stealResult = if (blockingOnly) {
            localQueue.tryStealBlockingFrom(victim = worker.localQueue)
        } else {
            localQueue.tryStealFrom(victim = worker.localQueue)
        }
        if (stealResult == TASK_STOLEN) {
            return localQueue.poll()
        } else if (stealResult > 0) {
            minDelay = min(minDelay, stealResult)
        }
    }
}
minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
return null

} ``` 经过我们分析,原来Worker真是个敬业好员工(卷王)。自己没活了(本地任务队列),领导那也没活了(全局任务队列),又主动去帮同事完成一部分工作(偷任务)。并且在所有任务完成之后也不立马下班,而是主动加班,等待分配新工作(等待复用机制)。

尾调机制

我们大体对这个IO线程池有个初步了解了,然后我们回头看下上边说的那个“尾调”这个逻辑currentWorker.submitToLocalQueue(task, tailDispatch)

我们跟踪这个方法最后定位到WorkQueue类。其中fair参数就是tailDispatch。

fun add(task: Task, fair: Boolean = false): Task? { //尾调就是规规矩矩的放在任务队列尾部 if (fair) return addLast(task) //不是尾调,就把新任务发在高优出队任务里,然后把本来要出队的任务放在队尾。 val previous = lastScheduledTask.getAndSet(task) ?: return null return addLast(previous) }

结合以上CoroutineScheduler和Worker小节学到的知识点。我们可以总结出这个尾调逻辑具体要做啥。

在传统的线程池的线程充足情况下,一个任务到来时,会被分配一个线程。假设前后两个任务A与B有依赖关系,需要在执行A再执行B,这时如果两个任务同时到来,执行A任务的线程会直接执行,而执行B线程的任务可能需要被阻塞。而一旦线程阻塞会造成线程资源的浪费。而协程本质上就是多个小段程序的相互协作,因此这种场景会非常多,通过这种机制可以保证任务的执行顺序,同时减少资源浪费,而且可以最大限度的保证一个连续的任务执行在同一个线程中。

所以基于我们也很容易理解谷歌doc关于withContext的这段描述。

WX20230206-111156@2x.png

总结

至此我们基本已经分析完了协程线程切换的大体流程。我们总结本篇文章的几个核心知识点吧

  • 什么是协程上下文?他的作用是什么?
    • 协程上下文是规定了此次协程任务的工作环境,比如在什么线程里,异常处理机制等操作。
  • 协程IO线程池为什么不复用java线程池?
    • 针对协程多个小段程序的相互协作,线程切换场景频繁的特点,协程使用尾回调机制和线程任务偷取机制来优化IO线程池性能。
  • 协程IO线程池做了那些优化?
    • 尾回调机制和线程任务偷取机制
  • 什么是尾回调机制?
    • 如果有当前活跃线程,协程会把任务放到这个线程的本地任务队列里,并等待线程执行完任务,而非重新创建或唤起新任务。以此来保证有前后依赖任务的场景可以顺序执行。以避免线程资源的浪费。
  • 什么是线程任务偷取?
    • 当一个线程的本地任务和全局队列任务都执行完毕后,会尝试去别的线程里的本地任务队列里偷取一个任务拿来执行,以实现线程的最大复用。

以上是我的总结和答案,大家也可以参考别人的文章得到自己的总结和答案。

参考文章

【深入理解Kotlin协程】协程的上下文 CoroutineContext

Kotlin协程:协程上下文与上下文元素

Kotlin协程:Dispatchers.IO线程池原理