日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

协程的挂起、恢复和调度的原理 (二)

發布時間:2024/2/28 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 协程的挂起、恢复和调度的原理 (二) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

  • 一. 協程的掛起、恢復和調度的設計思想
  • 二. 深入解析協程
    • 1. 協程的創建與啟動
    • 2. 協程的線程調度
    • 3. 協程的掛起和恢復
    • 4. 不同 resumeWith 的解析
    • 5. 協程整體結構

被 suspend 修飾符修飾的函數在編譯期間會被編譯器做特殊處理:CPS(續體傳遞風格)變換,它會改變掛起函數的函數簽名。

suspend fun <T> CompletableFuture<T>.await(): T

會轉變成

fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?

編譯器對掛起函數的第一個改變就是對函數簽名的改變,這種改變被稱為 CPS(續體傳遞風格)變換。

我們可以看到,函數變換之后多了一個參數Continuation,聲明如下:

interface Continuation<in T> {val context: CoroutineContextfun resumeWith(result: Result<T>) }

Continuation 包裝了協程在掛起之后應該繼續執行的代碼;在編譯的過程中,一個完整的協程可能會有多個掛起點 (suspension point) , 掛起點把協程分割切塊成一個又一個續體。在 await 函數的掛起結束以后,它會調用 continuation 參數的 resumeWith 函數,來恢復執行 await 函數后面的代碼。

值得一提的是,除了會返回一個本身的返回值,還會返回一個標記,COROUTINE_SUSPENDED,返回它的掛起函數表示這個掛起函數會發生事實上的掛起操作。什么叫事實上的掛起操作呢?比如:

launch {val deferred = async {// 發起了一個網絡請求......}// 做了一些操作......deferred.await()// 后續的一些操作...... }

在 deferred.await() 這行執行的時候,如果網絡請求已經取得了結果,那 await 函數會直接取得結果,而不會事實上的掛起協程。

明白了這么多概念之后,我們看看一個具體的例子:

val a = a() val y = foo(a).await() // 掛起點 #1 b() val z = bar(a, y).await() // 掛起點 #2 c(z)

這里有兩個掛起點,編譯后可以看到生成的偽字節碼:

class <anonymous_for_state_machine> extends SuspendLambda<...> {// 狀態機當前狀態int label = 0// 協程的局部變量A a = nullY y = nullvoid resumeWith(Object result) {if (label == 0) goto L0if (label == 1) goto L1if (label == 2) goto L2else throw IllegalStateException()L0:a = a()label = 1// 'this' 作為續體傳遞result = foo(a).await(this)// 如果 await 掛起了執行則返回if (result == COROUTINE_SUSPENDED) returnL1:// 外部代碼調用resumeWith y = (Y) resultb()label = 2result = bar(a, y).await(this)if (result == COROUTINE_SUSPENDED) return L2:Z z = (Z) resultc(z)// label = -1 代表已經沒有其他的步驟了label = -1return} }

在這段偽代碼中,我們很容易理解它的實現邏輯:L0 代表掛起點1之前的續體,首先goto L0開始,直到調用掛起點1的 result = foo(a).await(this) 方法,this就是續體,如果 await 沒掛起,直接使用結果跳入L1中;如果掛起了則直接返回,await 方法執行完后,調用 await 方法體中的 Continuation 對象,調用它的 resumeWith ,goto L1,依次類推。

其中 label 記錄了狀態,這也被稱為狀態機的實現方式。

到這里,大家可能不清楚,為什么協程剛開始就進入resumeWith方法呢?別著急,后面會提到為什么。

上面只是簡單介紹以下協程的實現原理,介紹了以下相關的概念:CPS、續體、掛起點、狀態機等,具體如何如何實現,必須深入源碼去了解。

先從一個簡單的創建方法CoroutineScope.launch開始:

public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit ): Job {...coroutine.start(start, coroutine, block)return coroutine }

coroutine.start(start, coroutine, block) 這里會根據start屬性決定初始化何種協程對象:

public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =when (this) {CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)CoroutineStart.ATOMIC -> block.startCoroutine(completion)CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)CoroutineStart.LAZY -> Unit // will start lazily}

我們直接從默認的CoroutineStart.DEFAULT入手,其最終會調用到createCoroutineUnintercepted:

// his function creates a new, fresh instance of suspendable computation every time it is invoked. // To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(completion: Continuation<T> ): Continuation<Unit> { ... }

這里貼了一下注釋,意思是創建一個可掛起的協程,啟動時調用返回對象Continuation的resume(Unit)方法,這個方法是它的內聯擴展方法:

public inline fun <T> Continuation<T>.resume(value: T): Unit =resumeWith(Result.success(value))

這里調用的其實就是Continuation接口的resumeWith方法。

所以協程創建出來時就會去調用是Continuation接口的resumeWith方法。這就解釋了上文的流程圖為什么從resumeWith開始。

我們從 launch 創建協程調用的 startCoroutineCancellable 開始;

internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
  • createCoroutineUnintercepted(completion) 會創建一個新的協程,返回值類型為 Continuation
  • intercepted() 是給 Continuation 加上 ContinuationInterceptor 攔截器,也是線程調度的關鍵
  • resumeCancellable(Unit) 最終將調用 resume(Unit) 啟動協程

我們來看一下intercepted()的具體實現:

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =(this as? ContinuationImpl)?.intercepted() ?: this // ContinuationImpl 是 SuspendLambda 的父類 internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {@Transientprivate var intercepted: Continuation<Any?>? = nullpublic fun intercepted(): Continuation<Any?> =intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it } }

context[ContinuationInterceptor]?.interceptContinuation(this) 就是利用上下文對象 context 得到 CoroutineDispatcher,會使用協程的CoroutineDispatcher的interceptContinuation 方法:

public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation) }

