日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Go嵌套并发实现EDM,附坑点分析#1

發布時間:2025/5/22 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Go嵌套并发实现EDM,附坑点分析#1 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

看著身邊優秀的小伙伴們早就開始寫博客,自己深感落后,還好遲做總比不做好,勉勵自己見賢思齊。趁著年前最后一個周末,陽光正好,寫下第一篇博客,為2019年開個頭,以期完成今年為自己立下的flags。

從PHPer轉Gopher,很大一個原因就是業務對性能和并發的持續需求,另一個主要原因就是Go語言原生的并發特性,可以在提供同等高可用的能力下,使用更少的機器資源,節約可觀的成本。因此本文就結合自己在學習Go并發的實戰demo中,把遇到的一些坑點寫下來,共享進步。

1. 在Go語言中實現并發控制,目前主要有三種方式:

a) Channel - 分為無緩沖、有緩沖通道;

b) WaitGroup - sync包提供的goroutine間的同步機制;

c) Context - 在調用鏈不同goroutine間傳遞和共享數據;

本文demo中主要用到了前兩種,基本使用請查看官方文檔。

2. Demo需求與分析:

需求:實現一個EDM的高效郵件發送:需要支持多個國家(可以看成是多個任務),需要記錄每條任務發送的狀態(當前成功、失敗條數),需要支持可暫停(stop)、重新發送(run)操作。

分析:從需求可以看出,在郵件發送中可以通過并發實現多個國家(多個任務)并發、單個任務分批次并發實現快速、高效EDM需求。

3. Demo實戰源碼:

3.1 main.go

