日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Go嵌套并发实现EDM,附坑点分析#1

發布時間:2025/5/22 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Go嵌套并发实现EDM,附坑点分析#1 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

看著身邊優秀的小伙伴們早就開始寫博客,自己深感落后,還好遲做總比不做好,勉勵自己見賢思齊。趁著年前最后一個周末,陽光正好,寫下第一篇博客,為2019年開個頭,以期完成今年為自己立下的flags。

從PHPer轉Gopher,很大一個原因就是業務對性能和并發的持續需求,另一個主要原因就是Go語言原生的并發特性,可以在提供同等高可用的能力下,使用更少的機器資源,節約可觀的成本。因此本文就結合自己在學習Go并發的實戰demo中,把遇到的一些坑點寫下來,共享進步。

1. 在Go語言中實現并發控制,目前主要有三種方式:

a) Channel - 分為無緩沖、有緩沖通道;

b) WaitGroup - sync包提供的goroutine間的同步機制;

c) Context - 在調用鏈不同goroutine間傳遞和共享數據;

本文demo中主要用到了前兩種,基本使用請查看官方文檔。

2. Demo需求與分析:

需求:實現一個EDM的高效郵件發送:需要支持多個國家(可以看成是多個任務),需要記錄每條任務發送的狀態(當前成功、失敗條數),需要支持可暫停(stop)、重新發送(run)操作。

分析:從需求可以看出,在郵件發送中可以通過并發實現多個國家(多個任務)并發、單個任務分批次并發實現快速、高效EDM需求。

3. Demo實戰源碼:

3.1 main.go