interceptContinuation 方法中使用 DispatchedContinuation類 包裝原來的 Continuation,攔截所有的協程運行操作:

internal class DispatchedContinuation<in T>(@JvmField val dispatcher: CoroutineDispatcher,@JvmField val continuation: Continuation<T> ) : Continuation<T> by continuation, DispatchedTask<T> {inline fun resumeCancellable(value: T) {// 判斷是否需要線程調度if (dispatcher.isDispatchNeeded(context)) {...// 將協程的運算分發到另一個線程dispatcher.dispatch(context, this)} else {...// 如果不需要調度,直接在當前線程執行協程運算resumeUndispatched(value)}}override fun resumeWith(result: Result<T>) {// 判斷是否需要線程調度if (dispatcher.isDispatchNeeded(context)) {...// 將協程的運算分發到另一個線程dispatcher.dispatch(context, this)} else {...// 如果不需要調度,直接在當前線程執行協程運算continuation.resumeWith(result)}} }internal interface DispatchedTask<in T> : Runnable {public override fun run() {// 任務的執行最終來到這里,這里封裝了 continuation.resume 邏輯} }

總結: 協程的調度是通過 CoroutineDispatcher 的 interceptContinuation 方法來包裝原來的 Continuation 為 DispatchedContinuation,來攔截每個續體的運行操作,DispatchedContinuation 攔截了協程的啟動和恢復,分別是 resumeCancellable(Unit) 和重寫的 resumeWith(Result),然后通過 CoroutineDispatcher 的 dispatch 分發協程的運算任務,最終調用到DispatchedTask 這個 Runnable。

我們先來看一下掛起,看一個例子:

fun main(args: Array<String>) = runBlocking<Unit> { launch(Dispatchers.Unconfined) { println("${Thread.currentThread().name} : launch start")async(Dispatchers.Default) { println("${Thread.currentThread().name} : async start")delay(100) println("${Thread.currentThread().name} : async end")}.await() println("${Thread.currentThread().name} : launch end")} }

async在delay函數中被掛起,我們來看一下launch函數內反編譯得到的代碼:

public final Object invokeSuspend(@NotNull Object result) {Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();switch (this.label) {case 0:...System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());// 新建并啟動 async 協程 Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);this.label = 1;// 調用 await() 掛起函數if (async$default.await(this) == coroutine_suspended) {return coroutine_suspended;}break;case 1:// 恢復協程后再執行一次 resumeWith(),然后無異常的話跳出if (result instanceof Failure) {throw ((Failure) result).exception;}break;default:throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");}...System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());return Unit.INSTANCE; }

上面代碼最關鍵的地方在于 async$default.await(this) == coroutine_suspended , 如果async線程未執行完成,那么await()返回為IntrinsicsKt.getCOROUTINE_SUSPENDED(),就會 return,然后async所在的線程就會繼續執行。當恢復該協程后再執行一次 resumeWith(),調用invokeSuspend(),

總結:協程掛起實際上就是協程掛起點之前的邏輯執行完,然后判斷是否是事實上的掛起,如果掛起了則返回,等待掛起函數執行完成,完成后調用resumeWith恢復協程,繼續執行該協程下面的代碼。

我們再來看一下協程怎么恢復:

我們來看一下await()的代碼,關鍵點在于,實現了一個CompletableDeferredImple對象,調用了 JobSupport.awaitSuspend() 方法

private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->val cont = AwaitContinuation(uCont.intercepted(), this)cont.initCancellability()invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)cont.getResult() }

在這里,將 launch(this) 協程封裝為 ResumeAwaitOnCompletion 作為 handler 節點。

在方法 invokeOnCompletion 中:

// handler 就是 ResumeAwaitOnCompletion 的實例,將 handler 作為節點 val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } // 將 node 節點添加到 state.list 中 if (!addLastAtomic(state, list, node)) return@loopOnState // retry

這里將 handler 節點添加到 aynsc 協程的 state.list 中,然后在 async 協程完成時會通知 handler 節點調用 launch 協程的 resume(result) 方法將結果傳給 launch 協程。

事實上,handler節點完成到launch恢復的過程也是比較復雜的,這里可以通過斷點調試查看調用的過程:

從 async 協程的 SuspendLambda 的子類 BaseContinuationImpl 的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) …-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最后 handler 節點里面通過調用resume(result)恢復協程。

