【Go】sync.WaitGroup 源码分析
WaitGroup
sync.WaitGroup 用于等待一組 goroutine 返回,如:
var wg = sync.WaitGroup{}func do() {time.Sleep(time.Second)fmt.Println("done")wg.Done() }func main() {go do()go do()wg.Add(2)wg.Wait()fmt.Println("main done") }概覽
如上面的例子, WaitGroup 只堆外暴露了三個(gè)方法:
// 等待的 goroutine 數(shù)加 delta func (wg *WaitGroup) Add(delta int) // 等待的 goroutine 數(shù)減一 func (wg *WaitGroup) Done() // 阻塞,等待這一組 goroutine 全部退出 func (wg *WaitGroup) Wait() type WaitGroup struct {noCopy noCopystate1 [3]uint32 }WaitGroup 結(jié)構(gòu)體中也只有兩個(gè)字段:
- noCopy: 用來(lái)保證不會(huì)被開(kāi)發(fā)者錯(cuò)誤拷貝
- state1: 用來(lái)保存相關(guān)狀態(tài)量
另外,他還提供了一個(gè)私有的方法用來(lái)獲取狀態(tài)和信號(hào)量
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]} else {return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]} }statep 就是狀態(tài)量,注意這里通過(guò) unsafe 將 3 位數(shù)組(共 96 位)強(qiáng)轉(zhuǎn)成了 uint64 這會(huì)導(dǎo)致部分?jǐn)?shù)據(jù)丟失,具體來(lái)說(shuō),在64位的機(jī)器上會(huì)丟失最低 32 位,也即 state1[2] 在 32 位機(jī)器上會(huì)丟失最高 32 位,也即 state1[0], 這也是 64 位和 32 位機(jī)器上數(shù)組三位元素表示意義不同的原因。
強(qiáng)轉(zhuǎn)之后,以 64 位機(jī)器為例,數(shù)組第二位會(huì)作為 statep 的高 32 位,第一位會(huì)作為 statep 的低 32 位,也就是說(shuō),此時(shí) statep 的結(jié)構(gòu)如下:
+----------------------+-----------------------+ | | | | Counter | Waiter | | | | +----------------------+-----------------------+Add
func (wg *WaitGroup) Done() {wg.Add(-1) }Done 其實(shí)就是對(duì) Add 的一個(gè)封裝。
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()// 把 delta 加到 count 中state := atomic.AddUint64(statep, uint64(delta)<<32)// 獲取 countv := int32(state >> 32)// 丟失高 32 位的 Counter, 得到 Waiterw := uint32(state)if v < 0 {panic("sync: negative WaitGroup counter")}// Waiter 不等于 0 說(shuō)明現(xiàn)在還有 goroutine 沒(méi)有 done, 這時(shí)是不允許 Add 的// 也即在 Wait 的過(guò)程中不允許通過(guò) Add 添加 if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 正常修改 Counter 后返回if v > 0 || w == 0 {return}// 到這說(shuō)明 Counter == 0 并且 delta 不是一個(gè)正數(shù)(執(zhí)行 Done,并且是最后一次 Done)// 狀態(tài)改變,說(shuō)明有人在 Wait 過(guò)程中 Add 了if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// 狀態(tài)置 0*statep = 0// 喚醒 Wait 中的 goroutinefor ; w != 0; w-- {runtime_Semrelease(semap, false, 0)} }總結(jié)一下,首先 Done 只是對(duì) Add 的簡(jiǎn)單封裝,在 Add 時(shí),通過(guò)巧妙利用精度丟失和位移運(yùn)算分別計(jì)算出 add 后的 Counter 和 Waiter, 前者表示已經(jīng) add 了多少 Goroutine, 后者表示還有多少個(gè) goroutine 需要 Wait, 這里需要注意,在 Wait 的過(guò)程中是不允許 Add 新 goroutine 的;在執(zhí)行 Done 時(shí),只是簡(jiǎn)單的將 Counter 減 1,直到 Counter == 1 時(shí),也即最后一個(gè) goroutine 已經(jīng)執(zhí)行完畢時(shí),Done 會(huì)通知 Wait 停止阻塞,并將標(biāo)志清空。
Wait
func (wg *WaitGroup) Wait() {statep, semap := wg.state()for {state := atomic.LoadUint64(statep)v := int32(state >> 32)// Counter == 0, 沒(méi)有 Add, 直接返回if v == 0 {return}// 每一次 CAS 讓 Waiter 加一,并進(jìn)入阻塞,等待最后一個(gè) Done 的 goroutine 將其喚醒if atomic.CompareAndSwapUint64(statep, state, state+1) {runtime_Semacquire(semap)if *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}return}// 如果 CAS 比較沒(méi)通過(guò),說(shuō)明在此過(guò)程中有 goroutine Done 了,需要重新去獲取最新的狀態(tài)} }總結(jié)
WaitGroup 用于阻塞某個(gè) Goroutine 以等待一組 goroutine 返回,在實(shí)現(xiàn)上,它采用一個(gè)長(zhǎng)度為 3 的 32 位無(wú)符號(hào)整型數(shù)組保存 Waiter, Counter, 和信號(hào)量,每次 Add 時(shí),會(huì)將 Counder 加上 delta,而當(dāng)執(zhí)行 Done 或 delta 為負(fù)數(shù)時(shí),如果 Done 的是最后一個(gè) Goroutine, Add 會(huì)去喚醒 Wait
執(zhí)行 Wait 只是將 Waiter 加一并阻塞等待 Add 的喚醒,所以其實(shí) Waiter 的值只會(huì)是 0 或 1.
總結(jié)
以上是生活随笔為你收集整理的【Go】sync.WaitGroup 源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 列的数目比列的名字要多_你们要的甘特图来
- 下一篇: [2022年大学生创新创业训练计划项目立