package mainimport ("bufio""fmt""io""log""os""strconv""sync""time" )var (batchLength = 20wg sync.WaitGroupfinish = make(chan bool) )func main() {startTime := time.Now().UnixNano()for i := 1; i <= 3; i++ {filename := "./task/edm" + strconv.Itoa(i) + ".txt"start := 60go RunTask(filename, start, batchLength)}// main 阻塞等待goroutine執行完成fmt.Println(<-finish)fmt.Println("finished all tasks.")endTime := time.Now().UnixNano()fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) }// 單任務 func RunTask(filename string, start, length int) (retErr error) {for {readLine, err := ReadLines(filename, start, length)if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {fmt.Println(err)retErr = errbreak}fmt.Println("current line:", readLine)start += length// 等待一批完成才進入下一批//wg.Wait()}wg.Wait()finish <- truereturn retErr } 復制代碼

注意上面wg.Wait()的位置(下面有討論),在finish channel之前,目的是為了等待子goroutine運行完,再通過一個無緩沖通道finish通知main goroutine,然后main運行結束。

func ReadLines()讀取指定行數據:

// 讀取指定行數據 func ReadLines(filename string, start, length int) (line int, retErr error) {fmt.Println("current file:", filename)fileObj, err := os.Open(filename)if err != nil {panic(err)}defer fileObj.Close()// 跳過開始行之前的行-ReadString方式startLine := 1endLine := start + lengthreader := bufio.NewReader(fileObj)for {line, err := reader.ReadString(byte('\n'))if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {log.Fatal(err)retErr = errbreak}if startLine > start && startLine <= endLine {wg.Add(1)// go并發執行go SendEmail(line)if startLine == endLine {break}}startLine++}return startLine, retErr }// 模擬郵件發送 func SendEmail(email string) error {defer wg.Done()time.Sleep(time.Second * 1)fmt.Println(email)return nil } 復制代碼

運行上面main.go,3個任務在1s內并發完成所有郵件(./task/edm1.txt中一行表示一個郵箱)發送。

truefinished all tasks.Total cost(ms): 1001 復制代碼

那么問題來了:沒有實現分批每次并發batchLength = 20,因為如果不分批發送,只要其中某個任務或某一封郵件出錯了,那下次重新run的時候,會不知道哪些用戶已經發送過了,出現重復發送。而分批發送即使中途出錯了,下一次重新run可從上次出錯的end行開始,最多是[start - end]一個batchLength 發送失敗,可以接受。

于是,將倒數第5行wg.Wait()注釋掉,倒數第8行注釋打開,如下:

// 單任務 func RunTask(filename string, start, length int) (retErr error) {for {readLine, err := ReadLines(filename, start, length)if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {fmt.Println(err)retErr = errbreak}fmt.Println("current line:", readLine)start += length// 等待一批完成才進入下一批wg.Wait()}//wg.Wait()finish <- truereturn retErr } 復制代碼

運行就報錯:

panic: sync: WaitGroup is reused before previous Wait has returned 復制代碼

提示WaitGroup在goroutine之間重用了,雖然是全局變量,看起來是使用不當。怎么調整呢?

3.2 main.go

package mainimport ("bufio""fmt""io""log""os""strconv""sync""time" )var (batchLength = 10outerWg sync.WaitGroup )func main() {startTime := time.Now().UnixNano()for i := 1; i <= 3; i++ {filename := "./task/edm" + strconv.Itoa(i) + ".txt"start := 60outerWg.Add(1)go RunTask(filename, start, batchLength)}// main 阻塞等待goroutine執行完成outerWg.Wait()fmt.Println("finished all tasks.")endTime := time.Now().UnixNano()fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) }// 單任務 func RunTask(filename string, start, length int) (retErr error) {for {isFinish := make(chan bool)readLine, err := ReadLines(filename, start, length, isFinish)if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {fmt.Println(err)retErr = errbreak}// 等待一批完成才進入下一批fmt.Println("current line:", readLine)start += length<-isFinish// 關閉channel,釋放資源close(isFinish)}outerWg.Done()return retErr } 復制代碼

從上面可以看出:調整的思路是外層用WaitGroup控制,里層用channel 控制,執行又報錯 : (

fatal error: all goroutines are asleep - deadlock!goroutine 1 [semacquire]:sync.runtime_Semacquire(0x55fe7c)/usr/local/go/src/runtime/sema.go:56 +0x39sync.(*WaitGroup).Wait(0x55fe70)/usr/local/go/src/sync/waitgroup.go:131 +0x72main.main()/home/work/data/www/docker_env/www/go/src/WWW/edm/main.go:31 +0x1abgoroutine 5 [chan send]:main.ReadLines(0xc42001c0c0, 0xf, 0x3c, 0xa, 0xc42008e000, 0x0, 0x0, 0x0) 復制代碼

仔細檢查,發現上面代碼中定義的isFinish 是一個無緩沖channel,在發郵件SendMail() 子協程沒有完成時,讀取一個無數據的無緩沖通道將阻塞當前goroutine,其他goroutine也是一樣的都被阻塞,這樣就出現了all goroutines are asleep - deadlock!

于是將上面代碼改為有緩沖繼續嘗試:

isFinish := make(chan bool, 1) // 讀取指定行數據 func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {fmt.Println("current file:", filename)// 控制每一批發完再下一批var wg sync.WaitGroupfileObj, err := os.Open(filename)if err != nil {panic(err)}defer fileObj.Close()// 跳過開始行之前的行-ReadString方式startLine := 1endLine := start + lengthreader := bufio.NewReader(fileObj)for {line, err := reader.ReadString(byte('\n'))if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {log.Fatal(err)retErr = errbreak}if startLine > start && startLine <= endLine {wg.Add(1)// go并發執行go SendEmail(line, wg)if startLine == endLine {isFinish <- truebreak}}startLine++}wg.Wait()return startLine, retErr }// 模擬郵件發送 func SendEmail(email string, wg sync.WaitGroup) error {defer wg.Done()time.Sleep(time.Second * 1)fmt.Println(email)return nil } 復制代碼

運行,又報錯了 : (

fatal error: all goroutines are asleep - deadlock!goroutine 1 [semacquire]:sync.runtime_Semacquire(0x55fe7c)/usr/local/go/src/runtime/sema.go:56 +0x39sync.(*WaitGroup).Wait(0x55fe70) 復制代碼

這次提示有點不一樣,看起來是里層的WaitGroup 導致了死鎖,繼續檢查發現里層wg 是值傳遞,應該使用指針傳引用。

// go并發執行 go SendEmail(line, wg) 復制代碼

最后修改代碼如下:

// 讀取指定行數據 func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {fmt.Println("current file:", filename)// 控制每一批發完再下一批var wg sync.WaitGroupfileObj, err := os.Open(filename)if err != nil {panic(err)}defer fileObj.Close()// 跳過開始行之前的行-ReadString方式startLine := 1endLine := start + lengthreader := bufio.NewReader(fileObj)for {line, err := reader.ReadString(byte('\n'))if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {log.Fatal(err)retErr = errbreak}if startLine > start && startLine <= endLine {wg.Add(1)// go并發執行go SendEmail(line, &wg)if startLine == endLine {isFinish <- truebreak}}startLine++}wg.Wait()return startLine, retErr }// 模擬郵件發送 func SendEmail(email string, wg *sync.WaitGroup) error {defer wg.Done()time.Sleep(time.Second * 1)fmt.Println(email)return nil } 復制代碼

趕緊運行一下,這次終于成功啦 : )

current line: 100current file: ./task/edm2.txtRead EOF: ./task/edm2.txtRead EOF: ./task/edm2.txtfinished all tasks.Total cost(ms): 4003 復制代碼

每個任務模擬的是100行,從第60行開始運行,四個任務并發執行,每個任務分批內再次并發,并且控制了每一批次完成后再進行下一批,所以總運行時間約4s,符合期望值。完整源碼請閱讀原文或移步GitHub:github.com/astraw99/ed…

4. 小結:

本文通過兩層嵌套Go 并發,模擬實現了高性能并發EDM,具體的一些出錯行控制、任務中斷與再次執行將在下次繼續討論,主要邏輯已跑通,幾個坑點小結如下:

a) WaitGroup 一般用于main 主協程等待全部子協程退出后,再優雅退出主協程;嵌套使用時注意wg.Wait()放的位置;

b) 合理使用channel,無緩沖chan將阻塞當前goroutine,有緩沖chan在cap未滿的情況下不會阻塞當前goroutine,使用完記得釋放chan資源;

c) 注意函數間傳值或傳引用(本質上還是傳值,傳的指針的指針內存值)的合理使用;