總結:所以await()掛起函數恢復協程的原理是,將 launch 協程封裝為 ResumeAwaitOnCompletion 作為 handler 節點添加到 aynsc 協程的 state.list,然后在 async 協程完成時會通知 handler 節點,最終會調用 launch 協程的 resume(result) 方法將結果傳給 launch 協程,并恢復 launch 協程繼續執行 await 掛起點之后的邏輯。

值得一提的是,續體completion有兩種不一樣的實現方式,分別是BaseContinuationImpl和AbstractCoroutine,它們的resumeWith執行著不一樣的邏輯,先來看BaseContinuationImpl:

internal abstract class BaseContinuationImpl(public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable {public final override fun resumeWith(result: Result<Any?>) {...var param = resultwhile (true) {with(current) {val completion = completion!!val outcome: Result<Any?> =try {// 調用 invokeSuspend 方法執行,執行協程的真正運算邏輯val outcome = invokeSuspend(param)// 協程掛起時 invokeSuspend 才會返回 COROUTINE_SUSPENDED,所以協程掛起時,先return,再次調用 resumeWith 時,協程掛起點之后的邏輯才能繼續執行if (outcome === COROUTINE_SUSPENDED) returnResult.success(outcome)} catch (exception: Throwable) {Result.failure(exception)}releaseIntercepted() // 這里可以看出 Continuation 其實分為兩類,一種是 BaseContinuationImpl,封裝了協程的真正運算邏輯if (completion is BaseContinuationImpl) {// unrolling recursion via loopcurrent = completionparam = outcome} else {// 這里實際調用的是其父類 AbstractCoroutine 的 resumeWith 方法completion.resumeWith(outcome)return}}}}

看一下AbstractCoroutine 的resumeWith實現:

public final override fun resumeWith(result: Result<T>) {makeCompletingOnce(result.toState(), defaultResumeMode)}/** * Returns:* * `true` if state was updated to completed/cancelled;* * `false` if made completing or it is cancelling and is waiting for children.*/internal fun makeCompletingOnce(proposedUpdate: Any?, mode: Int): Boolean = loopOnState { state ->when (tryMakeCompleting(state, proposedUpdate, mode)) {COMPLETING_ALREADY_COMPLETING -> throw IllegalStateException("Job $this is already complete or completing, " +"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull)COMPLETING_COMPLETED -> return trueCOMPLETING_WAITING_CHILDREN -> return falseCOMPLETING_RETRY -> return@loopOnStateelse -> error("unexpected result")}}

可以看到 BaseContinuationImpl 的 resumeWith 封裝了協程的運算邏輯,而 AbstractCoroutine 的 resumeWith 主要用來管理協程的狀態。

從上面的協程執行流程,我們可以梳理一下協程的整體結構;


其中最上層的DispatcherContinuation負責協程的調度邏輯,第二層的BaseContinuaImpl的 invokeSuspend 封裝了協程真正的運算邏輯,AbstractCoroutine封裝了協程的狀態(Job,deferred)。

總結

以上是生活随笔為你收集整理的协程的挂起、恢复和调度的原理 (二)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。