锲而不舍 —— M 是怎样找工作的?(八)
在 schedule 函數(shù)中,我們簡(jiǎn)單提過(guò)找一個(gè) runnable goroutine 的過(guò)程,這一講我們來(lái)詳細(xì)分析源碼。
工作線程 M 費(fèi)盡心機(jī)也要找到一個(gè)可運(yùn)行的 goroutine,這是它的工作和職責(zé),不達(dá)目的,絕不罷體,這種鍥而不舍的精神值得每個(gè)人學(xué)習(xí)。
共經(jīng)歷三個(gè)過(guò)程:先從本地隊(duì)列找,定期會(huì)從全局隊(duì)列找,最后實(shí)在沒(méi)辦法,就去別的 P 偷。如下圖所示:
先看第一個(gè):從 P 本地隊(duì)列找。源碼如下:
// 從本地可運(yùn)行隊(duì)列里找到一個(gè) g // 如果 inheritTime 為真,gp 應(yīng)該繼承這個(gè)時(shí)間片, // 否則,新開(kāi)啟一個(gè)時(shí)間片 func runqget(_p_ *p) (gp *g, inheritTime bool) { // If there's a runnext, it's the next G to run. // 如果 runnext 不為空,則 runnext 是下一個(gè)待運(yùn)行的 G for { next := _p_.runnext if next == 0 { // 為空,則直接跳出循環(huán) break } // 再次比較 next 是否沒(méi)有變化 if _p_.runnext.cas(next, 0) { // 如果沒(méi)有變化,則返回 next 所指向的 g。且需要繼承時(shí)間片 return next.ptr(), true } } for { // 獲取隊(duì)列頭 h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers // 獲取隊(duì)列尾 t := _p_.runqtail if t == h { // 頭和尾相等,說(shuō)明本地隊(duì)列為空,找不到 g return nil, false } // 獲取隊(duì)列頭的 g gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() // 原子操作,防止這中間被其他線程因?yàn)橥倒ぷ鞫薷?if atomic.Cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume return gp, false } } }整個(gè)源碼結(jié)構(gòu)比較簡(jiǎn)單,主要是兩個(gè) for 循環(huán)。
第一個(gè) for 循環(huán)嘗試返回 P 的 runnext 成員,因?yàn)?runnext 具有最高的運(yùn)行優(yōu)先級(jí),因此要首先嘗試獲取 runnext。當(dāng)發(fā)現(xiàn) runnext 為空時(shí),直接跳出循環(huán),進(jìn)入第二個(gè)。否則,用原子操作獲取 runnext,并將其值修改為 0,也就是空。這里用到原子操作的原因是防止在這個(gè)過(guò)程中,有其他線程過(guò)來(lái)“偷工作”,導(dǎo)致并發(fā)修改 runnext 成員。
第二個(gè) for 循環(huán)則是在嘗試獲取 runnext 成員失敗后,嘗試從本地隊(duì)列中返回隊(duì)列頭的 goroutine。同樣,先用原子操作獲取隊(duì)列頭,使用原子操作的原因同樣是防止其他線程“偷工作”時(shí)并發(fā)對(duì)隊(duì)列頭的并發(fā)寫操作。之后,直接獲取隊(duì)列尾,因?yàn)椴粨?dān)心其他線程同時(shí)更改,所以直接獲取。注意,“偷工作”時(shí)只會(huì)修改隊(duì)列頭。
比較隊(duì)列頭和隊(duì)列尾,如果兩者相等,說(shuō)明 P 本地隊(duì)列沒(méi)有可運(yùn)行的 goroutine,直接返回空。否則,算出隊(duì)列頭指向的 goroutine,再用一個(gè) CAS 原子操作來(lái)嘗試修改隊(duì)列頭,使用原子操作的原因同上。
從本地隊(duì)列獲取可運(yùn)行 goroutine 的過(guò)程比較簡(jiǎn)單,我們?cè)賮?lái)看從全局隊(duì)列獲取 goroutine 的過(guò)程。在 schedule 函數(shù)中調(diào)用 globrunqget 的代碼:
// 為了公平,每調(diào)用 schedule 函數(shù) 61 次就要從全局可運(yùn)行 goroutine 隊(duì)列中獲取 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) // 從全局隊(duì)列最大獲取 1 個(gè) gorutine gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) }這說(shuō)明,并不是每次調(diào)度都會(huì)從全局隊(duì)列獲取可運(yùn)行的 goroutine。實(shí)際情況是調(diào)度器每調(diào)度 61 次并且全局隊(duì)列有可運(yùn)行 goroutine 的情況下才會(huì)調(diào)用 globrunqget 函數(shù)嘗試從全局獲取可運(yùn)行 goroutine。畢竟,從全局獲取需要上鎖,這個(gè)開(kāi)銷可就大了,能不做就不做。
我們來(lái)詳細(xì)看下 globrunqget 的源碼:
// 嘗試從全局隊(duì)列里獲取可運(yùn)行的 goroutine 隊(duì)列 func globrunqget(_p_ *p, max int32) *g { // 如果隊(duì)列大小為 0 if sched.runqsize == 0 { return nil } // 根據(jù) p 的數(shù)量平分全局運(yùn)行隊(duì)列中的 goroutines n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize // 如果 gomaxprocs 為 1 } // 修正"偷"的數(shù)量 if max > 0 && n > max { n = max } // 最多只能"偷"本地工作隊(duì)列一半的數(shù)量 if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 } // 更新全局可運(yùn)行隊(duì)列長(zhǎng)度 sched.runqsize -= n // 如果都要被"偷"走,修改隊(duì)列尾 if sched.runqsize == 0 { sched.runqtail = 0 } // 獲取隊(duì)列頭指向的 goroutine gp := sched.runqhead.ptr() // 移動(dòng)隊(duì)列頭 sched.runqhead = gp.schedlink n-- for ; n > 0; n-- { // 獲取當(dāng)前隊(duì)列頭 gp1 := sched.runqhead.ptr() // 移動(dòng)隊(duì)列頭 sched.runqhead = gp1.schedlink // 嘗試將 gp1 放入 P 本地,使全局隊(duì)列得到更多的執(zhí)行機(jī)會(huì) runqput(_p_, gp1, false) } // 返回最開(kāi)始獲取到的隊(duì)列頭所指向的 goroutine return gp }代碼比較簡(jiǎn)單。首先根據(jù)全局隊(duì)列的可運(yùn)行 goroutine 長(zhǎng)度和 P 的總數(shù),來(lái)計(jì)算一個(gè)數(shù)值,表示每個(gè) P 可平均分到的 goroutine 數(shù)量。
然后根據(jù)函數(shù)參數(shù)中的 max 以及 P 本地隊(duì)列的長(zhǎng)度來(lái)決定把多少全局隊(duì)列中的 goroutine 轉(zhuǎn)移到 P 本地。
最后,for 循環(huán)挨個(gè)把全局隊(duì)列中 n-1 個(gè) goroutine 轉(zhuǎn)移到本地,并且返回最開(kāi)始獲取到的隊(duì)列頭所指向的 goroutine,畢竟它最需要得到運(yùn)行的機(jī)會(huì)。
把全局隊(duì)列中的可運(yùn)行 goroutine 轉(zhuǎn)移到本地隊(duì)列,給了全局隊(duì)列中可運(yùn)行 goroutine 運(yùn)行的機(jī)會(huì),不然全局隊(duì)列中的 goroutine 一直得不到運(yùn)行。
最后,我們繼續(xù)看第三個(gè)過(guò)程,從其他 P “偷工作”:
// 從本地運(yùn)行隊(duì)列和全局運(yùn)行隊(duì)列都沒(méi)有找到需要運(yùn)行的 goroutine, // 調(diào)用 findrunnable 函數(shù)從其它工作線程的運(yùn)行隊(duì)列中偷取,如果偷不到,則當(dāng)前工作線程進(jìn)入睡眠 // 直到獲取到 runnable goroutine 之后 findrunnable 函數(shù)才會(huì)返回。 if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available }這是整個(gè)找工作過(guò)程最復(fù)雜的部分:
、/ // 從其他地方找 goroutine 來(lái)執(zhí)行 func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() // …………………… // local runq // 從本地隊(duì)列獲取 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // global runq // 從全局隊(duì)列獲取 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // …………………… // Steal work from other P's. // 如果其他的 P 都處于空閑狀態(tài),那肯定沒(méi)有其他工作要做 procs := uint32(gomaxprocs) if atomic.Load(&sched.npidle) == procs-1 { goto stop } // 如果有很多工作線程在找工作,那我就停下休息。避免消耗太多 CPU if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { goto stop } if !_g_.m.spinning { // 設(shè)置自旋狀態(tài)為 true _g_.m.spinning = true // 自旋狀態(tài)數(shù)加 1 atomic.Xadd(&sched.nmspinning, 1) } // 從其它 p 的本地運(yùn)行隊(duì)列盜取 goroutine for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { // …………………… stealRunNextG := i > 2 // first look for ready queues with more than 1 g if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { return gp, false } } } stop: // …………………… // return P and block lock(&sched.lock) if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } // 當(dāng)前工作線程解除與 p 之間的綁定,準(zhǔn)備去休眠 if releasep() != _p_ { throw("findrunnable: wrong p") } // 把 p 放入空閑隊(duì)列 pidleput(_p_) unlock(&sched.lock) wasSpinning := _g_.m.spinning if _g_.m.spinning { // m 即將睡眠,不再處于自旋 _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // check all runqueues once again // 休眠之前再檢查一下所有的 p,看一下是否有工作要做 for i := 0; i < int(gomaxprocs); i++ { _p_ := allp[i] if _p_ != nil && !runqempty(_p_) { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto top } break } } // …………………… // 休眠 stopm() goto top }這部分也是最能說(shuō)明 M 找工作的鍥而不舍精神:盡力去各個(gè)運(yùn)行隊(duì)列中尋找 goroutine,如果實(shí)在找不到則進(jìn)入睡眠狀態(tài),等待有工作時(shí),被其他 M 喚醒。
先獲取當(dāng)前指向的 g,也就是 g0,然后拿到其綁定的 p,即 _p_。
首先再次嘗試從 _p_ 本地隊(duì)列獲取 goroutine,如果沒(méi)有獲取到,則嘗試從全局隊(duì)列獲取。如果還沒(méi)有獲取到就會(huì)嘗試去“偷”了,這也是沒(méi)有辦法的事。
不過(guò),在偷之前,先看大的局勢(shì)。如果其他所有的 P 都處于空閑狀態(tài),就說(shuō)明其他 P 肯定沒(méi)有工作可做,就沒(méi)必要再去偷了,畢竟“地主家也沒(méi)有余糧了”,跳到 stop 部分。接著再看下當(dāng)前正在“偷工作”的線程數(shù)量“太多了”,就沒(méi)必要扎堆了,這么多人,競(jìng)爭(zhēng)肯定大,工作肯定不好找,也不好偷。
在真正的“偷”工作之前,把自己的自旋狀態(tài)設(shè)置為 true,全局自旋數(shù)量加 1。
終于到了“偷工作”的部分了,好緊張!整個(gè)過(guò)程由兩層 for 循環(huán)組成,外層控制嘗試偷的次數(shù),內(nèi)層控制“偷”的順序,并真正的去“偷”。實(shí)際上,內(nèi)層會(huì)遍歷所有的 P,因此,整體看來(lái),會(huì)嘗試 4 次掃遍所有的 P,并去“偷工作”,是不是非常有毅力!
第二層的循環(huán)并不是每次都按一個(gè)固定的順序去遍歷所有的 P,這樣不太科學(xué),而是使用了一些方法,“隨機(jī)”地遍歷。具體是使用了下面這個(gè)變量:
var stealOrder randomOrder type randomOrder struct { count uint32 coprimes []uint32 }初始化的時(shí)候會(huì)給 count 賦一個(gè)值,例如 8,根據(jù) count 計(jì)算出 coprimes,里面的元素是小于 count 的值,且和 8 互質(zhì),算出來(lái)是:[1, 3, 5, 7]。
第二層循環(huán),開(kāi)始隨機(jī)給一個(gè)值,例如 2,則第一個(gè)訪問(wèn)的 P 就是 P2;從 coprimes 里取出索引為 2 的值為 5,那么,第二個(gè)訪問(wèn)的 P 索引就是 2+5=7;依此類推,第三個(gè)就是 7+5=12,和 count 做一個(gè)取余操作,即 12%8=4……
在最后一次遍歷所有的 P 的過(guò)程中,連人家的 runnext 也要嘗試偷過(guò)來(lái),畢竟前三次的失敗經(jīng)驗(yàn)證明,工作太不好“偷”了,民不聊生啊,只能做得絕一點(diǎn)了, stealRunNextG 控制是否要打 runnext 的主意:
stealRunNextG := i > 2確定好準(zhǔn)備偷的對(duì)象 allp[enum.position() 之后,調(diào)用 runqsteal(_p_,allp[enum.position()],stealRunNextG) 函數(shù)執(zhí)行。
// 從 p2 偷走一半的工作放到 _p_ 的本地 func runqsteal(_p_, p2 *p, stealRunNextG bool) *g { // 隊(duì)尾 t := _p_.runqtail // 從 p2 偷取工作,放到 _p_.runq 的隊(duì)尾 n := runqgrab(p2, &_p_.runq, t, stealRunNextG) if n == 0 { return nil } n-- // 找到最后一個(gè) g,準(zhǔn)備返回 gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr() if n == 0 { // 說(shuō)明只偷了一個(gè) g return gp } // 隊(duì)列頭 h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers // 判斷是否偷太多了 if t-h+n >= uint32(len(_p_.runq)) { throw("runqsteal: runq overflow") } // 更新隊(duì)尾,將偷來(lái)的工作加入隊(duì)列 atomic.Store(&_p_.runqtail, t+n) // store-release, makes the item available for consumption return gp }調(diào)用 runqgrab 從 p2 偷走它一半的工作放到 _p_ 本地:
n := runqgrab(p2, &_p_.runq, t, stealRunNextG)runqgrab 函數(shù)將從 p2 偷來(lái)的工作放到以 t 為地址的數(shù)組里,數(shù)組就是 _p_.runq。我們知道, t 是 _p_.runq 的隊(duì)尾,因此這行代碼表達(dá)的真正意思是將從 p2 偷來(lái)的工作,神不知,鬼不覺(jué)地放到 _p_.runq 的隊(duì)尾,之后,再悄悄改一下 `_p_.runqtail 就把這些偷來(lái)的工作據(jù)為己有了。
接著往下看,返回的 n 表示偷到的工作數(shù)量。先將 n 自減 1,目的是把第 n 個(gè)工作(也就是 g)直接返回,如果這時(shí)候 n 變成 0 了,說(shuō)明就只偷到了一個(gè) g,那就直接返回。否則,將隊(duì)尾往后移動(dòng) n,把偷來(lái)的工作合法化,簡(jiǎn)直完美!
我們接著往下看 runqgrab 函數(shù)的實(shí)現(xiàn):
// 從 _p_ 批量獲取可運(yùn)行 goroutine,放到 batch 數(shù)組里 // batch 是一個(gè)環(huán),起始于 batchHead // 返回偷的數(shù)量,返回的 goroutine 可被任何 P 執(zhí)行 func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 { for { // 隊(duì)列頭 h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with other consumers // 隊(duì)列尾 t := atomic.Load(&_p_.runqtail) // load-acquire, synchronize with the producer // g 的數(shù)量 n := t - h // 取一半 n = n - n/2 if n == 0 { if stealRunNextG { // 連 runnext 都要偷,沒(méi)有人性 // Try to steal from _p_.runnext. if next := _p_.runnext; next != 0 { // 這里是為了防止 _p_ 執(zhí)行當(dāng)前 g,并且馬上就要阻塞,所以會(huì)馬上執(zhí)行 runnext, // 這個(gè)時(shí)候偷就沒(méi)必要了,因?yàn)樽?g 在 P 之間"游走"不太劃算, // 就不偷了,給他們一個(gè)機(jī)會(huì)。 // channel 一次同步的的接收發(fā)送需要 50ns 左右,因此 3us 差不多給了他們 50 次機(jī)會(huì)了,做得還是不錯(cuò)的 if GOOS != "windows" { usleep(3) } else { osyield() } if !_p_.runnext.cas(next, 0) { continue } // 真的偷走了 next batch[batchHead%uint32(len(batch))] = next // 返回偷的數(shù)量,只有 1 個(gè) return 1 } } // 沒(méi)偷到 return 0 } // 如果 n 這時(shí)變得太大了,重新來(lái)一遍了,不能偷的太多,做得太過(guò)分了 if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t continue } // 將 g 放置到 bacth 中 for i := uint32(0); i < n; i++ { g := _p_.runq[(h+i)%uint32(len(_p_.runq))] batch[(batchHead+i)%uint32(len(batch))] = g } // 工作被偷走了,更新一下隊(duì)列頭指針 if atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume return n } } }外層直接就是一個(gè)無(wú)限循環(huán),先用原子操作取出 p 的隊(duì)列頭和隊(duì)列尾,算出一半的 g 的數(shù)量,如果 n == 0,說(shuō)明地主家也沒(méi)有余糧,這時(shí)看 stealRunNextG 的值。如果為假,說(shuō)明不偷 runnext,那就直接返回 0,啥也沒(méi)偷到;如果為真,則要嘗試偷一下 runnext。
先判斷 runnext 不為空,那就真的準(zhǔn)備偷了。不過(guò)在這之前,要先休眠 3 us。這是為了防止 p 正在執(zhí)行當(dāng)前的 g,馬上就要阻塞(可能是向一個(gè)非緩沖的 channel 發(fā)送數(shù)據(jù),沒(méi)有接收者),之后會(huì)馬上執(zhí)行 runnext。這個(gè)時(shí)候偷就沒(méi)必要了,因?yàn)?runnext 馬上就要執(zhí)行了,偷走它還不是要去執(zhí)行,那何必要偷呢?大家的愿望就是提高效率,這樣讓 g 在 P 之間"游走"不太劃算,索性先不偷了,給他們一個(gè)機(jī)會(huì)。channel一次同步的的接收或發(fā)送需要 50ns 左右,因此休眠 3us 差不多給了他們 50 次機(jī)會(huì)了,做得還是挺厚道的。
繼續(xù)看,再次判斷 n 是否小于等于 p.runq 長(zhǎng)度的一半,因?yàn)檫@個(gè)時(shí)候很可能 p 也被其他線程偷了,它的 p.runq 就沒(méi)那么多工作了,這個(gè)時(shí)候就不能偷這么多了,要重新再走一次循環(huán)。
最后一個(gè) for 循環(huán),將 p.runq 里的 g 放到 batch 數(shù)組里。使用原子操作更新 p 的隊(duì)列頭指針,往后移動(dòng) n 個(gè)位置,這些都是被偷走的,傷心!
回到 findrunnable 函數(shù),經(jīng)過(guò)上述三個(gè)層面的“偷竊”過(guò)程,我們?nèi)匀粵](méi)有找到工作,真慘!于是就走到了 stop 這個(gè)代碼塊。
先上鎖,因?yàn)橐獙?P 放到全局空閑 P 鏈表里去。在這之前還不死心,再瞧一下全局隊(duì)列里是否有工作,如果有,再去嘗試偷全局。
如果沒(méi)有,就先解除當(dāng)前工作線程和當(dāng)前 P 的綁定關(guān)系:
// 解除 p 與 m 的關(guān)聯(lián) func releasep() *p { _g_ := getg() // …………………… _p_ := _g_.m.p.ptr() // …………………… // 清空一些字段 _g_.m.p = 0 _g_.m.mcache = nil _p_.m = 0 _p_.status = _Pidle return _p_ }主要的工作就是將 p 的 m 字段清空,并將 p 的狀態(tài)修改為 _Pidle。
這之后,將其放入全局空閑 P 列表:
// 將 p 放到 _Pidle 列表里 //go:nowritebarrierrec func pidleput(_p_ *p) { if !runqempty(_p_) { throw("pidleput: P has non-empty run queue") } _p_.link = sched.pidle sched.pidle.set(_p_) // 增加全局空閑 P 的數(shù)量 atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic }構(gòu)造鏈表的過(guò)程其實(shí)比較簡(jiǎn)單,先將 p.link 指向原來(lái)的 sched.pidle 所指向的 p,也就是原空閑鏈表的最后一個(gè) P,最后,再更新 sched.pidle,使其指向當(dāng)前 p,這樣,新的鏈表就構(gòu)造完成。
接下來(lái)就要真正地準(zhǔn)備休眠了,但是仍然不死心!還要再查看一次所有的 P 是否有工作,如果發(fā)現(xiàn)任何一個(gè) P 有工作的話(判斷 P 的本地隊(duì)列不空),就先從全局空閑 P 鏈表里先拿到一個(gè) P:
// 試圖從 _Pidle 列表里獲取 p //go:nowritebarrierrec func pidleget() *p { _p_ := sched.pidle.ptr() if _p_ != nil { sched.pidle = _p_.link atomic.Xadd(&sched.npidle, -1) // TODO: fast atomic } return _p_ }比較簡(jiǎn)單,獲取鏈表最后一個(gè),再更新 sched.pidle,使其指向前一個(gè) P。調(diào)用 acquirep(_p_) 綁定獲取到的 p 和 m,主要的動(dòng)作就是設(shè)置 p 的 m 字段,更改 p 的工作狀態(tài)為 _Prunning,并且設(shè)置 m 的 p 字段。做完這些之后,再次進(jìn)入 top 代碼段,再走一遍之前找工作的過(guò)程。
// 休眠,停止執(zhí)行工作,直到有新的工作需要做為止 func stopm() { // 當(dāng)前 goroutine,g0 _g_ := getg() // …………………… retry: lock(&sched.lock) // 將 m 放到全局空閑鏈表里去 mput(_g_.m) unlock(&sched.lock) // 進(jìn)入睡眠狀態(tài) notesleep(&_g_.m.park) // 這里被其他工作線程喚醒 noteclear(&_g_.m.park) // …………………… acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 }先將 m 放入全局空閑鏈表里,注意涉及到全局變量的修改,要上鎖。接著,調(diào)用 notesleep(&_g_.m.park) 使得當(dāng)前工作線程進(jìn)入休眠狀態(tài)。其他工作線程在檢測(cè)到“當(dāng)前有很多工作要做”,會(huì)調(diào)用 noteclear(&_g_.m.park) 將其喚醒。注意,這兩個(gè)函數(shù)傳入的參數(shù)都是一樣的:&_g_.m.park,它的類型是:
type note struct { key uintptr }很簡(jiǎn)單,只有一個(gè) key 字段。
note 的底層實(shí)現(xiàn)機(jī)制跟操作系統(tǒng)相關(guān),不同系統(tǒng)使用不同的機(jī)制,比如 linux 下使用的 futex 系統(tǒng)調(diào)用,而 mac 下則是使用的 pthreadcondt 條件變量,note 對(duì)這些底層機(jī)制做了一個(gè)抽象和封裝。
這種封裝給擴(kuò)展性帶來(lái)了很大的好處,比如當(dāng)睡眠和喚醒功能需要支持新平臺(tái)時(shí),只需要在 note 層增加對(duì)特定平臺(tái)的支持即可,不需要修改上層的任何代碼。
上面這一段來(lái)自阿波張的系列教程。我們接著來(lái)看下 notesleep 的實(shí)現(xiàn):
// runtime/lock_futex.go func notesleep(n *note) { // g0 gp := getg() if gp != gp.m.g0 { throw("notesleep not on g0") } // -1 表示無(wú)限期休眠 ns := int64(-1) // …………………… // 這里之所以需要用一個(gè)循環(huán),是因?yàn)?futexsleep 有可能意外從睡眠中返回, // 所以 futexsleep 函數(shù)返回后還需要檢查 note.key 是否還是 0, // 如果是 0 則表示并不是其它工作線程喚醒了我們, // 只是 futexsleep 意外返回了,需要再次調(diào)用 futexsleep 進(jìn)入睡眠 for atomic.Load(key32(&n.key)) == 0 { // 表示 m 被阻塞 gp.m.blocked = true futexsleep(key32(&n.key), 0, ns) // …………………… // 被喚醒,更新標(biāo)志 gp.m.blocked = false } }繼續(xù)往下追:
// runtime/os_linux.go func futexsleep(addr *uint32, val uint32, ns int64) { var ts timespec if ns < 0 { futex(unsafe.Pointer(addr), _FUTEX_WAIT, val, nil, nil, 0) return } // …………………… }當(dāng) *addr 和 val 相等的時(shí)候,休眠。futex 由匯編語(yǔ)言實(shí)現(xiàn):
TEXT runtime·futex(SB),NOSPLIT,$0 // 為系統(tǒng)調(diào)用準(zhǔn)備參數(shù) MOVQ addr+0(FP), DI MOVL op+8(FP), SI MOVL val+12(FP), DX MOVQ ts+16(FP), R10 MOVQ addr2+24(FP), R8 MOVL val3+32(FP), R9 // 系統(tǒng)調(diào)用編號(hào) MOVL $202, AX // 執(zhí)行 futex 系統(tǒng)調(diào)用進(jìn)入休眠,被喚醒后接著執(zhí)行下一條 MOVL 指令 SYSCALL // 保存系統(tǒng)調(diào)用的返回值 MOVL AX, ret+40(FP) RET這樣,找不到工作的 m 就休眠了。當(dāng)其他線程發(fā)現(xiàn)有工作要做時(shí),就會(huì)先找到空閑的 m,再通過(guò) m.park 字段來(lái)喚醒本線程。喚醒之后,回到 findrunnable 函數(shù),繼續(xù)尋找 goroutine,找到后返回 schedule 函數(shù),然后就會(huì)去運(yùn)行找到的 goroutine。
這就是 m 找工作的整個(gè)過(guò)程,歷盡千辛萬(wàn)苦,終于修成正果。
參考資料
【阿波張 Goroutine 調(diào)度策略】https://mp.weixin.qq.com/s/2objs5JrlnKnwFbF4a2z2g總結(jié)
以上是生活随笔為你收集整理的锲而不舍 —— M 是怎样找工作的?(八)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 忠于职守 —— sysmon 线程到底做
- 下一篇: 深度解密Go语言之pprof