package mainimport ("bufio""fmt""io""log""os""strconv""sync""time" )var (batchLength = 20wg sync.WaitGroupfinish = make(chan bool) )func main() {startTime := time.Now().UnixNano()for i := 1; i <= 3; i++ {filename := "./task/edm" + strconv.Itoa(i) + ".txt"start := 60go RunTask(filename, start, batchLength)}// main 阻塞等待goroutine執行完成fmt.Println(<-finish)fmt.Println("finished all tasks.")endTime := time.Now().UnixNano()fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) }// 單任務 func RunTask(filename string, start, length int) (retErr error) {for {readLine, err := ReadLines(filename, start, length)if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {fmt.Println(err)retErr = errbreak}fmt.Println("current line:", readLine)start += length// 等待一批完成才進入下一批//wg.Wait()}wg.Wait()finish <- truereturn retErr } 復制代碼

注意上面wg.Wait()的位置(下面有討論),在finish channel之前,目的是為了等待子goroutine運行完,再通過一個無緩沖通道finish通知main goroutine,然后main運行結束。

func ReadLines()讀取指定行數據:

// 讀取指定行數據 func ReadLines(filename string, start, length int) (line int, retErr error) {fmt.Println("current file:", filename)fileObj, err := os.Open(filename)if err != nil {panic(err)}defer fileObj.Close()// 跳過開始行之前的行-ReadString方式startLine := 1endLine := start + lengthreader := bufio.NewReader(fileObj)for {line, err := reader.ReadString(byte('\n'))if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {log.Fatal(err)retErr = errbreak}if startLine > start && startLine <= endLine {wg.Add(1)// go并發執行go SendEmail(line)if startLine == endLine {break}}startLine++}return startLine, retErr }// 模擬郵件發送 func SendEmail(email string) error {defer wg.Done()time.Sleep(time.Second * 1)fmt.Println(email)return nil } 復制代碼

運行上面main.go,3個任務在1s內并發完成所有郵件(./task/edm1.txt中一行表示一個郵箱)發送。

truefinished all tasks.Total cost(ms): 1001 復制代碼

那么問題來了:沒有實現分批每次并發batchLength = 20,因為如果不分批發送,只要其中某個任務或某一封郵件出錯了,那下次重新run的時候,會不知道哪些用戶已經發送過了,出現重復發送。而分批發送即使中途出錯了,下一次重新run可從上次出錯的end行開始,最多是[start - end]一個batchLength 發送失敗,可以接受。

于是,將倒數第5行wg.Wait()注釋掉,倒數第8行注釋打開,如下:

// 單任務 func RunTask(filename string, start, length int) (retErr error) {for {readLine, err := ReadLines(filename, start, length)if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {fmt.Println(err)retErr = errbreak}fmt.Println("current line:", readLine)start += length// 等待一批完成才進入下一批wg.Wait()}//wg.Wait()finish <- truereturn retErr } 復制代碼

運行就報錯:

panic: sync: WaitGroup is reused before previous Wait has returned 復制代碼

提示WaitGroup在goroutine之間重用了,雖然是全局變量,看起來是使用不當。怎么調整呢?

3.2 main.go

package mainimport ("bufio""fmt""io""log""os""strconv""sync""time" )var (batchLength = 10outerWg sync.WaitGroup )func main() {startTime := time.Now().UnixNano()for i := 1; i <= 3; i++ {filename := "./task/edm" + strconv.Itoa(i) + ".txt"start := 60outerWg.Add(1)go RunTask(filename, start, batchLength)}// main 阻塞等待goroutine執行完成outerWg.Wait()fmt.Println("finished all tasks.")endTime := time.Now().UnixNano()fmt.Println("Total cost(ms):", (endTime-startTime)/1e6) }// 單任務 func RunTask(filename string, start, length int) (retErr error) {for {isFinish := make(chan bool)readLine, err := ReadLines(filename, start, length, isFinish)if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {fmt.Println(err)retErr = errbreak}// 等待一批完成才進入下一批fmt.Println("current line:", readLine)start += length<-isFinish// 關閉channel,釋放資源close(isFinish)}outerWg.Done()return retErr } 復制代碼

從上面可以看出:調整的思路是外層用WaitGroup控制,里層用channel 控制,執行又報錯 : (

fatal error: all goroutines are asleep - deadlock!goroutine 1 [semacquire]:sync.runtime_Semacquire(0x55fe7c)/usr/local/go/src/runtime/sema.go:56 +0x39sync.(*WaitGroup).Wait(0x55fe70)/usr/local/go/src/sync/waitgroup.go:131 +0x72main.main()/home/work/data/www/docker_env/www/go/src/WWW/edm/main.go:31 +0x1abgoroutine 5 [chan send]:main.ReadLines(0xc42001c0c0, 0xf, 0x3c, 0xa, 0xc42008e000, 0x0, 0x0, 0x0) 復制代碼

仔細檢查,發現上面代碼中定義的isFinish 是一個無緩沖channel,在發郵件SendMail() 子協程沒有完成時,讀取一個無數據的無緩沖通道將阻塞當前goroutine,其他goroutine也是一樣的都被阻塞,這樣就出現了all goroutines are asleep - deadlock!

于是將上面代碼改為有緩沖繼續嘗試:

isFinish := make(chan bool, 1) // 讀取指定行數據 func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {fmt.Println("current file:", filename)// 控制每一批發完再下一批var wg sync.WaitGroupfileObj, err := os.Open(filename)if err != nil {panic(err)}defer fileObj.Close()// 跳過開始行之前的行-ReadString方式startLine := 1endLine := start + lengthreader := bufio.NewReader(fileObj)for {line, err := reader.ReadString(byte('\n'))if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {log.Fatal(err)retErr = errbreak}if startLine > start && startLine <= endLine {wg.Add(1)// go并發執行go SendEmail(line, wg)if startLine == endLine {isFinish <- truebreak}}startLine++}wg.Wait()return startLine, retErr }// 模擬郵件發送 func SendEmail(email string, wg sync.WaitGroup) error {defer wg.Done()time.Sleep(time.Second * 1)fmt.Println(email)return nil } 復制代碼

運行,又報錯了 : (

fatal error: all goroutines are asleep - deadlock!goroutine 1 [semacquire]:sync.runtime_Semacquire(0x55fe7c)/usr/local/go/src/runtime/sema.go:56 +0x39sync.(*WaitGroup).Wait(0x55fe70) 復制代碼

這次提示有點不一樣,看起來是里層的WaitGroup 導致了死鎖,繼續檢查發現里層wg 是值傳遞,應該使用指針傳引用。

// go并發執行 go SendEmail(line, wg) 復制代碼

最后修改代碼如下:

// 讀取指定行數據 func ReadLines(filename string, start, length int, isFinish chan bool) (line int, retErr error) {fmt.Println("current file:", filename)// 控制每一批發完再下一批var wg sync.WaitGroupfileObj, err := os.Open(filename)if err != nil {panic(err)}defer fileObj.Close()// 跳過開始行之前的行-ReadString方式startLine := 1endLine := start + lengthreader := bufio.NewReader(fileObj)for {line, err := reader.ReadString(byte('\n'))if err == io.EOF {fmt.Println("Read EOF:", filename)retErr = errbreak}if err != nil {log.Fatal(err)retErr = errbreak}if startLine > start && startLine <= endLine {wg.Add(1)// go并發執行go SendEmail(line, &wg)if startLine == endLine {isFinish <- truebreak}}startLine++}wg.Wait()return startLine, retErr }// 模擬郵件發送 func SendEmail(email string, wg *sync.WaitGroup) error {defer wg.Done()time.Sleep(time.Second * 1)fmt.Println(email)return nil } 復制代碼

趕緊運行一下,這次終于成功啦 : )

current line: 100current file: ./task/edm2.txtRead EOF: ./task/edm2.txtRead EOF: ./task/edm2.txtfinished all tasks.Total cost(ms): 4003 復制代碼

每個任務模擬的是100行,從第60行開始運行,四個任務并發執行,每個任務分批內再次并發,并且控制了每一批次完成后再進行下一批,所以總運行時間約4s,符合期望值。完整源碼請閱讀原文或移步GitHub:github.com/astraw99/ed…

4. 小結:

本文通過兩層嵌套Go 并發,模擬實現了高性能并發EDM,具體的一些出錯行控制、任務中斷與再次執行將在下次繼續討論,主要邏輯已跑通,幾個坑點小結如下:

a) WaitGroup 一般用于main 主協程等待全部子協程退出后,再優雅退出主協程;嵌套使用時注意wg.Wait()放的位置;

b) 合理使用channel,無緩沖chan將阻塞當前goroutine,有緩沖chan在cap未滿的情況下不會阻塞當前goroutine,使用完記得釋放chan資源;

c) 注意函數間傳值或傳引用(本質上還是傳值,傳的指針的指針內存值)的合理使用;



后記:第一篇博客寫到這里差不多算完成了,一不小心一個下午就過去了,寫的邏輯、可讀性可能不太好請見諒,歡迎留言批評指正。感謝您的閱讀。

轉載于:https://juejin.im/post/5c4da16f5188253a317b7637

總結

以上是生活随笔為你收集整理的Go嵌套并发实现EDM,附坑点分析#1的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。