帶著問題分析Kotlin協程原理
theme: scrolls-light highlight: androidstudio
我報名參加金石計劃1期挑戰——瓜分10萬獎池,這是我的第1篇文章,點選檢視活動詳情
協程是一個較為複雜的東西,弄清協程的原理也不是簡簡單單的一篇文章就能講的清,這個過程中需要做的就是使用、看原始碼、debug、總結、回顧。本節內容主要以弄清以下幾個問題為主:
- 如何建立一個協程
- 協程是如何被創建出來的
- 啟動策略是什麼
- 啟動策略完成了什麼工作
- 協程是如何被啟動的
- 協程啟動過程中的Dispatchers是什麼
- 協程啟動過程中的Dispatchers做了什麼
- Worker是什麼
- Worker中的任務被找到後是如何執行的?
- 協程建立的時候CoroutineScope是什麼
- 協程的結構化中父子關係是怎樣建立的
- 結構化建立後又是怎麼取消的
- newCoroutineContext(context)做了什麼
1.如何建立一個協程
建立一個協程有三種方式 ``` fun main() { CoroutineScope(Job()).launch(Dispatchers.Default) { delay(1000L) println("Kotlin") }
println("Hello")
Thread.sleep(2000L)
}
//執行結果: //Hello //Kotlin
fun main() = runBlocking { val deferred = CoroutineScope(Job()).async(Dispatchers.Default) { println("Hello") delay(1000L) println("Kotlin") }
deferred.await()
}
//執行結果: //Hello //Kotlin
fun main() {
runBlocking(Dispatchers.Default) {
println("launch started!")
delay(1000L)
println("Kotlin")
}
println("Hello")
Thread.sleep(2000L)
println("Process end!")
}
//執行結果: //launch started! //Hello //Kotlin
```
三者區別如下:
- launch:無法獲取執行結果,返回型別Job,不會阻塞;
- async:可獲取執行結果,返回型別Deferred,呼叫
await()
會阻塞,不呼叫則不會阻塞但也無法獲取執行結果; - runBlocking:可獲取執行結果,阻塞當前執行緒的執行,多用於Demo、測試,官方推薦只用於連線執行緒與協程。
以launch
為例,看一下它的原始碼
public fun CoroutineScope.launch(
//①
context: CoroutineContext = EmptyCoroutineContext,
//②
start: CoroutineStart = CoroutineStart.DEFAULT,
//③
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
launch原始碼有三個引數分別對其解釋一下:
- context: 協程的上下文,用於提供協程啟動和執行時需要的資訊,預設值是
EmptyCoroutineContext
,有預設值就可以不傳,但是也可以傳遞Kotlin提供的Dispatchers
來指定協程執行在哪一個執行緒中 - start: 協程的啟動模式;
- block: 這個可以理解為協程的函式體,函式型別為
suspend CoroutineScope.() -> Unit
通過對引數的說明,上面關於launch的建立也可以這麼寫: ``` fun coroutineTest() { val scope = CoroutineScope(Job())
val block: suspend CoroutineScope.() -> Unit = {
println("Hello")
delay(1000L)
println("Kotlin")
}
scope.launch(block = block)
}
反編譯成Java程式碼如下:
public final class CoroutineDemoKt {
public static final void main() {
coroutineTest();
Thread.sleep(2000L);
}
// $FF: synthetic method
public static void main(String[] var0) {
main();
}
public static final void coroutineTest() {
CoroutineScope scope = CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null));
//Function2 是Kotlin 為 block 變數生成的靜態變數以及方法。
//實現狀態機的邏輯
Function2 block = (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
String var2;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
var2 = "Hello";
System.out.println(var2);
this.label = 1;
if (DelayKt.delay(1000L, this) == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var2 = "Kotlin";
System.out.println(var2);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
});
BuildersKt.launch$default(scope, (CoroutineContext)null, (CoroutineStart)null, block, 3, (Object)null);
}
} ```
2.協程是怎麼被創建出來的
還是以launch
的原始碼為例進行分析
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//協程的建立看這裡
coroutine.start(start, coroutine, block)
return coroutine
}
coroutine.start
為協程的建立與啟動,這個start進入到了AbstractCoroutine
,是一個抽象類,裡面的start方法專門用於啟動協程。
```
public abstract class AbstractCoroutine
/**
* 用給定的block和start啟動協程
* 最多呼叫一次
*/
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
}
``
AbstractCoroutine`中的start函式負責啟動協程,同時啟動是根據block和啟動策略決定的,那麼啟動策略是什麼? 以及啟動策略完成了什麼工作?
3.啟動策略是什麼
``` public enum class CoroutineStart {
/* * 根據上下文立即排程協程執行。 / DEFAULT
/* * 延遲啟動協程,只在需要時才啟動。 * 如果協程[Job]在它有機會開始執行之前被取消,那麼它根本不會開始執行 * 而是以一個異常結束。 / LAZY
/* * 以一種不可取消的方式,根據其上下文安排執行的協程; * 類似於[DEFAULT],但是協程在開始執行之前不能取消。 * 協程在掛起點的可取消性取決於掛起函式的具體實現細節,如[DEFAULT]。 / ATOMIC
/* * 立即執行協程,直到它在當前執行緒中的第一個掛起點; / UNDISPATCHED: } ```
4.啟動策略完成了什麼工作
協程在確定啟動策略之後就會開始執行它的任務,它的任務在invoke()
函式中被分為不同的執行方式
/**
* 用這個協程的啟動策略啟動相應的block作為協程
*/
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit
}
這裡直接對block.startCoroutine(completion)
進行分析,block.startCoroutineCancellable
和block.startCoroutineUndispatched
也只是在startCoroutine
的基礎上增加了一些額外的功能,前者表示啟動協程以後可以響應取消,後者表示協程啟動以後不會被分發。
/**
* 啟動一個沒有接收器且結果型別為[T]的協程。
* 每次呼叫這個函式時,它都會建立並啟動一個新的、可掛起計算例項。
* 當協程以一個結果或一個異常完成時,將呼叫[completion]延續。
*/
public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
先來看一下createCoroutineUnintercepted()
做了哪些工作
//可以理解為一種宣告
// ↓
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T>
): Continuation<Unit>
expect的意思是期望、期盼,這裡可以理解為一種宣告,期望在具體的平臺中實現。
進入到createCoroutineUnintercepted()
的原始碼中看到並沒有什麼實現,這主要是因為Kotlin是面向多個平臺的具體的實現需要在特定平臺中才能找到,這裡進入IntrinsicsJvm.kt中分析。
//IntrinsicsJvm#createCoroutineUnintercepted
//actual代表的是createCoroutineUnintercepted在JVM平臺上的具體實現
// ↓
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
// 難點
// ↓
return if (this is BaseContinuationImpl)
//會進入這裡執行
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
這裡的actual就是在具體品臺上的實現。
上面的程式碼中有一個難點是【this】 這個this的含義如果只是在反編譯程式碼或者原始碼中去看很難發現它是什麼,這裡要通過原始碼、位元組碼、反編譯的Java程式碼進行分析,這裡我以截圖進行展示
com/example/coroutines/CoroutineDemoKt$coroutineTest$block$1
是block具體的實現類。它繼承自kotlin/coroutines/jvm/internal/SuspendLambda
```
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this) // this is lambda
else
super.toString() // this is continuation
} ```
```
internal abstract class ContinuationImpl(
completion: Continuation
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
} ```
```
internal abstract class BaseContinuationImpl(
public val completion: Continuation
public open fun create(completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
}
``
**SuspendLambda是ContinuationImpl的子類,ContinuationImpl又是BaseContinuationImpl的子類,** 所以可以得到結論
if (this is BaseContinuationImpl)的結果為**true,** 然後會進入到
create(probeCompletion)`函式中
這個create()
函式丟擲了一個異常,意思就是方法沒有被重寫,潛臺詞就是create()
這個方法是要被重寫的,如果不重寫就會丟擲異常。 那麼create()
方法又是在哪裡重寫的呢。答案就在反編譯後的Java程式碼中的create
方法
//這段程式碼來自launch建立的反編譯後的Java程式碼
//create函式被重寫
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
這行程式碼,其實就對應著協程被建立的時刻。
分析完了startContinue()
再來分析一下startCoroutineCancellable()
做了什麼,因為協程預設的啟動策略是CoroutineStart.DEFAULT
```
//Cancellable#startCoroutineCancellable
/*
* 使用此函式以可取消的方式啟動協程,以便它可以在等待排程時被取消。
/
public fun
//Continuation#startCoroutine
public fun ``
通過對比可以發現
startCoroutineCancellable()和
startCoroutine()的內部並沒有太大區別,他們最終都會呼叫
createCoroutineUnintercepted(),只不過前者在最後呼叫了
resumeCancellableWith(),後者呼叫的是
resume()`,這個稍後分析。
5.協程是如何被啟動的
協程的建立分析完成後再來分析一下協程是如何啟動的,再回過頭看一下createCoroutineUnintercepted()
之後做了什麼
//Cancellable#startCoroutineCancellable
/**
* 使用此函式以可取消的方式啟動協程,以便它可以在等待排程時被取消。
*/
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
// 現在看這裡
// ↓
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
進入到intercepted()
,它也是需要找到對應平臺上的具體實現,這裡還是以JVM平臺進行分析
```
//需要找到對應平臺的具體實現
public expect fun
//JVM平臺的實現
//IntrinsicsJvm.kt#intercepted
/
* 使用[ContinuationInterceptor]攔截continuation。
*/
public actual fun 在分析協程的建立過程中已經分析過上面的**this**代表的就是block變數,所以這裡的強轉是成立的,那麼這裡的`intercepted()`呼叫的就是`ContinuationImpl`物件中的函式
/
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
//重點看這裡
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
``
首先
intercepted()方法會判斷它的成員變數
intercepted是否為空,如果為空則呼叫
context[ContinuationInterceptor]獲取上下文當中的
Dispatchers物件,這個
Dispatchers`物件又是什麼呢?
6.協程啟動過程中的Dispatchers是什麼
這裡以launch的原始碼為主進行分析 ``` fun main() { CoroutineScope(Job()).launch(Dispatchers.Default) { delay(1000L) println("Kotlin") }
println("Hello")
Thread.sleep(2000L)
}
public fun CoroutineScope.launch(
// 傳入的Dispatchers.Default表示的就是這個context
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
``
傳入的
Dispatchers.Default對應的是context引數,從原始碼可知這個引數不是必傳的,因為它有預設值
EmptyCoroutineContext`,Kotlin官方用它來替代了null,這是Kotlin空安全思維。
傳入Dispatchers.Default
之後就是用它替代了EmptyCoroutineContext
,那麼這裡的Dispatchers
的定義跟CoroutineContext
有什麼關係呢?看一下Dispatchers
的原始碼
```
/*
* 對[CoroutineDispatcher]的各種實現進行分組。
/
public actual object Dispatchers {
/**
* 用於CPU密集型任務的執行緒池,一般來說它內部的執行緒個數是與機器 CPU 核心數量保持一致的
* 不過它有一個最小限制2,
*/
public actual val Default: CoroutineDispatcher = DefaultScheduler
/**
* 主執行緒,在Android中才可以使用,主要用於UI的繪製,在普通JVM上無法使用
*/
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
/**
* 不侷限於任何特定執行緒,會根據執行時的上下文環境決定
*/
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
/**
* 用於執行IO密集型任務的執行緒池,它的數量會多一些,預設最大執行緒數量為64個
* 具體的執行緒數量可以通過kotlinx.coroutines.io.parallelism配置
* 它會和Default共享執行緒,當Default還有其他空閒執行緒時是可以被IO執行緒池複用。
*/
public val IO: CoroutineDispatcher = DefaultIoScheduler
}
`Dispatchers`是一個單例物件,它裡面的幾個型別都是`CoroutineDispatcher`。
/*
* 所有協程排程器實現擴充套件的基類。
/
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { }
/* * 標記攔截協程延續的協程上下文元素。 / public interface ContinuationInterceptor : CoroutineContext.Element { }
/*
* CoroutineContext的一個元素。協程上下文的一個元素本身就是一個單例上下文。
/
public interface Element : CoroutineContext { }
``
CoroutineDispatcher本身又是
CoroutineContext`,從上面的原始碼就可以得出他們的關係可以這麼表示:
7.協程啟動過程中的Dispatchers做了什麼
協程的執行是離不開執行緒的,Dispatchers的作用就是確定協程執行在哪個執行緒,預設是Default,然後它也可以執行在IO、Main等執行緒,它負責將任務排程的指定的現呈上,具體的分析後面再寫。
通過前分析在協程中預設的是Default
執行緒池,因此這裡進入的就是Default
執行緒池。 那麼我們回到intercepted()
函式繼續進行分析,通過Debug進入到CoroutineDispatcher
中的interceptContinuation()
函式
```
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
/**
* 返回一個封裝了提供的[continuation]的continuation,從而攔截所有的恢復。
* 這個方法通常應該是異常安全的。
* 從此方法丟擲的異常可能會使使用此排程程式的協程處於不一致且難以除錯的狀態。
*/
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
``
interceptContinuation()返回了一個
DispatchedContinuation物件,其中的**this**就是預設的執行緒池
Dispatchers.Default`。
然後通過DispatchedContinuation
呼叫它的resumeCancellableWith()
函式,這個函式前面分析過是從哪裡進入的,這裡不再說明。
```
internal class DispatchedContinuation
...
public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}
//我們內聯它來儲存堆疊上的一個條目,在它顯示的情況下(無限制排程程式)
//它只在Continuation<T>.resumeCancellableWith中使用
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
...
}
`DispatchedContinuation`繼承了`DispatchedTask`:
internal abstract class DispatchedTask
internal actual typealias SchedulerTask = Task
internal abstract class Task(
@JvmField var submissionTime: Long,
@JvmField var taskContext: TaskContext
) : Runnable {
constructor() : this(0, NonBlockingContext)
inline val mode: Int get() = taskContext.taskMode // TASK_XXX
}
``
DispatchedTask繼承了
SchedulerTask,同時
SchedulerTask還是Task的別名,Task又實現了
Runnable`介面,這意味著它可以被分發到Java的執行緒中去執行了。
同時可以得出一個結論:DispatchedContinuation是一個Runnable。
DispatchedContinuation
還實現了Continuation
介面,它還使用了類委託的語法將介面的具體實現交給了它的成員屬性continuation
,那麼這裡對上面的結論進行補充:DispatchedContinuation不僅是一個Runnable,還是一個Continuation。
DispatchedContinuation
分析完了進入它的resumeCancellableWith()
函式分析:
inline fun resumeCancellableWith(
result: Result<T>,
noinline onCancellation: ((cause: Throwable) -> Unit)?
) {
val state = result.toState(onCancellation)
//①
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//②
dispatcher.dispatch(context, this)
} else {
//這裡就是Dispatchers.Unconfined情況,這個時候協程不會被分發到別的執行緒,只執行在當前執行緒中。
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled(state)) {
resumeUndispatchedWith(result)
}
}
}
}
- 註釋①: dispatcher來自CoroutineDispatcher,isDispatchNeeded就是它的成員函式
```
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
/**
* 如果協程的執行應該使用[dispatch]方法執行,則返回' true '。
* 大多數dispatchers的預設行為是返回' true '。
*/
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
}
`isDispatchNeeded()`**預設返回true**,且在大多數情況下都是**true,** 但是也有個例外就是在它的子類中`Dispatchers.Unconfined`會將其重寫成 false。
internal object Unconfined : CoroutineDispatcher() {
// 只有Unconfined會重寫成false
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
}
```
因為預設是true,所以接下來會進入註釋②
-
註釋②: 註釋②呼叫了
CoroutineDispatcher
中的dispatch()
方法將block程式碼塊排程到另一個執行緒上,這裡的執行緒池預設值是Dispatchers.Default
所以任務被分發到Default執行緒池,第二個引數是Runnable
,這裡傳入的是this,因為DispatchedContinuation
間接的實現了Runnable
介面。 ``` public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { /**- 在給定的[上下文]中,將一個可執行的block排程到另一個執行緒上。
- 這個方法應該保證給定的[block]最終會被呼叫,否則系統可能會達到死鎖狀態並且永遠不會終止。
*/
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
}
因為預設執行緒池是`Dispatchers.Default`,所以這裡的`dispatch()`其實呼叫的是`Dispatchers.Default.dispatch`,這裡的`Dispatchers.Default`的本質是一個單例物件**DefaultScheduler,** 它繼承了`SchedulerCoroutineDispatcher`:
//繼承了SchedulerCoroutineDispatcher internal object DefaultScheduler : SchedulerCoroutineDispatcher( CORE_POOL_SIZE, MAX_POOL_SIZE, IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME ) { // 關閉排程程式,僅用於Dispatchers.shutdown() internal fun shutdown() { super.close() }
//重寫 (Dispatchers.Default as ExecutorCoroutineDispatcher).close() override fun close() { throw UnsupportedOperationException("Dispatchers.Default cannot be closed") }
override fun toString(): String = "Dispatchers.Default" }
internal open class SchedulerCoroutineDispatcher( private val corePoolSize: Int = CORE_POOL_SIZE, private val maxPoolSize: Int = MAX_POOL_SIZE, private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, private val schedulerName: String = "CoroutineScheduler", ) : ExecutorCoroutineDispatcher() {
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
}
`SchedulerCoroutineDispatcher`中實際呼叫`dispatch()`方法的實際是`coroutineScheduler`,所以`dispatcher.dispatch()`實際呼叫的是**coroutineScheduler.dispatch()**
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
//Executor介面中的方法被覆蓋
override fun execute(command: Runnable) = dispatch(command)
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
trackTask()
//將傳入的 Runnable 型別的 block(也就是 DispatchedContinuation),包裝成 Task。
val task = createTask(block, taskContext)
// 拿到當前的任務佇列, 嘗試將任務提交到本地佇列並根據結果進行操作
//Worker其實是一個內部類,其實就是Java的Thread類
val currentWorker = currentWorker()
//將當前的 Task 新增到 Worker 執行緒的本地佇列,等待執行。
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// 全域性佇列在關閉/關閉的最後一步關閉——不再接受任何任務
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
//
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
signalCpuWork()
} else {
//增加阻塞任務
signalBlockingWork(skipUnpark = skipUnpark)
}
}
} ```
8.Worker是什麼?
``` internal inner class Worker private constructor() : Thread() { init { isDaemon = true //守護執行緒預設為true }
private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
//在while迴圈中一直嘗試從佇列中找到任務
val task = findTask(mayHaveLocalTasks)
// 找到任務則進行下一步
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
//執行任務
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
if (minDelayUntilStealableTaskNs != 0L) {
if (!rescanned) {
rescanned = true
} else {
rescanned = false
tryReleaseCpu(WorkerState.PARKING)
interrupted()
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
minDelayUntilStealableTaskNs = 0L
}
continue
}
//沒有任務則停止執行,執行緒可能會關閉
tryPark()
}
tryReleaseCpu(WorkerState.TERMINATED)
}
} ```
9.Worker中的任務被找到後是如何執行的?
``` internal inner class Worker private constructor() : Thread() { private fun executeTask(task: Task) { val taskMode = task.mode //當它找到一個任務時,這個工作者就會呼叫它 idleReset(taskMode) beforeTask(taskMode) runSafely(task) afterTask(taskMode) }
fun runSafely(task: Task) {
try {
task.run()
} catch (e: Throwable) {
val thread = Thread.currentThread()
thread.uncaughtExceptionHandler.uncaughtException(thread, e)
} finally {
unTrackTask()
}
}
}
internal abstract class Task(
@JvmField var submissionTime: Long,
@JvmField var taskContext: TaskContext
) : Runnable {
constructor() : this(0, NonBlockingContext)
inline val mode: Int get() = taskContext.taskMode // TASK_XXX
}
``
最終進入到
runSafely()`函式中,然後呼叫run方法,前面分析過,將DispatchedContinuation包裝成一個實現了Runnable介面的Task,所以這裡的task.run()本質上就是呼叫的Runnable.run(),到這裡任務就協程任務就真正的執行了。
那麼也就可以知道這裡的run() 函式其實呼叫的就是DispatchedContinuation父類DispatchedTask中的run()函式:
```
internal abstract class DispatchedTask
public final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED }
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() //
val exception = getExceptionalResult(state)
// 檢查延續最初是否在異常情況下恢復。
// 如果是這樣,它將主導取消,否則原始異常將被靜默地丟失。
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
if (job != null && !job.isActive) {
//①
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause)
} else {
if (exception != null) {
//②
continuation.resumeWithException(exception)
} else {
//③
continuation.resume(getSuccessfulResult(state))
}
}
}
} catch (e: Throwable) {
//
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
}
}
}
``
- **註釋①:** 在程式碼執行之前這裡會判斷當前協程是否被取消。如果被取消了就會呼叫
continuation.resumeWithStackTrace(cause)將具體的原因傳出去;
- **註釋②:** 判斷協程是否發生了異常如果已經發生異常就呼叫
continuation.resumeWithException(exception)將異常傳遞出去;
- **註釋③:** 如果前面執行沒有問題,就進入最後一步
continuation.resume(getSuccessfulResult(state)`,此時協程正式被啟動並且執行launch當中傳入的block或者Lambda函式。
這裡其實就是協程與執行緒產生關聯的地方。
以上就是協程的建立、啟動的流程,但是還有幾個問題沒有弄明白: - 協程建立的時候CoroutineScope是什麼 - 協程的結構化中父子關係是怎樣建立的 - 結構化建立後又是怎麼取消的
接下來對這幾個問題進行解答:
10.協程建立的時候CoroutineScope是什麼
前面對於協程的三種建立方式中的launch、async的建立方式中都有CoroutineScope(Job())
,現在先來分析下CoroutineScope
做了什麼,先來看下launch和async的原始碼。
```
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
public fun
async、launch的擴充套件接收者都是`CoroutineScope`,這就意味著他們等價於`CoroutineScope`的成員方法,如果要呼叫就必須先獲取到`CoroutineScope`的物件。
public interface CoroutineScope {
/**
* 此作用域的上下文
* Context被作用域封裝,用於實現作為作用域擴充套件的協程構建器
* 不建議在普通程式碼中訪問此屬性,除非訪問[Job]例項以獲得高階用法
*/
public val coroutineContext: CoroutineContext
}
``
CoroutineScope是一個介面,這個介面所做的也只是對
CoroutineContext做了一層封裝而已。
CoroutineScope`最大的作用就是可以方便的批量的控制協程,例如結構化併發。
11.CoroutineScope與結構化併發
``` fun coroutineScopeTest() { val scope = CoroutineScope(Job()) scope.launch { launch { delay(1000000L) logX("ChildLaunch 1") } logX("Hello 1") delay(1000000L) logX("Launch 1") }
scope.launch {
launch {
delay(1000000L)
logX("ChildLaunch 2")
}
logX("Hello 2")
delay(1000000L)
logX("Launch 2")
}
Thread.sleep(1000L)
scope.cancel()
}
//輸出結果:
//================================
//Hello 2
//Thread:DefaultDispatcher-worker-2
//================================
//================================
//Hello 1
//Thread:DefaultDispatcher-worker-1
//================================
``
上面的程式碼實現了結構化,只是建立了
CoroutineScope(Job())和利用
launch啟動了幾個協程就實現了結構化,結構如圖所示,那麼它的**父子結構是如何建立的?**

CoroutineScope這裡要說明一下為什麼明明是是一個介面,可是在建立的時候卻可以以建構函式的方式使用。在Kotlin中的命名規則是以【駝峰法】為主的,在特殊情況下是可以打破這個規則的,
CoroutineScope就是一個特殊的情況,它是一個**頂層函式**但它發揮的作用卻是**建構函式**,同樣的還有
Job()`,它也是頂層函式,在Kotlin中當頂層函式被用作建構函式的時候首字母都是大寫的。
12.協程的結構化中父子關係是怎樣建立的
再來看一下CoroutineScope
作為建構函式使用時的原始碼:
/**
* 建立一個[CoroutineScope],包裝給定的協程[context]。
*
* 如果給定的[context]不包含[Job]元素,則建立一個預設的' Job() '。
*
* 這樣,任何子協程在這個範圍或[取消][協程]失敗。就像在[coroutineScope]塊中一樣,
* 作用域本身會取消作用域的所有子作用域。
*/
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
ContextScope(if (context[Job] != null) context else context + Job())
建構函式的CoroutineScope
傳入一個引數,這個引數如果包含Job
元素則直接使用,如果不包含Job
則會建立一個新的Job
,這就說明每一個coroutineScope
物件中的 Context
中必定會存在一個Job
物件。而在建立一個CoroutineScope
物件時這個Job()
是一定要傳入的,因為CoroutineScope
就是通過這個Job()
物件管理協程的。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
上面的程式碼是launch
的原始碼,分析一下LazyStandaloneCoroutine
和StandaloneCoroutine
。
```
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine
private class LazyStandaloneCoroutine( parentContext: CoroutineContext, block: suspend CoroutineScope.() -> Unit ) : StandaloneCoroutine(parentContext, active = false) { private val continuation = block.createCoroutineUnintercepted(this, this)
override fun onStart() {
continuation.startCoroutineCancellable(this)
}
}
`StandaloneCoroutine`是`AbstractCoroutine`子類,`AbstractCoroutine`是**協程的抽象類,** 裡面的引數`initParentJob = true`表示協程建立之後需要初始化協程的父子關係。`LazyStandaloneCoroutine`是`StandaloneCoroutine`的子類,`active=false`使命它是以懶載入的方式建立協程。
public abstract class AbstractCoroutine``
AbstractCoroutine是一個抽象類他繼承了
JobSupport,而
JobSupport是
Job`的具體實現。
在init
函式中根據initParentJob
判斷是否建立父子關係,initParentJob
的預設值是true
因此if中的initParentJob()
函式是一定會執行的,這裡的parentContext[Job]
取出的的Job
就是在launche建立時傳入的Job
。
initParentJob()
是JobSupport
中的方法,因為AbstractCoroutine
繼承自JobSupport
,所以進入JobSupport
分析這個方法。
```
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
final override val key: CoroutineContext.Key<*> get() = Job
/**
* 初始化父類的Job
* 在所有初始化之後最多呼叫一次
*/
protected fun initParentJob(parent: Job?) {
assert { parentHandle == null }
//①
if (parent == null) {
parentHandle = NonDisposableHandle
return
}
//②
parent.start() // 確保父協程已經啟動
@Suppress("DEPRECATION")
//③
val handle = parent.attachChild(this)
parentHandle = handle
// 檢查註冊的狀態
if (isCompleted) {
handle.dispose()
parentHandle = NonDisposableHandle
}
}
}
``
上面的原始碼
initParentJob`中添加了三處註釋,現在分別對這三處註釋進行分析:
- if (parent == null): 這裡是對是否存在父
Job
的判斷,如果不存在則不再進行後面的工作,也就談不上建立父子關係了。因為在Demo中傳遞了Job()
因此這裡的父Job
是存在的,所以程式碼可以繼續執行。 - parent.start(): 這裡確保parent對應的Job啟動了;
- parent.attachChild(this): 這裡就是將子
Job
新增到父Job
中,使其成為parent的子Job
。這裡其實就是建立了父子關係。
用一句話來概括這個關係就是:每一個協程都有一個Job,每一個Job又有一個父Job和多個子Job,可以看做是一個樹狀結構。這個關係可以用下面這張圖表示:
13.協程的結構化建立後又是怎麼取消的
結構化可以被建立的同時CoroutineScope
還提供了可取消的函式,Demo中通過scope.cancel()
取消了協程,它的流程又是怎樣的呢?先從scope.cancel
中的cancel
看起
/**
* 取消這個scope,包含當前Job和子Job
* 如果沒有Job,可丟擲異常IllegalStateException
*/
public fun CoroutineScope.cancel(cause: CancellationException? = null) {
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
}
scope.cancel
又是通過job.cancel
取消的,這個cancel
具體實現是在JobSupport
中
``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { ...
public override fun cancel(cause: CancellationException?) {
cancelInternal(cause ?: defaultCancellationException())
}
public open fun cancelInternal(cause: Throwable) {
cancelImpl(cause)
}
/**
* 當cancelChild被呼叫的時候cause是Throwable或者ParentJob
* 如果異常已經被處理則返回true,否則返回false
*/
internal fun cancelImpl(cause: Any?): Boolean {
var finalState: Any? = COMPLETING_ALREADY
if (onCancelComplete) {
// 確保它正在完成,如果返回狀態是 cancelMakeCompleting 說明它已經完成
finalState = cancelMakeCompleting(cause)
if (finalState === COMPLETING_WAITING_CHILDREN) return true
}
if (finalState === COMPLETING_ALREADY) {
//轉換到取消狀態,當完成時呼叫afterCompletion
finalState = makeCancelling(cause)
}
return when {
finalState === COMPLETING_ALREADY -> true
finalState === COMPLETING_WAITING_CHILDREN -> true
finalState === TOO_LATE_TO_CANCEL -> false
else -> {
afterCompletion(finalState)
true
}
}
}
/**
* 如果沒有需要協程體完成的任務返回true並立即進入完成狀態等待子類完成
* 這裡代表的是當前Job是否有協程體需要執行
*/
internal open val onCancelComplete: Boolean get() = false
} ```
job.cancel()
最終呼叫的是JobSupport
中的cancelImpl()
。這裡它分為兩種情況,判斷依據是onCancelComplete
,代表的就是當前Job
是否有協程體需要執行,如果沒有則返回true。這裡的Job
是自己建立的且沒有需要執行的協程程式碼因此返回結果是true
,所以就執行cancelMakeCompleting()
表示式。
``` private fun cancelMakeCompleting(cause: Any?): Any? { loopOnState { state -> ... val finalState = tryMakeCompleting(state, proposedUpdate) if (finalState !== COMPLETING_RETRY) return finalState } }
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? { ... return tryMakeCompletingSlowPath(state, proposedUpdate) }
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? { //獲取狀態列表或提升為列表以正確操作子列表 val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY ... notifyRootCause?.let { notifyCancelling(list, it) } ... return finalizeFinishingState(finishing, proposedUpdate) } ```
進入cancelMakeCompleting()
後經過多次流轉最終會呼叫tryMakeCompletingSlowPath()
中的notifyCancelling()
,在這個函式中才是執行子Job
和父Job
取消的最終流程
```
private fun notifyCancelling(list: NodeList, cause: Throwable) {
//首先取消子Job
onCancelling(cause)
//通知子Job
notifyHandlers
private inline fun
notifyHandlers()
中的流程就是遍歷當前Job
的子Job
,並將取消的cause
傳遞過去,這裡的invoke()
最終會呼叫 ChildHandleNode
的 invoke()
方法
``` public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 { ...
internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}
public final override fun parentCancelled(parentJob: ParentJob) {
cancelImpl(parentJob)
}
} ```
childJob.parentCancelled(job)
的呼叫最終呼叫的是JobSupport
中的parentCanceled()
函式,然後又回到了cancelImpl()
中,也就是 Job 取消的入口函式。這實際上就相當於在做遞迴呼叫。
子Job
取消完成後接著就是取消父Job
了,進入到cancelParent()
函式中
``` /* * 取消Job時呼叫的方法,以便可能將取消傳播到父類。 * 如果父協程負責處理異常,則返回' true ',否則返回' false '。 / private fun cancelParent(cause: Throwable): Boolean { // Is scoped coroutine -- don't propagate, will be rethrown if (isScopedCoroutine) return true
/*
* CancellationException被認為是“正常的”,當子協程產生它時父協程通常不會被取消。
* 這允許父協程取消它的子協程(通常情況下),而本身不會被取消,
* 除非子協程在其完成期間崩潰併產生其他異常。
*/
val isCancellation = cause is CancellationException
val parent = parentHandle
if (parent === null || parent === NonDisposableHandle) {
return isCancellation
}
// 責任鏈模式
return parent.childCancelled(cause) || isCancellation
}
/* * 在這個方法中,父類決定是否取消自己(例如在重大故障上)以及是否處理子類的異常。 * 如果異常被處理,則返回' true ',否則返回' false '(呼叫者負責處理異常) / public open fun childCancelled(cause: Throwable): Boolean { if (cause is CancellationException) return true return cancelImpl(cause) && handlesException } ```
cancelParent
的返回結果使用了責任鏈模式, 如果返回【true】表示父協程處理了異常,返回【false】則表示父協程沒有處理異常。
當異常是CancellationException
時如果是子協程產生的父協程不會取消,或者說父協程會忽略子協程的取消異常,如果是其他異常父協程就會響應子協程的取消了。
14.newCoroutineContext(context)
launch原始碼中第一行程式碼做了什麼目前還不得而知,這裡分析一下做了什麼事
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
// 就是這一行
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
/**
* 為新的協程建立上下文。當沒有指定其他排程程式或[ContinuationInterceptor]時,
* 它會設定[Dispatchers.Default],並新增對除錯工具的可選支援(當開啟時)。
*/
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
//①
val combined = coroutineContext.foldCopiesForChildCoroutine() + context
//②
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
//③
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
- 註釋①:這行程式碼首先呼叫了coroutineContext
,這是因為newCoroutineContext()
是CoroutineScope
的擴充套件函式,CoroutineScope
對CoroutineContext
進行了封裝,所以newCoroutineContext()
函式中可直接訪問CoroutineScope
的coroutineContext
;foldCopiesForChildCoroutine()
函式返回子協程要繼承的[CoroutineContext]
;然後跟傳入的context引數進行合併。這行程式碼就是讓子協程可以繼承父協程的上下文元素。
- 註釋②:它的作用是在除錯模式下,為我們的協程物件增加唯一的 ID,這個ID就是在除錯協程程式時出現的日誌如:Thread:DefaultDispatcher-worker-1 @coroutine#1
中的@coroutine#1
,其中的1就是這個ID。
- 註釋③:如果合併後的combined沒有指定排程程式就預設使用Dispatcher.Default
。
通過上面的分析可以得出newCoroutineContext
函式確定了預設使用的執行緒池是Dispatcher.Default
,那麼這裡為什麼會預設使用Default
執行緒池而不是Main
呢? 因為Kotlin並不是只針對Android開發的,它支援多個平臺Main
執行緒池僅在UI相關的平臺中才會用到,而協程是不能脫離執行緒執行的,所以這裡預設使用Default
執行緒池。
15.總結:
1. 協程是如何被建立的
協程在確認自己的啟動策略後進入到createCoroutineUnintercepted
函式中建立了協程的Continuation
例項,Continuation
的實現類是ContinuationImpl
,它繼承自BaseContinuationImpl
,在BaseContinuationImpl
中呼叫了它的create()
方法,而這個create()
方法需要重寫才可以實現否則會丟擲異常,那麼這個create()
的重寫就是反編譯後的Java程式碼中create()
函式。
2. 協程是如何被啟動的
協程通過createCoroutineUnintercepted
函式建立後緊接著就會呼叫它的intercepted()
方法,將其封裝成DispatchedContinuation
物件,DispatchedContinuation
是Runnable
的子類,DispatchedContinuation
會持有CoroutineDispatcher
以及前面建立的Continuation
物件,DispatchedContinuation
呼叫內部的resumeCancellableWith()
方法,然後進入到resumeCancellableWith()
中的dispatched.dispatch()
,這裡會將協程的Continuation
包裝成Task並新增到Worker的本地任務佇列等待執行。而這裡的Worker本質上是Java中的Thread,在這一步協程完成了執行緒的切換,任務新增到Worker的本地任務佇列後就會通過run()
方法啟動任務,這裡呼叫的是task.run()
,這裡的run最終是呼叫的DispatchedContinuation
的父類DispatchedTask中的run()
方法,在這個run方法中如果前面沒有異常最終會呼叫continuation.resume()
,然後就開始執行執行協程體中的程式碼了也就是反編譯程式碼中的invokeSuspend()
,這裡開始了協程狀態機流程,這樣協程就被啟動了。
3. CoroutineScope與結構化的關係
CoroutineScope
是一個介面,這個介面所做的也只是對CoroutineContext
做了一層封裝而已。CoroutineScope
最大的作用就是可以方便的批量的控制協程,CoroutineScope
在建立它的例項的時候是需要傳入Job()
物件的,因為CoroutineScop
e就是通過這個Job()
物件管理協程的。協程的結構化關係也就因此而產生。
協程的結構化關係是一種父子關係,父子關係可以看做是一個N叉樹的結構,用一句話來概括這個關係就是:每一個協程都有一個Job,每一個Job又有一個父Job和多個子Job,可以看做是一個樹狀結構。
父子關係的建立是通過AbstractCoroutine
中的initParentJob()
進行的,而AbstractCoroutine
是JobSuppert
的子類,建立父子關係的過程就是首先確定是否有父類如果沒有則不建立父子關係,如果有父類則需要確保父Job
已經被啟動,然後通過attachChild()
函式將子Job
新增到父Job
中,這樣就完成了父子關係的建立。
4. 父子關係建立後如何取消結構化的執行
因為是一個樹結構因此協程的取消以及異常的傳播都是按照這個結構進行傳遞。當取消Job
時都會通知自己的父Job
和子Job
,取消子Job
最終是以遞迴的方式傳遞給每一個Job
。協程在向上取消父Job
時通過責任鏈模式一步一步的傳遞到最頂層的協程,同時如果子Job
產生CancellationException
異常時父Job
會將其忽略,如果是其他異常父Job
則會響應這個異常。對於CancellationException
引起的取消只會向下傳遞取消子協程;對於其他異常引起的取消既向上傳遞也向下傳遞,最終會使所有的協程被取消。