协程的挂起、恢复和调度的原理 (二)
目錄
- 一. 協程的掛起、恢復和調度的設計思想
- 二. 深入解析協程
- 1. 協程的創建與啟動
- 2. 協程的線程調度
- 3. 協程的掛起和恢復
- 4. 不同 resumeWith 的解析
- 5. 協程整體結構
一. 協程的掛起、恢復和調度的設計思想
被 suspend 修飾符修飾的函數在編譯期間會被編譯器做特殊處理:CPS(續體傳遞風格)變換,它會改變掛起函數的函數簽名。
suspend fun <T> CompletableFuture<T>.await(): T會轉變成
fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?編譯器對掛起函數的第一個改變就是對函數簽名的改變,這種改變被稱為 CPS(續體傳遞風格)變換。
我們可以看到,函數變換之后多了一個參數Continuation,聲明如下:
interface Continuation<in T> {val context: CoroutineContextfun resumeWith(result: Result<T>) }Continuation 包裝了協程在掛起之后應該繼續執行的代碼;在編譯的過程中,一個完整的協程可能會有多個掛起點 (suspension point) , 掛起點把協程分割切塊成一個又一個續體。在 await 函數的掛起結束以后,它會調用 continuation 參數的 resumeWith 函數,來恢復執行 await 函數后面的代碼。
值得一提的是,除了會返回一個本身的返回值,還會返回一個標記,COROUTINE_SUSPENDED,返回它的掛起函數表示這個掛起函數會發生事實上的掛起操作。什么叫事實上的掛起操作呢?比如:
launch {val deferred = async {// 發起了一個網絡請求......}// 做了一些操作......deferred.await()// 后續的一些操作...... }在 deferred.await() 這行執行的時候,如果網絡請求已經取得了結果,那 await 函數會直接取得結果,而不會事實上的掛起協程。
明白了這么多概念之后,我們看看一個具體的例子:
val a = a() val y = foo(a).await() // 掛起點 #1 b() val z = bar(a, y).await() // 掛起點 #2 c(z)這里有兩個掛起點,編譯后可以看到生成的偽字節碼:
class <anonymous_for_state_machine> extends SuspendLambda<...> {// 狀態機當前狀態int label = 0// 協程的局部變量A a = nullY y = nullvoid resumeWith(Object result) {if (label == 0) goto L0if (label == 1) goto L1if (label == 2) goto L2else throw IllegalStateException()L0:a = a()label = 1// 'this' 作為續體傳遞result = foo(a).await(this)// 如果 await 掛起了執行則返回if (result == COROUTINE_SUSPENDED) returnL1:// 外部代碼調用resumeWith y = (Y) resultb()label = 2result = bar(a, y).await(this)if (result == COROUTINE_SUSPENDED) return L2:Z z = (Z) resultc(z)// label = -1 代表已經沒有其他的步驟了label = -1return} }在這段偽代碼中,我們很容易理解它的實現邏輯:L0 代表掛起點1之前的續體,首先goto L0開始,直到調用掛起點1的 result = foo(a).await(this) 方法,this就是續體,如果 await 沒掛起,直接使用結果跳入L1中;如果掛起了則直接返回,await 方法執行完后,調用 await 方法體中的 Continuation 對象,調用它的 resumeWith ,goto L1,依次類推。
其中 label 記錄了狀態,這也被稱為狀態機的實現方式。
到這里,大家可能不清楚,為什么協程剛開始就進入resumeWith方法呢?別著急,后面會提到為什么。
上面只是簡單介紹以下協程的實現原理,介紹了以下相關的概念:CPS、續體、掛起點、狀態機等,具體如何如何實現,必須深入源碼去了解。
二. 深入解析協程
1. 協程的創建與啟動
先從一個簡單的創建方法CoroutineScope.launch開始:
public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit ): Job {...coroutine.start(start, coroutine, block)return coroutine }coroutine.start(start, coroutine, block) 這里會根據start屬性決定初始化何種協程對象:
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =when (this) {CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)CoroutineStart.ATOMIC -> block.startCoroutine(completion)CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)CoroutineStart.LAZY -> Unit // will start lazily}我們直接從默認的CoroutineStart.DEFAULT入手,其最終會調用到createCoroutineUnintercepted:
// his function creates a new, fresh instance of suspendable computation every time it is invoked. // To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T> ): Continuation<Unit> { ... }這里貼了一下注釋,意思是創建一個可掛起的協程,啟動時調用返回對象Continuation的resume(Unit)方法,這個方法是它的內聯擴展方法:
public inline fun <T> Continuation<T>.resume(value: T): Unit =resumeWith(Result.success(value))這里調用的其實就是Continuation接口的resumeWith方法。
所以協程創建出來時就會去調用是Continuation接口的resumeWith方法。這就解釋了上文的流程圖為什么從resumeWith開始。
2. 協程的線程調度
我們從 launch 創建協程調用的 startCoroutineCancellable 開始;
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)- createCoroutineUnintercepted(completion) 會創建一個新的協程,返回值類型為 Continuation
- intercepted() 是給 Continuation 加上 ContinuationInterceptor 攔截器,也是線程調度的關鍵
- resumeCancellable(Unit) 最終將調用 resume(Unit) 啟動協程
我們來看一下intercepted()的具體實現:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =(this as? ContinuationImpl)?.intercepted() ?: this // ContinuationImpl 是 SuspendLambda 的父類 internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {@Transientprivate var intercepted: Continuation<Any?>? = nullpublic fun intercepted(): Continuation<Any?> =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it } }context[ContinuationInterceptor]?.interceptContinuation(this) 就是利用上下文對象 context 得到 CoroutineDispatcher,會使用協程的CoroutineDispatcher的interceptContinuation 方法:
public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation) }interceptContinuation 方法中使用 DispatchedContinuation類 包裝原來的 Continuation,攔截所有的協程運行操作:
internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T> ) : Continuation<T> by continuation, DispatchedTask<T> {inline fun resumeCancellable(value: T) {// 判斷是否需要線程調度if (dispatcher.isDispatchNeeded(context)) {...// 將協程的運算分發到另一個線程dispatcher.dispatch(context, this)} else {...// 如果不需要調度,直接在當前線程執行協程運算resumeUndispatched(value)}}override fun resumeWith(result: Result<T>) {// 判斷是否需要線程調度if (dispatcher.isDispatchNeeded(context)) {...// 將協程的運算分發到另一個線程dispatcher.dispatch(context, this)} else {...// 如果不需要調度,直接在當前線程執行協程運算continuation.resumeWith(result)}} }internal interface DispatchedTask<in T> : Runnable {public override fun run() {// 任務的執行最終來到這里,這里封裝了 continuation.resume 邏輯} }總結: 協程的調度是通過 CoroutineDispatcher 的 interceptContinuation 方法來包裝原來的 Continuation 為 DispatchedContinuation,來攔截每個續體的運行操作,DispatchedContinuation 攔截了協程的啟動和恢復,分別是 resumeCancellable(Unit) 和重寫的 resumeWith(Result),然后通過 CoroutineDispatcher 的 dispatch 分發協程的運算任務,最終調用到DispatchedTask 這個 Runnable。
3. 協程的掛起和恢復
我們先來看一下掛起,看一個例子:
fun main(args: Array<String>) = runBlocking<Unit> { launch(Dispatchers.Unconfined) { println("${Thread.currentThread().name} : launch start")async(Dispatchers.Default) { println("${Thread.currentThread().name} : async start")delay(100) println("${Thread.currentThread().name} : async end")}.await() println("${Thread.currentThread().name} : launch end")} }async在delay函數中被掛起,我們來看一下launch函數內反編譯得到的代碼:
public final Object invokeSuspend(@NotNull Object result) {Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();switch (this.label) {case 0:...System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());// 新建并啟動 async 協程 Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);this.label = 1;// 調用 await() 掛起函數if (async$default.await(this) == coroutine_suspended) {return coroutine_suspended;}break;case 1:// 恢復協程后再執行一次 resumeWith(),然后無異常的話跳出if (result instanceof Failure) {throw ((Failure) result).exception;}break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}...System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());return Unit.INSTANCE; }上面代碼最關鍵的地方在于 async$default.await(this) == coroutine_suspended , 如果async線程未執行完成,那么await()返回為IntrinsicsKt.getCOROUTINE_SUSPENDED(),就會 return,然后async所在的線程就會繼續執行。當恢復該協程后再執行一次 resumeWith(),調用invokeSuspend(),
總結:協程掛起實際上就是協程掛起點之前的邏輯執行完,然后判斷是否是事實上的掛起,如果掛起了則返回,等待掛起函數執行完成,完成后調用resumeWith恢復協程,繼續執行該協程下面的代碼。
我們再來看一下協程怎么恢復:
我們來看一下await()的代碼,關鍵點在于,實現了一個CompletableDeferredImple對象,調用了 JobSupport.awaitSuspend() 方法
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->val cont = AwaitContinuation(uCont.intercepted(), this)cont.initCancellability()invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)cont.getResult() }在這里,將 launch(this) 協程封裝為 ResumeAwaitOnCompletion 作為 handler 節點。
在方法 invokeOnCompletion 中:
// handler 就是 ResumeAwaitOnCompletion 的實例,將 handler 作為節點 val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } // 將 node 節點添加到 state.list 中 if (!addLastAtomic(state, list, node)) return@loopOnState // retry這里將 handler 節點添加到 aynsc 協程的 state.list 中,然后在 async 協程完成時會通知 handler 節點調用 launch 協程的 resume(result) 方法將結果傳給 launch 協程。
事實上,handler節點完成到launch恢復的過程也是比較復雜的,這里可以通過斷點調試查看調用的過程:
從 async 協程的 SuspendLambda 的子類 BaseContinuationImpl 的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) …-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最后 handler 節點里面通過調用resume(result)恢復協程。
總結:所以await()掛起函數恢復協程的原理是,將 launch 協程封裝為 ResumeAwaitOnCompletion 作為 handler 節點添加到 aynsc 協程的 state.list,然后在 async 協程完成時會通知 handler 節點,最終會調用 launch 協程的 resume(result) 方法將結果傳給 launch 協程,并恢復 launch 協程繼續執行 await 掛起點之后的邏輯。
4. resumeWith解析
值得一提的是,續體completion有兩種不一樣的實現方式,分別是BaseContinuationImpl和AbstractCoroutine,它們的resumeWith執行著不一樣的邏輯,先來看BaseContinuationImpl:
internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable {public final override fun resumeWith(result: Result<Any?>) {...var param = resultwhile (true) {with(current) {val completion = completion!!val outcome: Result<Any?> =try {// 調用 invokeSuspend 方法執行,執行協程的真正運算邏輯val outcome = invokeSuspend(param)// 協程掛起時 invokeSuspend 才會返回 COROUTINE_SUSPENDED,所以協程掛起時,先return,再次調用 resumeWith 時,協程掛起點之后的邏輯才能繼續執行if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // 這里可以看出 Continuation 其實分為兩類,一種是 BaseContinuationImpl,封裝了協程的真正運算邏輯if (completion is BaseContinuationImpl) {// unrolling recursion via loopcurrent = completionparam = outcome} else {// 這里實際調用的是其父類 AbstractCoroutine 的 resumeWith 方法completion.resumeWith(outcome)return}}}}看一下AbstractCoroutine 的resumeWith實現:
public final override fun resumeWith(result: Result<T>) {makeCompletingOnce(result.toState(), defaultResumeMode)}/** * Returns:* * `true` if state was updated to completed/cancelled;* * `false` if made completing or it is cancelling and is waiting for children.*/internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->when (tryMakeCompleting(state, proposedUpdate, mode)) {COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)COMPLETING_COMPLETED -> return trueCOMPLETING_WAITING_CHILDREN -> return falseCOMPLETING_RETRY -> return@loopOnStateelse -> error("unexpected result")}}可以看到 BaseContinuationImpl 的 resumeWith 封裝了協程的運算邏輯,而 AbstractCoroutine 的 resumeWith 主要用來管理協程的狀態。
5. 協程整體結構
從上面的協程執行流程,我們可以梳理一下協程的整體結構;
其中最上層的DispatcherContinuation負責協程的調度邏輯,第二層的BaseContinuaImpl的 invokeSuspend 封裝了協程真正的運算邏輯,AbstractCoroutine封裝了協程的狀態(Job,deferred)。
總結
以上是生活随笔為你收集整理的协程的挂起、恢复和调度的原理 (二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kotlin协程简介(一)
- 下一篇: 快速排序的两种方法