日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Go 学习笔记(25)— 并发(04)[有缓冲/无缓冲通道、WaitGroup 协程同步、select 多路监听通道、close 关闭通道、channel 传参或作为结构体成员]

發布時間:2023/11/27 生活经验 26 豆豆

1. 無緩沖的通道

無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。

這種類型的通道要求發送 goroutine 和接收 goroutine 同時準備好,才能完成發送和接收操作。

如果兩個 goroutine 沒有同時準備好,通道會導致先執行發送或接收操作的 goroutine 阻塞等待。這種對通道進行發送和接收的交互行為本身就是同步的。其中任意一個操作都無法離開另一個操作單獨存在。

下圖展示兩個 goroutine 如何利用無緩沖的通道來共享一個值。

  1. 兩個 goroutine 都到達通道,但兩者都沒有開始執行發送或者接收。
  2. 左側的 goroutine 將它的手伸進了通道,這模擬了向通道發送數據的行為。這時,這個 goroutine 會在通道中被鎖住,直到交換完成。
  3. 右側的 goroutine 將它的手放入通道,這模擬了從通道里接收數據。這個 goroutine 一樣也會在通道中被鎖住,直到交換完成。
  4. 進行交換。
  5. 右側的 goroutine 拿到數據。
  6. 兩個 goroutine 都將它們的手從通道里拿出來,這模擬了被鎖住的 goroutine 得到釋放。兩個 goroutine 現在都可以去做別的事情了。

圖:使用無緩沖的通道在 goroutine 之間同步, 摘自 《Go 語言實戰》