后記:第一篇博客寫到這里差不多算完成了,一不小心一個下午就過去了,寫的邏輯、可讀性可能不太好請見諒,歡迎留言批評指正。感謝您的閱讀。

轉載于:https://juejin.im/post/5c4da16f5188253a317b7637

總結

以上是生活随笔為你收集整理的Go嵌套并发实现EDM,附坑点分析#1的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 拍真实国产伦偷精品 | 欧美日本在线 | 日韩欧美视频二区 | 中文在线永久免费观看 | 成人a级大片 | 91免费黄色| 国产日产欧美一区二区 | 欧美精品aaa | 一色屋免费视频 | 亚洲av成人无码久久精品 | 亚洲清纯国产 | 黄色香港三级三级三级 | 国产精品一区二区三区免费观看 | 免费网站观看www在线观看 | 成人性生活免费看 | 久久婷婷视频 | 黄色国产网站 | 欧美人与按摩师xxxx | 色人综合| 一级日韩片 | 国产真实乱人偷精品视频 | 久久中文网 | 在线免费av播放 | 97超碰网| 亚洲天堂性 | 日韩人妻精品在线 | 另类av小说 | 亚洲成av人片一区二区梦乃 | 无毛av| 天天摸天天舔天天操 | 一本久久综合亚洲鲁鲁五月天 | 亚洲瘦老头同性xxxxx | 日本一区二区三区欧美 | 欧美精品一区二区三区三州 | 午夜丰满寂寞少妇精品 | 欧美性猛交ⅹxxx乱大交3 | 亚洲欧美日韩在线不卡 | 亚洲国产精品欧美久久 | 九九在线视频 | 波多野结衣中文字幕一区二区 | 精品午夜一区二区三区在线观看 | 国产伊人网 | 久久久免费av | 成人观看视频 | 99热这里只有精品9 日韩综合在线 | 亚洲免费av一区二区 | 国产另类ts人妖一区二区 | 91在线观看免费高清完整版在线观看 | 欧美日韩精品一区二区三区 | 日韩成人av网站 | 欧美日韩亚洲不卡 | 欧美三级欧美成人高清 | 欧美日韩色图片 | 免费视频网站在线观看入口 | 影音先锋中文字幕一区二区 | 亚洲高清视频在线播放 | wwwxxx日本人| 天天综合网久久 | 欧美国产免费 | 色一情一交一乱一区二区三区 | 成人手机看片 | 国产美女免费无遮挡 | 亚洲伦理影院 | 天堂网在线资源 | 欧洲中文字幕日韩精品成人 | 日韩黄色在线观看 | 国产不卡一 | 成人学院中文字幕 | 国产av一区二区三区精品 | 精品欧美一区二区三区久久久 | 国产精品18久久久久久久久 | 亚洲区久久 | 成年人看的视频网站 | 免费看操片 | 国产丝袜在线播放 | 日韩精品网址 | 麻豆久久久久久久久久 | 亚洲双插 | 日本熟妇一区二区 | 在线看免费 | 亚洲视频免费在线 | 国产老女人乱淫免费可以 | 欧美人妖69xxxxxhd3d | 能免费看黄色的网站 | 老司机午夜免费精品视频 | 黑人巨大精品欧美一区免费视频 | 欧美一区免费观看 | 中文天堂av | 欧美群妇大交乱 | 国产成人无码一区二区三区在线 | 日本欧美在线播放 | 看黄色大片 | 欲色综合 | 国产福利观看 | 91一区 | av55 | 免费高清av在线看 | 国产精品一区不卡 | 国产美女www爽爽爽视频 | 91av精品|