忠于职守 —— sysmon 线程到底做了什么?(九)
在 runtime.main() 函數中,執行 runtime_init() 前,會啟動一個 sysmon 的監控線程,執行后臺監控任務:
systemstack(func() { // 創建監控線程,該線程獨立于調度器,不需要跟 p 關聯即可運行 newm(sysmon, nil) })sysmon 函數不依賴 P 直接執行,通過 newm 函數創建一個工作線程:
func newm(fn func(), _p_ *p) { // 創建 m 對象 mp := allocm(_p_, fn) // 暫存 m mp.nextp.set(_p_) mp.sigmask = initSigmask // …………………… execLock.rlock() // Prevent process clone. // 創建系統線程 newosproc(mp, unsafe.Pointer(mp.g0.stack.hi)) execLock.runlock() }先調用 allocm 在堆上創建一個 m,接著調用 newosproc 函數啟動一個工作線程:
// src/runtime/os_linux.go //go:nowritebarrier func newosproc(mp *m, stk unsafe.Pointer) { // …………………… ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) // …………………… }核心就是調用 clone 函數創建系統線程,新線程從 mstart 函數開始執行。clone 函數由匯編語言實現:
// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void)); TEXT runtime·clone(SB),NOSPLIT,$0 // 準備系統調用的參數 MOVL flags+0(FP), DI MOVQ stk+8(FP), SI MOVQ $0, DX MOVQ $0, R10 // 將 mp,gp,fn 拷貝到寄存器,對子線程可見 MOVQ mp+16(FP), R8 MOVQ gp+24(FP), R9 MOVQ fn+32(FP), R12 // 系統調用 clone MOVL $56, AX SYSCALL // In parent, return. CMPQ AX, $0 JEQ 3(PC) // 父線程,返回 MOVL AX, ret+40(FP) RET // In child, on new stack. // 在子線程中。設置 CPU 棧頂寄存器指向子線程的棧頂 MOVQ SI, SP // If g or m are nil, skip Go-related setup. CMPQ R8, $0 // m JEQ nog CMPQ R9, $0 // g JEQ nog // Initialize m->procid to Linux tid // 通過 gettid 系統調用獲取線程 ID(tid) MOVL $186, AX // gettid SYSCALL // 設置 m.procid = tid MOVQ AX, m_procid(R8) // Set FS to point at m->tls. // 新線程剛剛創建出來,還未設置線程本地存儲,即 m 結構體對象還未與工作線程關聯起來, // 下面的指令負責設置新線程的 TLS,把 m 對象和工作線程關聯起來 LEAQ m_tls(R8), DI CALL runtime·settls(SB) // In child, set up new stack get_tls(CX) MOVQ R8, g_m(R9) // g.m = m MOVQ R9, g(CX) // tls.g = &m.g0 CALL runtime·stackcheck(SB) nog: // Call fn // 調用 mstart 函數。永不返回 CALL R12 // It shouldn't return. If it does, exit that thread. MOVL $111, DI MOVL $60, AX SYSCALL JMP -3(PC) // keep exiting先是為 clone 系統調用準備參數,參數通過寄存器傳遞。第一個參數指定內核創建線程時的選項,第二個參數指定新線程應該使用的棧,這兩個參數都是通過 newosproc 函數傳遞進來的。
接著將 m, g0, fn 分別保存到寄存器中,待子線程創建好后再拿出來使用。因為這些參數此時是在父線程的棧上,若不保存到寄存器中,子線程就取不出來了。
這個幾個參數保存在父線程的寄存器中,創建子線程時,操作系統內核會把父線程所有的寄存器幫我們復制一份給子線程,所以當子線程開始運行時就能拿到父線程保存在寄存器中的值,從而拿到這幾個參數。
之后,調用 clone 系統調用,內核幫我們創建出了一個子線程。相當于原來的一個執行分支現在變成了兩個執行分支,于是會有兩個返回。這和著名的 fork 系統調用類似,根據返回值來判斷現在是處于父線程還是子線程。
如果是父線程,就直接返回了。如果是子線程,接著還要執行一堆操作,例如設置 tls,設置 m.procid 等等。
最后執行 mstart 函數,這是在 newosproc 函數傳遞進來的。mstart 函數再調用 mstart1,在 mstart1 里會執行這一行:
// 執行啟動函數。初始化過程中,fn == nil if fn := _g_.m.mstartfn; fn != nil { fn() }之前我們在講初始化的時候,這里的 fn 是空,會跳過的。但在這里,fn 就是最開始在 runtime.main 里設置的 sysmon 函數,因此這里會執行 sysmon,而它又是一個無限循環,永不返回。
所以,這里不會執行到 mstart1 函數后面的 schedule 函數,也就不會進入 schedule 循環。因此這是一個不用和 p 結合的 m,它直接在后臺執行,默默地執行監控任務。
接下來,我們就來看 sysmon 函數到底做了什么?
sysmon 執行一個無限循環,一開始每次循環休眠 20us,之后(1 ms 后)每次休眠時間倍增,最終每一輪都會休眠 10ms。
sysmon 中會進行 netpool(獲取 fd 事件)、retake(搶占)、forcegc(按時間強制執行 gc),scavenge heap(釋放自由列表中多余的項減少內存占用)等處理。
和調度相關的,我們只關心 retake 函數:
func retake(now int64) uint32 { n := 0 // 遍歷所有的 p for i := int32(0); i < gomaxprocs; i++ { _p_ := allp[i] if _p_ == nil { continue } // 用于 sysmon 線程記錄被監控 p 的系統調用時間和運行時間 pd := &_p_.sysmontick // p 的狀態 s := _p_.status if s == _Psyscall { // P 處于系統調用之中,需要檢查是否需要搶占 // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us). // _p_.syscalltick 用于記錄系統調用的次數,在完成系統調用之后加 1 t := int64(_p_.syscalltick) if int64(pd.syscalltick) != t { // pd.syscalltick != _p_.syscalltick,說明已經不是上次觀察到的系統調用了, // 而是另外一次系統調用,所以需要重新記錄 tick 和 when 值 pd.syscalltick = uint32(t) pd.syscallwhen = now continue } // 只要滿足下面三個條件中的任意一個,則搶占該 p,否則不搶占 // 1. p 的運行隊列里面有等待運行的 goroutine // 2. 沒有無所事事的 p // 3. 從上一次監控線程觀察到 p 對應的 m 處于系統調用之中到現在已經超過 10 毫秒 if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } incidlelocked(-1) if atomic.Cas(&_p_.status, s, _Pidle) { // …………………… n++ _p_.syscalltick++ // 尋找一新的 m 接管 p handoffp(_p_) } incidlelocked(1) } else if s == _Prunning { // P 處于運行狀態,檢查是否運行得太久了 // Preempt G if it's running for too long. // 每發生一次調度,調度器 ++ 該值 t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now continue } //pd.schedtick == t 說明(pd.schedwhen ~ now)這段時間未發生過調度 // 這段時間是同一個goroutine一直在運行,檢查是否連續運行超過了 10 毫秒 if pd.schedwhen+forcePreemptNS > now { continue } // 連續運行超過 10 毫秒了,發起搶占請求 preemptone(_p_) } } return uint32(n) }從代碼來看,主要會對處于 _Psyscall 和 _Prunning 狀態的 p 進行搶占。
搶占進行系統調用的 P
當 P 處于 _Psyscall 狀態時,表明對應的 goroutine 正在進行系統調用。如果搶占 p,需要滿足幾個條件:
p 的本地運行隊列里面有等待運行的 goroutine。這時 p 綁定的 g 正在進行系統調用,無法去執行其他的 g,因此需要接管 p 來執行其他的 g。
沒有“無所事事”的 p。sched.nmspinning 和 sched.npidle 都為 0,這就意味著沒有“找工作”的 m,也沒有空閑的 p,大家都在“忙”,可能有很多工作要做。因此要搶占當前的 p,讓它來承擔一部分工作。
從上一次監控線程觀察到 p 對應的 m 處于系統調用之中到現在已經超過 10 毫秒。這說明系統調用所花費的時間較長,需要對其進行搶占,以此來使得 retake 函數返回值不為 0,這樣,會保持 sysmon 線程 20 us 的檢查周期,提高 sysmon 監控的實時性。
注意,原代碼是用的三個與條件,三者都要滿足才會執行下面的 continue,也就是不進行搶占。因此要想進行搶占的話,只需要三個條件有一個不滿足就行了。于是就有了上述三種情況。
確定要搶占當前 p 后,先使用原子操作將 p 的狀態修改為 _Pidle,最后調用 handoffp 進行搶占。
func handoffp(_p_ *p) { // 如果 p 本地有工作或者全局有工作,需要綁定一個 m if !runqempty(_p_) || sched.runqsize != 0 { startm(_p_, false) return } // …………………… // 所有其它 p 都在運行 goroutine,說明系統比較忙,需要啟動 m if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic // p 沒有本地工作,啟動一個自旋 m 來找工作 startm(_p_, true) return } lock(&sched.lock) // …………………… // 全局隊列有工作 if sched.runqsize != 0 { unlock(&sched.lock) startm(_p_, false) return } // …………………… // 沒有工作要處理,把 p 放入全局空閑隊列 pidleput(_p_) unlock(&sched.lock) }handoffp 再次進行場景判斷,以調用 startm 啟動一個工作線程來綁定 p,使得整體工作繼續推進。
當 p 的本地運行隊列或全局運行隊列里面有待運行的 goroutine,說明還有很多工作要做,調用 startm(_p_,false) 啟動一個 m 來結合 p,繼續工作。
當除了當前的 p 外,其他所有的 p 都在運行 goroutine,說明天下太平,每個人都有自己的事做,唯獨自己沒有。為了全局更快地完成工作,需要啟動一個 m,且要使得 m 處于自旋狀態,和 p 結合之后,盡快找到工作。
最后,如果實在沒有工作要處理,就將 p 放入全局空閑隊列里。
我們接著來看 startm 函數都做了些什么:
// runtime/proc.go // // 調用 m 來綁定 p,如果沒有 m,那就新建一個 // 如果 p 為空,那就嘗試獲取一個處于空閑狀態的 p,如果找到 p,那就什么都不做 func startm(_p_ *p, spinning bool) { lock(&sched.lock) if _p_ == nil { // 沒有指定 p 則需要從全局空閑隊列中獲取一個 p _p_ = pidleget() if _p_ == nil { unlock(&sched.lock) if spinning { // 如果找到 p,放棄。還原全局處于自旋狀態的 m 的數量 if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("startm: negative nmspinning") } } // 沒有空閑的 p,直接返回 return } } // 從 m 空閑隊列中獲取正處于睡眠之中的工作線程, // 所有處于睡眠狀態的 m 都在此隊列中 mp := mget() unlock(&sched.lock) if mp == nil { // 如果沒有找到 m var fn func() if spinning { // The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning } // 創建新的工作線程 newm(fn, _p_) return } if mp.spinning { throw("startm: m is spinning") } if mp.nextp != 0 { throw("startm: m has p") } if spinning && !runqempty(_p_) { throw("startm: p has runnable gs") } // The caller incremented nmspinning, so set m.spinning in the new M. mp.spinning = spinning // 設置 m 馬上要結合的 p mp.nextp.set(_p_) // 喚醒 m notewakeup(&mp.park) }首先處理 p 為空的情況,直接從全局空閑 p 隊列里找,如果沒找到,則直接返回。如果設置了 spinning 為 true 的話,還需要還原全局的處于自旋狀態的 m 的數值:&sched.nmspinning 。
搞定了 p,接下來看 m。先調用 mget 函數從全局空閑的 m 隊列里獲取一個 m,如果沒找到 m,則要調用 newm 新創建一個 m,并且如果設置了 spinning 為 true 的話,先要設置好 mstartfn:
func mspinning() { // startm's caller incremented nmspinning. Set the new M's spinning. getg().m.spinning = true }這樣,啟動 m 后,在 mstart1 函數里,進入 schedule 循環前,執行 mstartfn 函數,使得 m 處于自旋狀態。
接下來是正常情況下(找到了 p 和 m)的處理:
mp.spinning = spinning // 設置 m 馬上要結合的 p mp.nextp.set(_p_) // 喚醒 m notewakeup(&mp.park)設置 nextp 為找到的 p,調用 notewakeup 喚醒 m。之前我們講 findrunnable 函數的時候,對于最后沒有找到工作的 m,我們調用 notesleep(&_g_.m.park),使得 m 進入睡眠狀態?,F在終于有工作了,需要老將出山,將其喚醒:
// src/runtime/lock_futex.go func notewakeup(n *note) { // 設置 n.key = 1, 被喚醒的線程通過查看該值是否等于 1 // 來確定是被其它線程喚醒還是意外從睡眠中蘇醒 old := atomic.Xchg(key32(&n.key), 1) if old != 0 { print("notewakeup - double wakeup (", old, ")\n") throw("notewakeup - double wakeup") } futexwakeup(key32(&n.key), 1) }notewakeup 函數首先使用 atomic.Xchg 設置 note.key 值為 1,這是為了使被喚醒的線程可以通過查看該值是否等于 1 來確定是被其它線程喚醒還是意外從睡眠中蘇醒了過來。
如果該值為 1 則表示是被喚醒的,可以繼續工作,但如果該值為 0 則表示是意外蘇醒,需要再次進入睡眠。
調用 futexwakeup 來喚醒工作線程,它和 futexsleep 是相對的。
func futexwakeup(addr *uint32, cnt uint32) { // 調用 futex 函數喚醒工作線程 ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE, cnt, nil, nil, 0) if ret >= 0 { return } // …………………… }futex 由匯編語言實現,前面已經分析過,這里就不重復了。主要內容就是先準備好參數,然后進行系統調用,由內核喚醒線程。
內核在完成喚醒工作之后當前工作線程從內核返回到 futex 函數繼續執行 SYSCALL 指令之后的代碼并按函數調用鏈原路返回,繼續執行其它代碼。
而被喚醒的工作線程則由內核負責在適當的時候調度到 CPU 上運行。
搶占長時間運行的 P
我們知道,Go scheduler 采用的是一種稱為協作式的搶占式調度,就是說并不強制調度,大家保持協作關系,互相信任。對于長時間運行的 P,或者說綁定在 P 上的長時間運行的 goroutine,sysmon 會檢測到這種情況,然后設置一些標志,表示 goroutine 自己讓出 CPU 的執行權,給其他 goroutine 一些機會。
接下來我們就來分析當 P 處于 _Prunning 狀態的情況。sysmon 掃描每個 p 時,都會記錄下當前調度器調度的次數和當前時間,數據記錄在結構體:
type sysmontick struct { schedtick uint32 schedwhen int64 syscalltick uint32 syscallwhen int64 }前面兩個字段記錄調度器調度的次數和時間,后面兩個字段記錄系統調用的次數和時間。
在下一次掃描時,對比 sysmon 記錄下的 p 的調度次數和時間,與當前 p 自己記錄下的調度次數和時間對比,如果一致。說明 P 在這一段時間內一直在運行同一個 goroutine。那就來計算一下運行時間是否太長了。
如果發現運行時間超過了 10 ms,則要調用 preemptone(_p_) 發起搶占的請求:
func preemptone(_p_ *p) bool { mp := _p_.m.ptr() if mp == nil || mp == getg().m { return false } // 被搶占的 goroutine gp := mp.curg if gp == nil || gp == mp.g0 { return false } // 設置搶占標志 gp.preempt = true // 在 goroutine 內部的每次調用都會比較棧頂指針和 g.stackguard0, // 來判斷是否發生了棧溢出。stackPreempt 非常大的一個數,比任何棧都大 // stackPreempt = 0xfffffade gp.stackguard0 = stackPreempt return true }基本上只是將 stackguard0 設置了一個很大的值,而檢查 stackguard0 的地方在函數調用前的一段匯編代碼里進行。
舉一個簡單的例子:
package main import "fmt" func main() { fmt.Println("hello qcrao.com!") }執行命令:
go tool compile -S main.go得到匯編代碼:
"".main STEXT size=120 args=0x0 locals=0x48 0x0000 00000 (test26.go:5) TEXT "".main(SB), $72-0 0x0000 00000 (test26.go:5) MOVQ (TLS), CX 0x0009 00009 (test26.go:5) CMPQ SP, 16(CX) 0x000d 00013 (test26.go:5) JLS 113 0x000f 00015 (test26.go:5) SUBQ $72, SP 0x0013 00019 (test26.go:5) MOVQ BP, 64(SP) 0x0018 00024 (test26.go:5) LEAQ 64(SP), BP 0x001d 00029 (test26.go:5) FUNCDATA $0, gclocals·69c1753bd5f81501d95132d08af04464(SB) 0x001d 00029 (test26.go:5) FUNCDATA $1, gclocals·e226d4ae4a7cad8835311c6a4683c14f(SB) 0x001d 00029 (test26.go:6) MOVQ $0, ""..autotmp_0+48(SP) 0x0026 00038 (test26.go:6) MOVQ $0, ""..autotmp_0+56(SP) 0x002f 00047 (test26.go:6) LEAQ type.string(SB), AX 0x0036 00054 (test26.go:6) MOVQ AX, ""..autotmp_0+48(SP) 0x003b 00059 (test26.go:6) LEAQ "".statictmp_0(SB), AX 0x0042 00066 (test26.go:6) MOVQ AX, ""..autotmp_0+56(SP) 0x0047 00071 (test26.go:6) LEAQ ""..autotmp_0+48(SP), AX 0x004c 00076 (test26.go:6) MOVQ AX, (SP) 0x0050 00080 (test26.go:6) MOVQ $1, 8(SP) 0x0059 00089 (test26.go:6) MOVQ $1, 16(SP) 0x0062 00098 (test26.go:6) PCDATA $0, $1 0x0062 00098 (test26.go:6) CALL fmt.Println(SB) 0x0067 00103 (test26.go:7) MOVQ 64(SP), BP 0x006c 00108 (test26.go:7) ADDQ $72, SP 0x0070 00112 (test26.go:7) RET 0x0071 00113 (test26.go:7) NOP 0x0071 00113 (test26.go:5) PCDATA $0, $-1 0x0071 00113 (test26.go:5) CALL runtime.morestack_noctxt(SB) 0x0076 00118 (test26.go:5) JMP 0以前看這段代碼的時候會直接跳過前面的幾行代碼,看不懂。這次能看懂了!所以,那些暫時看不懂的,先放一放,沒關系,讓子彈飛一會兒,很多東西回過頭再來看就會豁然開朗,這就是一個很好的例子。
0x0000 00000 (test26.go:5) MOVQ (TLS), CX將本地存儲 tls 保存到 CX 寄存器中,(TLS)表示它所關聯的 g,這里就是前面所講到的 main gouroutine。
0x0009 00009 (test26.go:5) CMPQ SP, 16(CX)比較 SP 寄存器(代表當前 main goroutine 的棧頂寄存器)和 16(CX),我們看下 g 結構體:
type g struct { // goroutine 使用的棧 stack stack // offset known to runtime/cgo // 用于棧的擴張和收縮檢查 stackguard0 uintptr // offset known to liblink // …………………… }對象 g 的第一個字段是 stack 結構體:
type stack struct { lo uintptr hi uintptr }共 16 字節。而 16(CX) 表示 g 對象的第 16 個字節,跳過了 g 的第一個字段,也就是 g.stackguard0 字段。
如果 SP 小于 g.stackguard0,這是必然的,因為前面已經把 g.stackguard0 設置成了一個非常大的值,因此跳轉到了 113 行。
0x0071 00113 (test26.go:7) NOP 0x0071 00113 (test26.go:5) PCDATA $0, $-1 0x0071 00113 (test26.go:5) CALL runtime.morestack_noctxt(SB) 0x0076 00118 (test26.go:5) JMP 0調用 runtime.morestack_noctxt 函數:
// src/runtime/asm_amd64.s TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0 MOVL $0, DX JMP runtime·morestack(SB)直接跳轉到 morestack 函數:
TEXT runtime·morestack(SB),NOSPLIT,$0-0 // Cannot grow scheduler stack (m->g0). get_tls(CX) // BX = g,g 表示 main goroutine MOVQ g(CX), BX // BX = g.m MOVQ g_m(BX), BX // SI = g.m.g0 MOVQ m_g0(BX), SI CMPQ g(CX), SI JNE 3(PC) CALL runtime·badmorestackg0(SB) INT $3 // …………………… // Set g->sched to context in f. // 將函數的返回地址保存到 AX 寄存器 MOVQ 0(SP), AX // f's PC // 將函數的返回地址保存到 g.sched.pc MOVQ AX, (g_sched+gobuf_pc)(SI) // g.sched.g = g MOVQ SI, (g_sched+gobuf_g)(SI) // 取地址操作符,調用 morestack_noctxt 之前的 rsp LEAQ 8(SP), AX // f's SP // 將 main 函數的棧頂地址保存到 g.sched.sp MOVQ AX, (g_sched+gobuf_sp)(SI) // 將 BP 寄存器保存到 g.sched.bp MOVQ BP, (g_sched+gobuf_bp)(SI) // newstack will fill gobuf.ctxt. // Call newstack on m->g0's stack. // BX = g.m.g0 MOVQ m_g0(BX), BX // 將 g0 保存到本地存儲 tls MOVQ BX, g(CX) // 把 g0 棧的棧頂寄存器的值恢復到 CPU 的寄存器 SP,達到切換棧的目的,下面這一條指令執行之前, // CPU 還是使用的調用此函數的 g 的棧,執行之后 CPU 就開始使用 g0 的棧了 MOVQ (g_sched+gobuf_sp)(BX), SP // 準備參數 PUSHQ DX // ctxt argument // 不返回 CALL runtime·newstack(SB) MOVQ $0, 0x1003 // crash if newstack returns POPQ DX // keep balance check happy RET主要做的工作就是將當前 goroutine,也就是 main goroutine 的和調度相關的信息保存到 g.sched 中,以便在調度到它執行時,可以恢復。
最后,將 g0 的地址保存到 tls 本地存儲,并且切到 g0 棧執行之后的代碼。繼續調用 newstack 函數:
func newstack(ctxt unsafe.Pointer) { // thisg = g0 thisg := getg() // …………………… // gp = main goroutine gp := thisg.m.curg // Write ctxt to gp.sched. We do this here instead of in // morestack so it has the necessary write barrier. gp.sched.ctxt = ctxt // …………………… morebuf := thisg.m.morebuf thisg.m.morebuf.pc = 0 thisg.m.morebuf.lr = 0 thisg.m.morebuf.sp = 0 thisg.m.morebuf.g = 0 // 檢查 g.stackguard0 是否被設置成搶占標志 preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt if preempt { if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning { // 還原 stackguard0 為正常值,表示我們已經處理過搶占請求了 gp.stackguard0 = gp.stack.lo + _StackGuard // 不搶占,調用 gogo 繼續運行當前這個 g,不需要調用 schedule 函數去挑選另一個 goroutine gogo(&gp.sched) // never return } } // …………………… if preempt { if gp == thisg.m.g0 { throw("runtime: preempt g0") } if thisg.m.p == 0 && thisg.m.locks == 0 { throw("runtime: g is running but p is not") } // Synchronize with scang. casgstatus(gp, _Grunning, _Gwaiting) // …………………… // Act like goroutine called runtime.Gosched. // 修改為 running,調度起來運行 casgstatus(gp, _Gwaiting, _Grunning) // 調用 gopreempt_m 把 gp 切換出去 gopreempt_m(gp) // never return } // …………………… }去掉了很多暫時還看不懂的地方,留到后面再研究。只關注有關搶占相關的。第一次判斷 preempt 標志是 true 時,檢查了 g 的狀態,發現不能搶占,例如它所綁定的 P 的狀態不是 _Prunning,那就恢復它的 stackguard0 字段,下次就不會走這一套流程了。然后,調用 gogo(&gp.sched) 繼續執行當前的 goroutine。
中間又處理了很多判斷流程,再次判斷 preempt 標志是 true 時,調用 gopreempt_m(gp) 將 gp 切換出去。
func gopreempt_m(gp *g) { if trace.enabled { traceGoPreempt() } goschedImpl(gp) }最終調用 goschedImpl 函數:
func goschedImpl(gp *g) { status := readgstatus(gp) if status&^_Gscan != _Grunning { dumpgstatus(gp) throw("bad g status") } // 更改 gp 的狀態 casgstatus(gp, _Grunning, _Grunnable) // 解除 m 和 g 的關系 dropg() lock(&sched.lock) // 將 gp 放入全局可運行隊列 globrunqput(gp) unlock(&sched.lock) // 進入新一輪的調度循環 schedule() }將 gp 的狀態改為 _Grunnable,放入全局可運行隊列,等待下次有 m 來全局隊列找工作時才能繼續運行,畢竟你已經運行這么長時間了,給別人一點機會嘛。
最后,調用 schedule() 函數進入新一輪的調度循環,會找出一個 goroutine 來運行,永不返回。
這樣,關于 sysmon 線程在關于調度這塊到底做了啥,我們已經回答完了。總結一下:
搶占處于系統調用的 P,讓其他 m 接管它,以運行其他的 goroutine。
將運行時間過長的 goroutine 調度出去,給其他 goroutine 運行的機會。
參考資料
【深入Golang之goroutine】http://www.opscoder.info/golang_goroutine.html【阿波張 工作線程的喚醒及創建】https://mp.weixin.qq.com/s/T9CDaNF5KUFjE_Z6YW7mRw總結
以上是生活随笔為你收集整理的忠于职守 —— sysmon 线程到底做了什么?(九)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 意犹未尽 —— GPM 的状态流转(十)
- 下一篇: 锲而不舍 —— M 是怎样找工作的?(八