package mainimport ("runtime"
)func main() {c := make(chan struct{})go func(i chan struct{}) {sum := 0for i := 0; i <= 10000; i++ {sum += i}println("sum is :", sum)// 寫通道c <- struct{}{}}(c)//NumGoroutine 可以返回當前程序的 goroutine 數目println("NumGoroutine=", runtime.NumGoroutine())// 讀取通道 c, 通過通道進行同步等待<-c
}

無緩沖通道需要發送和接收配對。否則會被阻塞,直到另一方準備好后被喚醒。

package mainimport "fmt"func main() {data := make(chan int)  // 數據交換隊列exit := make(chan bool) // 退出通知go func() {for d := range data { // 從隊列迭代接收數據,直到 close 。fmt.Println(d)}fmt.Println("recv over.")exit <- true // 發出退出通知。}()data <- 1 // 發送數據。data <- 2data <- 3close(data) // 關閉隊列。fmt.Println("send over.")<-exit // 等待退出通知。
}

輸出:

1
2
3
send over.
recv over.

2. 有緩沖的通道

在無緩沖通道的基礎上,為通道增加一個有限大小的存儲空間形成帶緩沖通道。帶緩沖通道在發送時無需等待接收方接收即可完成發送過程,并且不會發生阻塞,只有當存儲空間滿時才會發生阻塞。同理,如果緩沖通道中有數據,接收時將不會發生阻塞,直到通道中沒有數據可讀時,通道將會再度阻塞。

有緩沖的通道(buffered channel)是一種在被接收前能存儲一個或者多個值的通道。

這種類型的通道并不強制要求 goroutine 之間必須同時完成發送和接收。通道會阻塞發送和接收動作的條件也會不同。

只有在通道中沒有要接收的值時,接收動作才會阻塞。只有在通道沒有可用緩沖區容納被發送的值時,發送動作才會阻塞。

這導致有緩沖的通道和無緩沖的通道之間的一個很大的不同:

  • 無緩沖的通道保證進行發送和接收的 goroutine 會在同一時間進行數據交換;

  • 有緩沖的通道沒有這種保證。

在下圖中可以看到兩個 goroutine 分別向有緩沖的通道里增加一個值和從有緩沖的通道里移除一個值。

  1. 右側的 goroutine 正在從通道接收一個值。
  2. 右側的 goroutine 獨立完成了接收值的動作,而左側的 goroutine 正在發送一個新值到通道里。
  3. 左側的 goroutine 還在向通道發送新值,而右側的 goroutine 正在從通道接收另外一個值。這個步驟里的兩個操作既不是同步的,也不會互相阻塞。
  4. 所有的發送和接收都完成,而通道里還有幾個值,也有一些空間可以存更多的值。

圖:使用有緩沖的通道在 goroutine 之間同步數據,摘自 《Go 語言實戰》

有緩沖通道例子

package mainimport ("runtime"
)func main() {c := make(chan struct{})ci := make(chan int, 100)go func(i chan struct{}, j chan int) {for i := 0; i <= 10; i++ {ci <- i}close(ci)// 寫通道c <- struct{}{}}(c, ci)//NumGoroutine 可以返回當前程序的 goroutine 數目println("NumGoroutine=", runtime.NumGoroutine())// 讀取通道 c, 通過通道進行同步等待<-c// 此時ci 通道已經關閉,匿名函數啟動的goroutine 已經退出println("NumGoroutine=", runtime.NumGoroutine())// 但是通道 ci 還可以繼續讀取for v := range ci {println("v is :", v)}
}

異步方式也就是有緩沖的通道通過判斷緩沖區來決定是否阻塞。

  • 緩沖區已滿,發送被阻塞;
  • 緩沖區為空,接收被阻塞;

通常情況下,異步 channel 可減少排隊阻塞,具備更高的效率。但應該考慮使用指針規避大對象拷貝,將多個元素打包,減小緩沖區大小等。

為什么Go語言對通道要限制長度而不提供無限長度的通道?

我們知道通道( channel )是在兩個 goroutine 間通信的橋梁。使用 goroutine 的代碼必然有一方提供數據,一方消費數據。當提供數據一方的數據供給速度大于消費方的數據處理速度時,如果通道不限制長度,那么內存將不斷膨脹直到應用崩潰。

因此,限制通道的長度有利于約束數據提供方的供給速度,供給數據量必須在消費方處理量+通道長度的范圍內,才能正常地處理數據。

package mainimport "fmt"func main() {data := make(chan int, 3) // 緩沖區可以存儲 3 個元素exit := make(chan bool)data <- 1 // 在緩沖區未滿前,不會阻塞。data <- 2data <- 3go func() {for d := range data { // 在緩沖區未空前,不會阻塞。fmt.Println(d)}exit <- true}()data <- 4 // 如果緩沖區已滿,阻塞。data <- 5close(data)<-exit
}

緩沖區是內部屬性,并非類型構成要素。

var a, b chan int = make(chan int), make(chan int, 3)

除用 range 外,還可用 ok-idiom 模式判斷 channel 是否關閉。

for {if d, ok := <-data; ok {fmt.Println(d)} else {break}
}

向 closed channel 發送數據引發 panic 錯誤,接收立即返回零值。而 nil channel,無論收發都會被阻塞。

// 這個示例程序展示如何使用
// 有緩沖的通道和固定數目的
// goroutine來處理一堆工作
package mainimport ("fmt""math/rand""sync""time"
)const (numberGoroutines = 4  // 要使用的goroutine的數量taskLoad         = 10 // 要處理的工作的數量
)// wg用來等待程序完成
var wg sync.WaitGroup// init初始化包,Go語言運行時會在其他代碼執行之前
// 優先執行這個函數
func init() {// 初始化隨機數種子rand.Seed(time.Now().Unix())
}// main是所有Go程序的入口
func main() {// 創建一個有緩沖的通道來管理工作tasks := make(chan string, taskLoad)// 啟動goroutine來處理工作wg.Add(numberGoroutines)for gr := 1; gr <= numberGoroutines; gr++ {go worker(tasks, gr)}// 增加一組要完成的工作for post := 1; post <= taskLoad; post++ {tasks <- fmt.Sprintf("Task : %d", post)}// 當所有工作都處理完時關閉通道// 以便所有goroutine退出close(tasks)// 等待所有工作完成wg.Wait()
}// worker作為goroutine啟動來處理
// 從有緩沖的通道傳入的工作
func worker(tasks chan string, worker int) {// 通知函數已經返回defer wg.Done()for {// 等待分配工作task, ok := <-tasksif !ok {// 這意味著通道已經空了,并且已被關閉fmt.Printf("Worker: %d : Shutting Down\n", worker)return}// 顯示我們開始工作了fmt.Printf("Worker: %d : Started %s\n", worker, task)// 隨機等一段時間來模擬工作sleep := rand.Int63n(100)time.Sleep(time.Duration(sleep) * time.Millisecond)// 顯示我們完成了工作fmt.Printf("Worker: %d : Completed %s\n", worker, task)}
}

輸出:

Worker: 4 : Started Task : 2
Worker: 1 : Started Task : 1
Worker: 2 : Started Task : 3
Worker: 3 : Started Task : 4
Worker: 4 : Completed Task : 2
Worker: 4 : Started Task : 5
Worker: 2 : Completed Task : 3
Worker: 2 : Started Task : 6
Worker: 3 : Completed Task : 4
Worker: 3 : Started Task : 7
Worker: 3 : Completed Task : 7
Worker: 3 : Started Task : 8
Worker: 4 : Completed Task : 5
Worker: 4 : Started Task : 9
Worker: 1 : Completed Task : 1
Worker: 1 : Started Task : 10
Worker: 3 : Completed Task : 8
Worker: 3 : Shutting Down
Worker: 2 : Completed Task : 6
Worker: 2 : Shutting Down
Worker: 1 : Completed Task : 10
Worker: 1 : Shutting Down
Worker: 4 : Completed Task : 9
Worker: 4 : Shutting Down

在main函數的第31行,創建了一個string類型的有緩沖的通道,緩沖的容量是10。在第34行,給WaitGroup賦值為4,代表創建了4個工作 goroutine。之后在第35行到第37行,創建了4個 goroutine,并傳入用來接收工作的通道。在第40行到第42行,將10個字符串發送到通道,模擬發給 goroutine 的工作。一旦最后一個字符串發送到通道,通道就會在第46行關閉,而main函數就會在第49行等待所有工作的完成。

第46行中關閉通道的代碼非常重要。當通道關閉后,goroutine 依舊可以從通道接收數據,但是不能再向通道里發送數據。能夠從已經關閉的通道接收數據這一點非常重要,因為這允許通道關閉后依舊能取出其中緩沖的全部值,而不會有數據丟失。從一個已經關閉且沒有數據的通道里獲取數據,總會立刻返回,并返回一個通道類型的零值。如果在獲取通道時還加入了可選的標志,就能得到通道的狀態信息。

在worker函數里,可以在第58行看到一個無限的for循環。在這個循環里,會處理所有接收到的工作。每個 goroutine 都會在第60行阻塞,等待從通道里接收新的工作。一旦接收到返回,就會檢查ok標志,看通道是否已經清空而且關閉。如果ok的值是false,goroutine 就會終止,并調用第56行通過defer聲明的Done函數,通知main有工作結束。

如果ok標志是true,表示接收到的值是有效的。第71行和第72行模擬了處理的工作。一旦工作完成,goroutine 會再次阻塞在第60行從通道獲取數據的語句。一旦通道被關閉,這個從通道獲取數據的語句會立刻返回,goroutine 也會終止自己。

3. WaitGroup

Go 語言中除了可以使用通道(channel)和互斥鎖進行兩個并發程序間的同步外,還可以使用等待組進行多個任務的同步,等待組可以保證在并發環境中完成指定數量的任務。sync.WaitGroup 類型(以下簡稱WaitGroup類型)是開箱即用的,也是并發安全的。

一般情況下,我會用這個方法來記錄需要等待的 goroutine 的數量。相對應的,這個類型的 Done 方法,用于對其所屬值中計數器的值進行減一操作。我們可以在需要等待的 goroutine 中,通過 defer 語句調用它。而此類型的 Wait 方法的功能是,阻塞當前的 goroutine ,直到其所屬值中的計數器歸零。如果在該方法被調用的時候,那個計數器的值就是 0,那么它將不會做任何事情。

goroutinechan , 一個用于并發,另一個用于通信。沒有緩沖的通道具有同步的功能,除此之外, sync 包也提供了多個 goroutine 同步的機制,主要是通過 WaitGroup 實現的。

WaitGroup 值中計數器的值不能小于 0,是因為這樣會引發一個 panic


如果在一個此類值的 Wait 方法被執行期間,跨越了兩個計數周期,那么就會引發一個 panic 。縱觀上述會引發 panic 的后兩種情況,我們可以總結出這樣一條關于 WaitGroup 值的使用禁忌,

即:不要把增加其計數器值的操作和調用其Wait方法的代碼,放在不同的 goroutine 中執行。換句話說,要杜絕對同一個WaitGroup 值的兩種操作的并發執行。

我們最好用 先統一 Add ,再并發 Done ,最后 Wait 這種標準方式,來使用 WaitGroup 值。 尤其不要在調用 Wait 方法的同時,并發地通過調用 Add 方法去增加其計數器的值,因為這也有可能引發 panic

sync.WaitGroup (等待組)類型中,每個 sync.WaitGroup 值在內部維護著一個計數,此計數的初始默認值為零。

主要的接口如下:

type WaitGroup struct {// contains filtered or unexported fields
}// 添加等待信號
func (wg *WaitGroup) Add(delta int)// 釋放等待信號
func (wg *WaitGroup) Done()// 等待
func (wg *WaitGroup) Wait()
  • WaitGroup 用來等待多個 goroutine 完成;
  • main goroutine 調用 Add 設置需要等待 goroutine 的數目;
  • 每一個 goroutine 結束時調用 Done()
  • Wait()main 用來等待所有的 goroutine 完成;

sync.WaitGroup 內部擁有一個計數器,計數器的值可以通過方法調用實現計數器的增加和減少。當我們添加了 N 個并發任務進行工作時,就將等待組的計數器值增加 N。每個任務完成時,這個值減 1。同時,在另外一個 goroutine 中等待這個等待組的計數器值為 0 時,表示所有任務已經完成。

代碼示例:

package mainimport ("net/http""sync"
)var wg sync.WaitGroup
var urls = []string{"http://www.baidu.com","http://www.sina.com","http://www.qq.com",
}func main() {for _, url := range urls {// 為每一個 url 啟動一個 goroutine,同時給 wg 加 1wg.Add(1)go func(url string) {// 當前go routine 結束后給wg 計數減1, wg.Done() 等價于wg.Add(-1)// defer wg.Add(-1)defer wg.Done()// 發送 http get 請求并打印 http 返回碼resp, err := http.Get(url)if err == nil {println(resp.Status)}}(url)}// 等待所有請求結束wg.Wait()
}

或者不使用匿名函數,如下

package mainimport ("net/http""sync"
)var wg sync.WaitGroup
var urls = []string{"http://www.baidu.com","http://www.sina.com","http://www.qq.com",
}func getURLStatus(url string) {// 當前go routine 結束后給wg 計數減1, wg.Done() 等價于wg.Add(-1)// defer wg.Add(-1)defer wg.Done()// 發送 http get 請求并打印 http 返回碼resp, err := http.Get(url)if err == nil {println(resp.Status)}
}func main() {for _, url := range urls {// 為每一個 url 啟動一個 goroutine,同時給 wg 加 1wg.Add(1)go getURLStatus(url)}// 等待所有請求結束wg.Wait()
}

4. select

select 是類 UNIX 系統提供的一個多路復用系統API, Go 語言借用多路復用的概念,提供了 select 關鍵字,用于多路監昕多個通道。

select 語句只能與通道聯用,它一般由若干個分支組成。每次執行這種語句的時候,一般只有一個分支中的代碼會被運行。

當監聽的通道沒有狀態是可讀或可寫的, select 是阻塞的;只要監聽的通道中有一個狀態是可讀或可寫的,則 select 就不會阻塞,而是進入處理就緒通道的分支流程。如果監聽的通道有多個可讀或可寫的狀態, 則 select 隨機選取一個處理。

select 的特點是只要其中有一個 case 已經完成,程序就會繼續往下執行,而不會考慮其他 case 的情況。

select 的用法與 switch 語言非常類似,由 select 開始一個新的選擇塊,每個選擇條件由 case 語句來描述。與 switch 語句相比, select 有比較多的限制,其中最大的一條限制就是每個 case 語句里必須是一個 IO 操作。結構如下:

select{case 操作1:響應操作1case 操作2:響應操作2default:沒有操作情況
}

操作1、操作2:包含通道收發語句,請參考下表。

操 作語句示例
接收任意數據case <- ch;
接收變量case d := <- ch;
發送數據case ch <- 100;

Go 中,支持通信操作的類型只有 chan ,所以 select 中的 case 條件只能是對 chan 類型變量的讀寫操作。由于 chan 類型變量的讀寫操作可能會引起阻塞,為了在使用 select 選擇器時不陷入阻塞狀態,可以在 select 代碼塊中添加 default 關鍵字,當 case 條件全部都不滿足時,默認進入 default 分支,執行完 default 分支的代碼后,退出 select 選擇器。

package mainimport ("fmt""time"
)func main() {fmt.Println("開始時間:", time.Now().Format("2006-01-02 15:04:05"))select {case <-time.After(time.Second * 2):fmt.Println("2秒后的時間:", time.Now().Format("2006-01-02 15:04:05"))}
}

輸出結果:

開始時間: 2021-02-08 14-14-42
2秒后的時間: 2021-02-08 14:14:44

time.After 函數返回一個通道類型的變量,然后在 case 中從這個通道中讀取信息,如果沒有協程給這個通道發送信息,那么 case 將會一直阻塞。在調用 After 函數時,傳入了一個時長作為參數,意思是從調用 After 函數算起,到設定的時長后,有協程將會向這個通道發送一條消息。當通道收到消息后,這個 case 條件滿足,這個 case 分支下的代碼將會被執

如果沒有任意一條 select 語句可以執行(即所有的通道都被阻塞),那么有如下兩種可能的情況:

  • 如果給出了 default 語句,那么就會執行 default 語句,同時程序的執行會從 select 語句后的語句中恢復;

  • 如果沒有 default 語句,那么 select 語句將被阻塞,直到至少有一個通信可以進行下去;

package mainfunc main() {ch := make(chan int, 1)go func(chan int) {	// go func(ch chan int) { 這樣寫也可以? 為啥?for {select {case ch <- 0:case ch <- 1:}}}(ch)for i := 0; i < 10; i++ {println(<-ch)}}

輸出結果:

1
1
0
1
0
1
0
1
0
1

如果需要同時處理多個 channel ,可使用 select 語句。它隨機選擇一個可用 channel 做收發操作,或執行 default case

package mainimport ("fmt""os"
)func main() {a, b := make(chan int, 3), make(chan int)go func() {v, ok, s := 0, false, ""for {select { // 隨機選擇可?用 channel,接收數據。case v, ok = <-a:s = "a"case v, ok = <-b:s = "b"}if ok {fmt.Println(s, v)} else {os.Exit(0)}}}()for i := 0; i < 5; i++ {select { // 隨機選擇可用 channel,發送數據。case a <- i:case b <- i:}}close(a)select {} // 沒有可用 channel,阻塞 main goroutine。
}

輸出:

a 0
a 1
a 2
a 3
b 4

在循環中使用 select default case 需要小心,避免形成洪水。

  1. 如果在 select 語句中發現某個通道已關閉,那么應該怎樣屏蔽掉它所在的分支?

case 中通過第二個參數判斷 chan 是否關閉,如果關閉則通過 make(chan type) 來對關閉的 channil ,當再次執行到 select 時,因為 channil 會進入阻塞而不會進入候選分支。

package mainimport ("fmt""time"
)func main() {i := 0c := make(chan int, 2)c <- 1c <- 2close(c)for {select {case value, ok := <-c:if !ok {c = make(chan int)fmt.Println("ch is closed")} else {fmt.Printf("value is %#v\n", value)}default:time.Sleep(1e9) // 等待1秒鐘fmt.Println("default, ", i)i = i + 1if i > 3 {return}}}
}

輸出結果:

value is 1
value is 2
ch is closed
default,  0
default,  1
default,  2
default,  3
  1. select 語句與 for 語句聯用時,怎樣直接退出外層的 for 語句?
  • 可以使用 gotolable 跳轉到 for 外面;
  • 可以設置一個額外的標記位,當 chan 關閉時,設置 flag=true ,在 for 的最后判斷 flag 決定是否 break

5. 用 channel 實現信號量 (semaphore)

package mainimport ("fmt""sync"
)func main() {wg := sync.WaitGroup{}wg.Add(3)sem := make(chan int, 1)for i := 0; i < 3; i++ {go func(id int) {defer wg.Done()sem <- 1 // 向 sem 發送數據,阻塞或者成功。for x := 0; x < 3; x++ {fmt.Println(id, x)}<-sem // 接收數據,使得其他阻塞 goroutine 可以發送數據。}(i)}wg.Wait()
}

輸出:

2 0
2 1
2 2
0 0
0 1
0 2
1 0
1 1
1 2

6. 用 closed channel 發出退出通知

close 函數聲明如下:

func close(c chan<- Type)

內置的 close 函數,只能用于 chan 類型變量。使用 close 函數關閉通道后,這個通道不允許被寫入新的信息但是關閉操作不會清除通道中已有的內容,不影響通道被讀取。示例代碼如下:

package main
import ("fmt""time"
)
func write(ch chan int) {for i := 0; i < 10; i++ {ch <- i * 10time.Sleep(time.Second * 1)}close(ch)
}
func read(ch chan int) {for {if val, ok := <-ch; ok {fmt.Println("從通道中讀取值:", val)} else {// 通道被關閉fmt.Println("通道已關閉,退出讀取程序")break}}
}
func main() {var ch = make(chan int, 10)go write(ch)read(ch)
}

上邊的通道讀取操作是:

val,ok := <-ch

當通道被關閉后:

  • 如果從通道中讀取到信息,則 ok 值為 trueval 是一個有效值;
  • 如果從通道中沒有讀取到信息,則 ok 值為 false ,此時的 val 是臟數據,切勿將 okfalse 時的 val 值拿去使用,此時的 val 值是 chan 指定數據類型的默認值。

如果通道沒有被關閉,當從通道中沒有讀取到信息時,讀取操作將會產生程序阻塞。所以使用 close 函數的目的是關閉不會再寫入數據的通道,告訴通道讀取方,所有數據發送完畢。

package mainimport ("sync""time"
)func main() {var wg sync.WaitGroupquit := make(chan bool)for i := 0; i < 2; i++ {wg.Add(1)go func(id int) {defer wg.Done()task := func() {println(id, time.Now().Nanosecond())time.Sleep(time.Second)}for {select {case <-quit: // closed channel 不會阻塞,因此可用作退出通知。returndefault: // 執行正常任務。task()}}}(i)}time.Sleep(time.Second * 5) // 讓測試 goroutine 運行一會。close(quit)                 // 發出退出通知。wg.Wait()
}

7. channel 傳參或者作為結構成員

channel 是第一類對象,可傳參 (內部實現為指針) 或者作為結構成員。

package mainimport "fmt"type Request struct {data []intret  chan int
}func NewRequest(data ...int) *Request {return &Request{data, make(chan int, 1)}
}
func Process(req *Request) {x := 0for _, i := range req.data {x += i}req.ret <- x
}
func main() {req := NewRequest(10, 20, 30)Process(req)fmt.Println(<-req.ret)
}

8. 并發總結

  • 并發是指 goroutine 運行的時候是相互獨立的。
  • 使用關鍵字 go 創建 goroutine 來運行函數。
  • goroutine 在邏輯處理器上執行,而邏輯處理器具有獨立的系統線程和運行隊列。
  • 競爭狀態是指兩個或者多個 goroutine 試圖訪問同一個資源。
  • 原子函數和互斥鎖提供了一種防止出現競爭狀態的辦法。
  • 通道提供了一種在兩個 goroutine 之間共享數據的簡單方法。
  • 無緩沖的通道保證同時交換數據,而有緩沖的通道不做這種保證。

總結

以上是生活随笔為你收集整理的Go 学习笔记(25)— 并发(04)[有缓冲/无缓冲通道、WaitGroup 协程同步、select 多路监听通道、close 关闭通道、channel 传参或作为结构体成员]的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。