Go 学习笔记(25)— 并发(04)[有缓冲/无缓冲通道、WaitGroup 协程同步、select 多路监听通道、close 关闭通道、channel 传参或作为结构体成员]
1. 無緩沖的通道
無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。
這種類型的通道要求發(fā)送 goroutine 和接收 goroutine 同時(shí)準(zhǔn)備好,才能完成發(fā)送和接收操作。
如果兩個(gè) goroutine 沒有同時(shí)準(zhǔn)備好,通道會導(dǎo)致先執(zhí)行發(fā)送或接收操作的 goroutine 阻塞等待。這種對通道進(jìn)行發(fā)送和接收的交互行為本身就是同步的。其中任意一個(gè)操作都無法離開另一個(gè)操作單獨(dú)存在。
下圖展示兩個(gè) goroutine 如何利用無緩沖的通道來共享一個(gè)值。
- 兩個(gè)
goroutine都到達(dá)通道,但兩者都沒有開始執(zhí)行發(fā)送或者接收。 - 左側(cè)的
goroutine將它的手伸進(jìn)了通道,這模擬了向通道發(fā)送數(shù)據(jù)的行為。這時(shí),這個(gè)goroutine會在通道中被鎖住,直到交換完成。 - 右側(cè)的
goroutine將它的手放入通道,這模擬了從通道里接收數(shù)據(jù)。這個(gè)goroutine一樣也會在通道中被鎖住,直到交換完成。 - 進(jìn)行交換。
- 右側(cè)的
goroutine拿到數(shù)據(jù)。 - 兩個(gè)
goroutine都將它們的手從通道里拿出來,這模擬了被鎖住的goroutine得到釋放。兩個(gè)goroutine現(xiàn)在都可以去做別的事情了。
圖:使用無緩沖的通道在 goroutine 之間同步, 摘自 《Go 語言實(shí)戰(zhàn)》
package mainimport ("runtime"
)func main() {c := make(chan struct{})go func(i chan struct{}) {sum := 0for i := 0; i <= 10000; i++ {sum += i}println("sum is :", sum)// 寫通道c <- struct{}{}}(c)//NumGoroutine 可以返回當(dāng)前程序的 goroutine 數(shù)目println("NumGoroutine=", runtime.NumGoroutine())// 讀取通道 c, 通過通道進(jìn)行同步等待<-c
}
無緩沖通道需要發(fā)送和接收配對。否則會被阻塞,直到另一方準(zhǔn)備好后被喚醒。
package mainimport "fmt"func main() {data := make(chan int) // 數(shù)據(jù)交換隊(duì)列exit := make(chan bool) // 退出通知go func() {for d := range data { // 從隊(duì)列迭代接收數(shù)據(jù),直到 close 。fmt.Println(d)}fmt.Println("recv over.")exit <- true // 發(fā)出退出通知。}()data <- 1 // 發(fā)送數(shù)據(jù)。data <- 2data <- 3close(data) // 關(guān)閉隊(duì)列。fmt.Println("send over.")<-exit // 等待退出通知。
}
輸出:
1
2
3
send over.
recv over.
2. 有緩沖的通道
在無緩沖通道的基礎(chǔ)上,為通道增加一個(gè)有限大小的存儲空間形成帶緩沖通道。帶緩沖通道在發(fā)送時(shí)無需等待接收方接收即可完成發(fā)送過程,并且不會發(fā)生阻塞,只有當(dāng)存儲空間滿時(shí)才會發(fā)生阻塞。同理,如果緩沖通道中有數(shù)據(jù),接收時(shí)將不會發(fā)生阻塞,直到通道中沒有數(shù)據(jù)可讀時(shí),通道將會再度阻塞。
有緩沖的通道(buffered channel)是一種在被接收前能存儲一個(gè)或者多個(gè)值的通道。
這種類型的通道并不強(qiáng)制要求 goroutine 之間必須同時(shí)完成發(fā)送和接收。通道會阻塞發(fā)送和接收動作的條件也會不同。
只有在通道中沒有要接收的值時(shí),接收動作才會阻塞。只有在通道沒有可用緩沖區(qū)容納被發(fā)送的值時(shí),發(fā)送動作才會阻塞。
這導(dǎo)致有緩沖的通道和無緩沖的通道之間的一個(gè)很大的不同:
-
無緩沖的通道保證進(jìn)行發(fā)送和接收的
goroutine會在同一時(shí)間進(jìn)行數(shù)據(jù)交換; -
有緩沖的通道沒有這種保證。
在下圖中可以看到兩個(gè) goroutine 分別向有緩沖的通道里增加一個(gè)值和從有緩沖的通道里移除一個(gè)值。
- 右側(cè)的
goroutine正在從通道接收一個(gè)值。 - 右側(cè)的
goroutine獨(dú)立完成了接收值的動作,而左側(cè)的goroutine正在發(fā)送一個(gè)新值到通道里。 - 左側(cè)的
goroutine還在向通道發(fā)送新值,而右側(cè)的goroutine正在從通道接收另外一個(gè)值。這個(gè)步驟里的兩個(gè)操作既不是同步的,也不會互相阻塞。 - 所有的發(fā)送和接收都完成,而通道里還有幾個(gè)值,也有一些空間可以存更多的值。
圖:使用有緩沖的通道在 goroutine 之間同步數(shù)據(jù),摘自 《Go 語言實(shí)戰(zhàn)》
有緩沖通道例子
package mainimport ("runtime"
)func main() {c := make(chan struct{})ci := make(chan int, 100)go func(i chan struct{}, j chan int) {for i := 0; i <= 10; i++ {ci <- i}close(ci)// 寫通道c <- struct{}{}}(c, ci)//NumGoroutine 可以返回當(dāng)前程序的 goroutine 數(shù)目println("NumGoroutine=", runtime.NumGoroutine())// 讀取通道 c, 通過通道進(jìn)行同步等待<-c// 此時(shí)ci 通道已經(jīng)關(guān)閉,匿名函數(shù)啟動的goroutine 已經(jīng)退出println("NumGoroutine=", runtime.NumGoroutine())// 但是通道 ci 還可以繼續(xù)讀取for v := range ci {println("v is :", v)}
}
異步方式也就是有緩沖的通道通過判斷緩沖區(qū)來決定是否阻塞。
- 緩沖區(qū)已滿,發(fā)送被阻塞;
- 緩沖區(qū)為空,接收被阻塞;
通常情況下,異步 channel 可減少排隊(duì)阻塞,具備更高的效率。但應(yīng)該考慮使用指針規(guī)避大對象拷貝,將多個(gè)元素打包,減小緩沖區(qū)大小等。
為什么Go語言對通道要限制長度而不提供無限長度的通道?
我們知道通道( channel )是在兩個(gè) goroutine 間通信的橋梁。使用 goroutine 的代碼必然有一方提供數(shù)據(jù),一方消費(fèi)數(shù)據(jù)。當(dāng)提供數(shù)據(jù)一方的數(shù)據(jù)供給速度大于消費(fèi)方的數(shù)據(jù)處理速度時(shí),如果通道不限制長度,那么內(nèi)存將不斷膨脹直到應(yīng)用崩潰。
因此,限制通道的長度有利于約束數(shù)據(jù)提供方的供給速度,供給數(shù)據(jù)量必須在消費(fèi)方處理量+通道長度的范圍內(nèi),才能正常地處理數(shù)據(jù)。
package mainimport "fmt"func main() {data := make(chan int, 3) // 緩沖區(qū)可以存儲 3 個(gè)元素exit := make(chan bool)data <- 1 // 在緩沖區(qū)未滿前,不會阻塞。data <- 2data <- 3go func() {for d := range data { // 在緩沖區(qū)未空前,不會阻塞。fmt.Println(d)}exit <- true}()data <- 4 // 如果緩沖區(qū)已滿,阻塞。data <- 5close(data)<-exit
}
緩沖區(qū)是內(nèi)部屬性,并非類型構(gòu)成要素。
var a, b chan int = make(chan int), make(chan int, 3)
除用 range 外,還可用 ok-idiom 模式判斷 channel 是否關(guān)閉。
for {if d, ok := <-data; ok {fmt.Println(d)} else {break}
}
向 closed channel 發(fā)送數(shù)據(jù)引發(fā) panic 錯(cuò)誤,接收立即返回零值。而 nil channel,無論收發(fā)都會被阻塞。
// 這個(gè)示例程序展示如何使用
// 有緩沖的通道和固定數(shù)目的
// goroutine來處理一堆工作
package mainimport ("fmt""math/rand""sync""time"
)const (numberGoroutines = 4 // 要使用的goroutine的數(shù)量taskLoad = 10 // 要處理的工作的數(shù)量
)// wg用來等待程序完成
var wg sync.WaitGroup// init初始化包,Go語言運(yùn)行時(shí)會在其他代碼執(zhí)行之前
// 優(yōu)先執(zhí)行這個(gè)函數(shù)
func init() {// 初始化隨機(jī)數(shù)種子rand.Seed(time.Now().Unix())
}// main是所有Go程序的入口
func main() {// 創(chuàng)建一個(gè)有緩沖的通道來管理工作tasks := make(chan string, taskLoad)// 啟動goroutine來處理工作wg.Add(numberGoroutines)for gr := 1; gr <= numberGoroutines; gr++ {go worker(tasks, gr)}// 增加一組要完成的工作for post := 1; post <= taskLoad; post++ {tasks <- fmt.Sprintf("Task : %d", post)}// 當(dāng)所有工作都處理完時(shí)關(guān)閉通道// 以便所有g(shù)oroutine退出close(tasks)// 等待所有工作完成wg.Wait()
}// worker作為goroutine啟動來處理
// 從有緩沖的通道傳入的工作
func worker(tasks chan string, worker int) {// 通知函數(shù)已經(jīng)返回defer wg.Done()for {// 等待分配工作task, ok := <-tasksif !ok {// 這意味著通道已經(jīng)空了,并且已被關(guān)閉fmt.Printf("Worker: %d : Shutting Down\n", worker)return}// 顯示我們開始工作了fmt.Printf("Worker: %d : Started %s\n", worker, task)// 隨機(jī)等一段時(shí)間來模擬工作sleep := rand.Int63n(100)time.Sleep(time.Duration(sleep) * time.Millisecond)// 顯示我們完成了工作fmt.Printf("Worker: %d : Completed %s\n", worker, task)}
}
輸出:
Worker: 4 : Started Task : 2
Worker: 1 : Started Task : 1
Worker: 2 : Started Task : 3
Worker: 3 : Started Task : 4
Worker: 4 : Completed Task : 2
Worker: 4 : Started Task : 5
Worker: 2 : Completed Task : 3
Worker: 2 : Started Task : 6
Worker: 3 : Completed Task : 4
Worker: 3 : Started Task : 7
Worker: 3 : Completed Task : 7
Worker: 3 : Started Task : 8
Worker: 4 : Completed Task : 5
Worker: 4 : Started Task : 9
Worker: 1 : Completed Task : 1
Worker: 1 : Started Task : 10
Worker: 3 : Completed Task : 8
Worker: 3 : Shutting Down
Worker: 2 : Completed Task : 6
Worker: 2 : Shutting Down
Worker: 1 : Completed Task : 10
Worker: 1 : Shutting Down
Worker: 4 : Completed Task : 9
Worker: 4 : Shutting Down
在main函數(shù)的第31行,創(chuàng)建了一個(gè)string類型的有緩沖的通道,緩沖的容量是10。在第34行,給WaitGroup賦值為4,代表創(chuàng)建了4個(gè)工作 goroutine。之后在第35行到第37行,創(chuàng)建了4個(gè) goroutine,并傳入用來接收工作的通道。在第40行到第42行,將10個(gè)字符串發(fā)送到通道,模擬發(fā)給 goroutine 的工作。一旦最后一個(gè)字符串發(fā)送到通道,通道就會在第46行關(guān)閉,而main函數(shù)就會在第49行等待所有工作的完成。
第46行中關(guān)閉通道的代碼非常重要。當(dāng)通道關(guān)閉后,goroutine 依舊可以從通道接收數(shù)據(jù),但是不能再向通道里發(fā)送數(shù)據(jù)。能夠從已經(jīng)關(guān)閉的通道接收數(shù)據(jù)這一點(diǎn)非常重要,因?yàn)檫@允許通道關(guān)閉后依舊能取出其中緩沖的全部值,而不會有數(shù)據(jù)丟失。從一個(gè)已經(jīng)關(guān)閉且沒有數(shù)據(jù)的通道里獲取數(shù)據(jù),總會立刻返回,并返回一個(gè)通道類型的零值。如果在獲取通道時(shí)還加入了可選的標(biāo)志,就能得到通道的狀態(tài)信息。
在worker函數(shù)里,可以在第58行看到一個(gè)無限的for循環(huán)。在這個(gè)循環(huán)里,會處理所有接收到的工作。每個(gè) goroutine 都會在第60行阻塞,等待從通道里接收新的工作。一旦接收到返回,就會檢查ok標(biāo)志,看通道是否已經(jīng)清空而且關(guān)閉。如果ok的值是false,goroutine 就會終止,并調(diào)用第56行通過defer聲明的Done函數(shù),通知main有工作結(jié)束。
如果ok標(biāo)志是true,表示接收到的值是有效的。第71行和第72行模擬了處理的工作。一旦工作完成,goroutine 會再次阻塞在第60行從通道獲取數(shù)據(jù)的語句。一旦通道被關(guān)閉,這個(gè)從通道獲取數(shù)據(jù)的語句會立刻返回,goroutine 也會終止自己。
3. WaitGroup
Go 語言中除了可以使用通道(channel)和互斥鎖進(jìn)行兩個(gè)并發(fā)程序間的同步外,還可以使用等待組進(jìn)行多個(gè)任務(wù)的同步,等待組可以保證在并發(fā)環(huán)境中完成指定數(shù)量的任務(wù)。sync.WaitGroup 類型(以下簡稱WaitGroup類型)是開箱即用的,也是并發(fā)安全的。
一般情況下,我會用這個(gè)方法來記錄需要等待的 goroutine 的數(shù)量。相對應(yīng)的,這個(gè)類型的 Done 方法,用于對其所屬值中計(jì)數(shù)器的值進(jìn)行減一操作。我們可以在需要等待的 goroutine 中,通過 defer 語句調(diào)用它。而此類型的 Wait 方法的功能是,阻塞當(dāng)前的 goroutine ,直到其所屬值中的計(jì)數(shù)器歸零。如果在該方法被調(diào)用的時(shí)候,那個(gè)計(jì)數(shù)器的值就是 0,那么它將不會做任何事情。
goroutine 和 chan , 一個(gè)用于并發(fā),另一個(gè)用于通信。沒有緩沖的通道具有同步的功能,除此之外, sync 包也提供了多個(gè) goroutine 同步的機(jī)制,主要是通過 WaitGroup 實(shí)現(xiàn)的。
WaitGroup 值中計(jì)數(shù)器的值不能小于 0,是因?yàn)檫@樣會引發(fā)一個(gè) panic 。
如果在一個(gè)此類值的 Wait 方法被執(zhí)行期間,跨越了兩個(gè)計(jì)數(shù)周期,那么就會引發(fā)一個(gè) panic 。縱觀上述會引發(fā) panic 的后兩種情況,我們可以總結(jié)出這樣一條關(guān)于 WaitGroup 值的使用禁忌,
即:不要把增加其計(jì)數(shù)器值的操作和調(diào)用其Wait方法的代碼,放在不同的 goroutine 中執(zhí)行。換句話說,要杜絕對同一個(gè)WaitGroup 值的兩種操作的并發(fā)執(zhí)行。
我們最好用 先統(tǒng)一 Add ,再并發(fā) Done ,最后 Wait 這種標(biāo)準(zhǔn)方式,來使用 WaitGroup 值。 尤其不要在調(diào)用 Wait 方法的同時(shí),并發(fā)地通過調(diào)用 Add 方法去增加其計(jì)數(shù)器的值,因?yàn)檫@也有可能引發(fā) panic 。
在 sync.WaitGroup (等待組)類型中,每個(gè) sync.WaitGroup 值在內(nèi)部維護(hù)著一個(gè)計(jì)數(shù),此計(jì)數(shù)的初始默認(rèn)值為零。
主要的接口如下:
type WaitGroup struct {// contains filtered or unexported fields
}// 添加等待信號
func (wg *WaitGroup) Add(delta int)// 釋放等待信號
func (wg *WaitGroup) Done()// 等待
func (wg *WaitGroup) Wait()
WaitGroup用來等待多個(gè)goroutine完成;main goroutine調(diào)用Add設(shè)置需要等待goroutine的數(shù)目;- 每一個(gè)
goroutine結(jié)束時(shí)調(diào)用Done(); Wait()被main用來等待所有的goroutine完成;
sync.WaitGroup 內(nèi)部擁有一個(gè)計(jì)數(shù)器,計(jì)數(shù)器的值可以通過方法調(diào)用實(shí)現(xiàn)計(jì)數(shù)器的增加和減少。當(dāng)我們添加了 N 個(gè)并發(fā)任務(wù)進(jìn)行工作時(shí),就將等待組的計(jì)數(shù)器值增加 N。每個(gè)任務(wù)完成時(shí),這個(gè)值減 1。同時(shí),在另外一個(gè) goroutine 中等待這個(gè)等待組的計(jì)數(shù)器值為 0 時(shí),表示所有任務(wù)已經(jīng)完成。
代碼示例:
package mainimport ("net/http""sync"
)var wg sync.WaitGroup
var urls = []string{"http://www.baidu.com","http://www.sina.com","http://www.qq.com",
}func main() {for _, url := range urls {// 為每一個(gè) url 啟動一個(gè) goroutine,同時(shí)給 wg 加 1wg.Add(1)go func(url string) {// 當(dāng)前go routine 結(jié)束后給wg 計(jì)數(shù)減1, wg.Done() 等價(jià)于wg.Add(-1)// defer wg.Add(-1)defer wg.Done()// 發(fā)送 http get 請求并打印 http 返回碼resp, err := http.Get(url)if err == nil {println(resp.Status)}}(url)}// 等待所有請求結(jié)束wg.Wait()
}
或者不使用匿名函數(shù),如下
package mainimport ("net/http""sync"
)var wg sync.WaitGroup
var urls = []string{"http://www.baidu.com","http://www.sina.com","http://www.qq.com",
}func getURLStatus(url string) {// 當(dāng)前go routine 結(jié)束后給wg 計(jì)數(shù)減1, wg.Done() 等價(jià)于wg.Add(-1)// defer wg.Add(-1)defer wg.Done()// 發(fā)送 http get 請求并打印 http 返回碼resp, err := http.Get(url)if err == nil {println(resp.Status)}
}func main() {for _, url := range urls {// 為每一個(gè) url 啟動一個(gè) goroutine,同時(shí)給 wg 加 1wg.Add(1)go getURLStatus(url)}// 等待所有請求結(jié)束wg.Wait()
}
4. select
select 是類 UNIX 系統(tǒng)提供的一個(gè)多路復(fù)用系統(tǒng)API, Go 語言借用多路復(fù)用的概念,提供了 select 關(guān)鍵字,用于多路監(jiān)昕多個(gè)通道。
select 語句只能與通道聯(lián)用,它一般由若干個(gè)分支組成。每次執(zhí)行這種語句的時(shí)候,一般只有一個(gè)分支中的代碼會被運(yùn)行。
當(dāng)監(jiān)聽的通道沒有狀態(tài)是可讀或可寫的, select 是阻塞的;只要監(jiān)聽的通道中有一個(gè)狀態(tài)是可讀或可寫的,則 select 就不會阻塞,而是進(jìn)入處理就緒通道的分支流程。如果監(jiān)聽的通道有多個(gè)可讀或可寫的狀態(tài), 則 select 隨機(jī)選取一個(gè)處理。
select 的特點(diǎn)是只要其中有一個(gè) case 已經(jīng)完成,程序就會繼續(xù)往下執(zhí)行,而不會考慮其他 case 的情況。
select 的用法與 switch 語言非常類似,由 select 開始一個(gè)新的選擇塊,每個(gè)選擇條件由 case 語句來描述。與 switch 語句相比, select 有比較多的限制,其中最大的一條限制就是每個(gè) case 語句里必須是一個(gè) IO 操作。結(jié)構(gòu)如下:
select{case 操作1:響應(yīng)操作1case 操作2:響應(yīng)操作2…default:沒有操作情況
}
操作1、操作2:包含通道收發(fā)語句,請參考下表。
| 操 作 | 語句示例 |
|---|---|
| 接收任意數(shù)據(jù) | case <- ch; |
| 接收變量 | case d := <- ch; |
| 發(fā)送數(shù)據(jù) | case ch <- 100; |
在 Go 中,支持通信操作的類型只有 chan ,所以 select 中的 case 條件只能是對 chan 類型變量的讀寫操作。由于 chan 類型變量的讀寫操作可能會引起阻塞,為了在使用 select 選擇器時(shí)不陷入阻塞狀態(tài),可以在 select 代碼塊中添加 default 關(guān)鍵字,當(dāng) case 條件全部都不滿足時(shí),默認(rèn)進(jìn)入 default 分支,執(zhí)行完 default 分支的代碼后,退出 select 選擇器。
package mainimport ("fmt""time"
)func main() {fmt.Println("開始時(shí)間:", time.Now().Format("2006-01-02 15:04:05"))select {case <-time.After(time.Second * 2):fmt.Println("2秒后的時(shí)間:", time.Now().Format("2006-01-02 15:04:05"))}
}
輸出結(jié)果:
開始時(shí)間: 2021-02-08 14-14-42
2秒后的時(shí)間: 2021-02-08 14:14:44
time.After函數(shù)返回一個(gè)通道類型的變量,然后在case中從這個(gè)通道中讀取信息,如果沒有協(xié)程給這個(gè)通道發(fā)送信息,那么case將會一直阻塞。在調(diào)用After函數(shù)時(shí),傳入了一個(gè)時(shí)長作為參數(shù),意思是從調(diào)用After函數(shù)算起,到設(shè)定的時(shí)長后,有協(xié)程將會向這個(gè)通道發(fā)送一條消息。當(dāng)通道收到消息后,這個(gè)case條件滿足,這個(gè)case分支下的代碼將會被執(zhí)
如果沒有任意一條 select 語句可以執(zhí)行(即所有的通道都被阻塞),那么有如下兩種可能的情況:
-
如果給出了
default語句,那么就會執(zhí)行default語句,同時(shí)程序的執(zhí)行會從select語句后的語句中恢復(fù); -
如果沒有
default語句,那么select語句將被阻塞,直到至少有一個(gè)通信可以進(jìn)行下去;
package mainfunc main() {ch := make(chan int, 1)go func(chan int) { // go func(ch chan int) { 這樣寫也可以? 為啥?for {select {case ch <- 0:case ch <- 1:}}}(ch)for i := 0; i < 10; i++ {println(<-ch)}}
輸出結(jié)果:
1
1
0
1
0
1
0
1
0
1
如果需要同時(shí)處理多個(gè) channel ,可使用 select 語句。它隨機(jī)選擇一個(gè)可用 channel 做收發(fā)操作,或執(zhí)行 default case 。
package mainimport ("fmt""os"
)func main() {a, b := make(chan int, 3), make(chan int)go func() {v, ok, s := 0, false, ""for {select { // 隨機(jī)選擇可?用 channel,接收數(shù)據(jù)。case v, ok = <-a:s = "a"case v, ok = <-b:s = "b"}if ok {fmt.Println(s, v)} else {os.Exit(0)}}}()for i := 0; i < 5; i++ {select { // 隨機(jī)選擇可用 channel,發(fā)送數(shù)據(jù)。case a <- i:case b <- i:}}close(a)select {} // 沒有可用 channel,阻塞 main goroutine。
}
輸出:
a 0
a 1
a 2
a 3
b 4
在循環(huán)中使用 select default case 需要小心,避免形成洪水。
- 如果在
select語句中發(fā)現(xiàn)某個(gè)通道已關(guān)閉,那么應(yīng)該怎樣屏蔽掉它所在的分支?
在 case 中通過第二個(gè)參數(shù)判斷 chan 是否關(guān)閉,如果關(guān)閉則通過 make(chan type) 來對關(guān)閉的 chan 置 nil ,當(dāng)再次執(zhí)行到 select 時(shí),因?yàn)?chan 時(shí) nil 會進(jìn)入阻塞而不會進(jìn)入候選分支。
package mainimport ("fmt""time"
)func main() {i := 0c := make(chan int, 2)c <- 1c <- 2close(c)for {select {case value, ok := <-c:if !ok {c = make(chan int)fmt.Println("ch is closed")} else {fmt.Printf("value is %#v\n", value)}default:time.Sleep(1e9) // 等待1秒鐘fmt.Println("default, ", i)i = i + 1if i > 3 {return}}}
}
輸出結(jié)果:
value is 1
value is 2
ch is closed
default, 0
default, 1
default, 2
default, 3
- 在
select語句與for語句聯(lián)用時(shí),怎樣直接退出外層的for語句?
- 可以使用
goto加lable跳轉(zhuǎn)到for外面; - 可以設(shè)置一個(gè)額外的標(biāo)記位,當(dāng)
chan關(guān)閉時(shí),設(shè)置flag=true,在for的最后判斷flag決定是否break;
5. 用 channel 實(shí)現(xiàn)信號量 (semaphore)
package mainimport ("fmt""sync"
)func main() {wg := sync.WaitGroup{}wg.Add(3)sem := make(chan int, 1)for i := 0; i < 3; i++ {go func(id int) {defer wg.Done()sem <- 1 // 向 sem 發(fā)送數(shù)據(jù),阻塞或者成功。for x := 0; x < 3; x++ {fmt.Println(id, x)}<-sem // 接收數(shù)據(jù),使得其他阻塞 goroutine 可以發(fā)送數(shù)據(jù)。}(i)}wg.Wait()
}
輸出:
2 0
2 1
2 2
0 0
0 1
0 2
1 0
1 1
1 2
6. 用 closed channel 發(fā)出退出通知
close 函數(shù)聲明如下:
func close(c chan<- Type)
內(nèi)置的 close 函數(shù),只能用于 chan 類型變量。使用 close 函數(shù)關(guān)閉通道后,這個(gè)通道不允許被寫入新的信息,但是關(guān)閉操作不會清除通道中已有的內(nèi)容,不影響通道被讀取。示例代碼如下:
package main
import ("fmt""time"
)
func write(ch chan int) {for i := 0; i < 10; i++ {ch <- i * 10time.Sleep(time.Second * 1)}close(ch)
}
func read(ch chan int) {for {if val, ok := <-ch; ok {fmt.Println("從通道中讀取值:", val)} else {// 通道被關(guān)閉fmt.Println("通道已關(guān)閉,退出讀取程序")break}}
}
func main() {var ch = make(chan int, 10)go write(ch)read(ch)
}
上邊的通道讀取操作是:
val,ok := <-ch
當(dāng)通道被關(guān)閉后:
- 如果從通道中讀取到信息,則
ok值為true,val是一個(gè)有效值; - 如果從通道中沒有讀取到信息,則
ok值為false,此時(shí)的val是臟數(shù)據(jù),切勿將ok為false時(shí)的val值拿去使用,此時(shí)的val值是chan指定數(shù)據(jù)類型的默認(rèn)值。
如果通道沒有被關(guān)閉,當(dāng)從通道中沒有讀取到信息時(shí),讀取操作將會產(chǎn)生程序阻塞。所以使用 close 函數(shù)的目的是關(guān)閉不會再寫入數(shù)據(jù)的通道,告訴通道讀取方,所有數(shù)據(jù)發(fā)送完畢。
package mainimport ("sync""time"
)func main() {var wg sync.WaitGroupquit := make(chan bool)for i := 0; i < 2; i++ {wg.Add(1)go func(id int) {defer wg.Done()task := func() {println(id, time.Now().Nanosecond())time.Sleep(time.Second)}for {select {case <-quit: // closed channel 不會阻塞,因此可用作退出通知。returndefault: // 執(zhí)行正常任務(wù)。task()}}}(i)}time.Sleep(time.Second * 5) // 讓測試 goroutine 運(yùn)行一會。close(quit) // 發(fā)出退出通知。wg.Wait()
}
7. channel 傳參或者作為結(jié)構(gòu)成員
channel 是第一類對象,可傳參 (內(nèi)部實(shí)現(xiàn)為指針) 或者作為結(jié)構(gòu)成員。
package mainimport "fmt"type Request struct {data []intret chan int
}func NewRequest(data ...int) *Request {return &Request{data, make(chan int, 1)}
}
func Process(req *Request) {x := 0for _, i := range req.data {x += i}req.ret <- x
}
func main() {req := NewRequest(10, 20, 30)Process(req)fmt.Println(<-req.ret)
}
8. 并發(fā)總結(jié)
- 并發(fā)是指
goroutine運(yùn)行的時(shí)候是相互獨(dú)立的。 - 使用關(guān)鍵字
go創(chuàng)建goroutine來運(yùn)行函數(shù)。 goroutine在邏輯處理器上執(zhí)行,而邏輯處理器具有獨(dú)立的系統(tǒng)線程和運(yùn)行隊(duì)列。- 競爭狀態(tài)是指兩個(gè)或者多個(gè)
goroutine試圖訪問同一個(gè)資源。 - 原子函數(shù)和互斥鎖提供了一種防止出現(xiàn)競爭狀態(tài)的辦法。
- 通道提供了一種在兩個(gè)
goroutine之間共享數(shù)據(jù)的簡單方法。 - 無緩沖的通道保證同時(shí)交換數(shù)據(jù),而有緩沖的通道不做這種保證。
總結(jié)
以上是生活随笔為你收集整理的Go 学习笔记(25)— 并发(04)[有缓冲/无缓冲通道、WaitGroup 协程同步、select 多路监听通道、close 关闭通道、channel 传参或作为结构体成员]的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 乌镇二月份穿什么衣服合适
- 下一篇: Go 学习笔记(26)— Go 习惯用法