go int32不能打印0_Go并发实战--sync WaitGroup
前言
waitgroup也是一個非常有用的并發(fā)工具,有點像是Java中的CyclicBarrier,只不過Go中的WaitGroup等待的是協(xié)程而已。 通常來說,WaitGroup是go并發(fā)中最常用的工具了,在起協(xié)程并發(fā)做一些事兒,我們可以通過WaitGroup了表達(dá)這一組協(xié)程的任務(wù)是否完成,已決定是否繼續(xù)往下走,或者取任務(wù)結(jié)果,下面來看一下WaitGroup的使用及實現(xiàn)。
語法基礎(chǔ)
WaitGroup的核心關(guān)注點是:Add()、Done()、Wait()三個函數(shù) Add函數(shù)主要為WaitGroup的等待數(shù)+1或者+n Done函數(shù)調(diào)用的也是Add函數(shù),主要用于-1操作 Wait函數(shù)是指阻塞當(dāng)前協(xié)程,直到等待數(shù)歸為0才繼續(xù)向下執(zhí)行 下面來看一個demo:
func main() {waitGroup := &sync.WaitGroup{}DoSomething(waitGroup)waitGroup.Wait() // 這里會阻塞main,直到所有的任務(wù)都完成fmt.Println("end") }func DoSomething(waitGroup *sync.WaitGroup) {for i:=0;i <10;i++ {waitGroup.Add(1)go func(waitGroup *sync.WaitGroup) {fmt.Print("1-")defer waitGroup.Done()}(waitGroup)} }輸出:
實現(xiàn)原理
首先來看WaitGroup的結(jié)構(gòu)體:
type WaitGroup struct {noCopy noCopystate1 [3]uint32 }noCopy共64位,高32位是一個計數(shù)器,低32位代表有多少在等待,64位原子操作需要64位對齊,但32位編譯器不能確保對齊。因此,我們分配12個字節(jié),然后使用其中對齊的8個字節(jié)作為狀態(tài),另外4個字節(jié)作為SEMA的存儲。 state1 保存的是狀態(tài)信息 接下來看一下核心的兩個函數(shù)Add()、Wait() Add(): 1、添加將delta(可能為負(fù))加到waitgroup計數(shù)器上。 2、如果計數(shù)器變?yōu)榱?#xff0c;則所有在等待時阻塞的goroutine都被釋放。 3、如果計數(shù)器為負(fù),則直接panic。
需要注意的是: 1、當(dāng)計數(shù)器為零時發(fā)生的具有正增量的調(diào)用必須在等待之前發(fā)生,但任何時候都可能發(fā)生負(fù)增量的調(diào)用。這意味著要Add的函數(shù)調(diào)用應(yīng)該在創(chuàng)建goroutine或等待的其他事件的語句之前執(zhí)行。 2、如果waitgroup被重用以等待幾個獨立的事件集,則必須在返回所有以前的wait調(diào)用之后進(jìn)行新的add調(diào)用。
func (wg *WaitGroup) Add(delta int) {statep, semap := wg.state()if race.Enabled {_ = *statep // trigger nil deref earlyif delta < 0 {// 與wait變?yōu)橥讲僮鱮ace.ReleaseMerge(unsafe.Pointer(wg))}race.Disable()defer race.Enable()}state := atomic.AddUint64(statep, uint64(delta)<<32)v := int32(state >> 32)w := uint32(state)if race.Enabled && delta > 0 && v == int32(delta) { //第一個增量必須與wait同步。 //需要將其變?yōu)闉橐粋€讀取,因為可以有幾個從0開始的并發(fā)wg.counter轉(zhuǎn)換。race.Read(unsafe.Pointer(semap))}if v < 0 {panic("sync: negative WaitGroup counter")}if w != 0 && delta > 0 && v == int32(delta) {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}if v > 0 || w == 0 {return} //當(dāng)waiters>0時,此goroutine已將counter設(shè)置為0。 //現(xiàn)在狀態(tài)不能同時突變: //-添加不能與等待同時發(fā)生, //-如果看到counter==0,則wait不會增加waiters。 //仍然執(zhí)行簡單的檢查以檢測WaitGroup的誤用。if *statep != state {panic("sync: WaitGroup misuse: Add called concurrently with Wait")}// Reset waiters count to 0.*statep = 0for ; w != 0; w-- {runtime_Semrelease(semap, false)} }Wait(): Wait函數(shù)主要用于阻塞那些調(diào)用Wait的goroutine,直到waitgroup的任務(wù)計數(shù)器為0,才會放行,下面來看一下源碼:
func (wg *WaitGroup) Wait() {statep, semap := wg.state()if race.Enabled {_ = *stateprace.Disable()}// 循環(huán)檢查計數(shù)器情況for {state := atomic.LoadUint64(statep)v := int32(state >> 32)w := uint32(state)if v == 0 {// 如果技術(shù)為0的話就不用等待了,完成放行if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}// 如果有有新的調(diào)用wait函數(shù)的,添加新的等待者,等待者計數(shù)器+1if atomic.CompareAndSwapUint64(statep, state, state+1) {if race.Enabled && w == 0 {// 等待和新添加的是同步的// 只能為第一個競爭者進(jìn)行寫操作,否則的話會產(chǎn)生互相競爭race.Write(unsafe.Pointer(semap))}runtime_Semacquire(semap)if *statep != 0 {panic("sync: WaitGroup is reused before previous Wait has returned")}if race.Enabled {race.Enable()race.Acquire(unsafe.Pointer(wg))}return}} }關(guān)于WaitGroup暫時介紹這么多。
總結(jié)
以上是生活随笔為你收集整理的go int32不能打印0_Go并发实战--sync WaitGroup的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark如何解决文件不存在_Spark
- 下一篇: 无人驾驶图像数据集_自动驾驶数据集