Go并发编程中的那些事[译]
- 原文地址:Concurrent programming
- 原文作者:StefanNilsson
- 譯文出自:掘金翻譯計劃
- 本文永久鏈接:github.com/xitu/gold-m…
- 譯者:kobehaha
- 校對者:joyking7?alfred-zhong
本文講的是Go并發(fā)編程中的那些事,
bouncing balls- 1. 多線程執(zhí)行
- 2. Channels
- 3. 同步
- 4. 死鎖
- 5. 數(shù)據(jù)競爭
- 6. 互斥鎖
- 7. 檢測數(shù)據(jù)競爭
- 8. Select標(biāo)識符
- 9. 最基本的并發(fā)實例
- 10. 并行計算
這篇文章將會以Go語言舉例介紹并發(fā)編程,包括以下內(nèi)容
- 線程的并發(fā)執(zhí)行(goroutines)
- 基本的同步技術(shù)(channel和鎖)
- Go中的基本并發(fā)模式
- 死鎖和數(shù)據(jù)競爭
- 并行計算
開始之前,你需要去了解怎樣寫最基本的 Go 程序。 如果你已經(jīng)對 C/C++,Java 或者Python比較熟悉,A tour of go將會給你一些幫助。你也可以看一下Go for C++ programmers?或者Go for Java programmers。
1.多線程執(zhí)行
goroutine?是 go 的一種調(diào)度機制。 Go 使用 go 進行聲明,以 goroutine 調(diào)度機制開啟一個新的執(zhí)行線程。它會在新創(chuàng)建的 goroutine 執(zhí)行程序。在單個程序中,所有g(shù)oroutines都是共享相同的地址空間。
相比于分配棧空間,goroutine 更加輕量,花銷更小。棧空間初始化很小,需要通過申請和釋放堆空間來擴展內(nèi)存。Goroutines 內(nèi)部是被復(fù)用在多個操作系統(tǒng)線程上。如果一個goroutine阻塞了一個操作系統(tǒng)線程,比如正在等待輸入,此時,這個線程中的其他 goroutine 為了保證繼續(xù)運行,將會遷移到其他線程中,而你不需要去關(guān)心這些細節(jié)。
下面的程序?qū)蛴?"Hello from main goroutine". 是否打印"Hello from another goroutine",取決于兩個goroutines誰先完成.
func main() {go fmt.Println("Hello from another goroutine")fmt.Println("Hello from main goroutine")// 程序執(zhí)行到這,所有活著的goroutines都會被殺掉}goroutine1.go
下一段程序?"Hello from main goroutine"?和?"Hello from another goroutine"?可能會以任何順序打印。但有一種可能性是第二個goroutine運行的非常慢,以至于到程序結(jié)束之前都不會打印。
func main() {go fmt.Println("Hello from another goroutine")fmt.Println("Hello from main goroutine")time.Sleep(time.Second) // 為其他goroutine完成等1秒鐘 }goroutine2.go
這有一個更實際的例子,我們定義一個使用并發(fā)來推遲事件的函數(shù)。
// 在指定時間過期后,文本會被打印到標(biāo)準輸出 // 這無論如何都不會被阻塞 func Publish(text string, delay time.Duration) {go func() {time.Sleep(delay)fmt.Println("BREAKING NEWS:", text)}() // 注意括號。我們必須調(diào)用匿名函數(shù) }publish1.go
你可能用下面的方式調(diào)用?Publish?函數(shù)
func main() {Publish("A goroutine starts a new thread of execution.", 5*time.Second)fmt.Println("Let’s hope the news will published before I leave.")// 等待消息被發(fā)布time.Sleep(10 * time.Second)fmt.Println("Ten seconds later: I’m leaving now.") }publish1.go
該程序很有可能按以下順序打印三行,每行輸出會間隔五秒鐘。
$ go run publish1.go Let’s hope the news will published before I leave. BREAKING NEWS: A goroutine starts a new thread of execution. Ten seconds later: I’m leaving now.一般來說,我們不可能讓線程休眠去等待對方。在下一節(jié)中, 我們將會介紹 Go 的一種同步機制,?channels?。然后演示如何使用channel來讓一個 goruntine 等待另外的 goruntine。
2. Channels
Sushi conveyor belt壽司輸送帶
channel?是一種 Go 語言結(jié)構(gòu),它通過傳遞特定元素類型的值來為兩個 goroutines 提供同步執(zhí)行和交流數(shù)據(jù)的機制
。?<-?標(biāo)識符表示了channel的傳輸方向,接收或者發(fā)送。如果沒有指定方向。那么 channel 就是雙向的。
Channels 是一種被 make 分配的引用類型
ic := make(chan int) // 不帶緩存的 int channel wc := make(chan *Work, 10) // 帶緩沖工作的 channel通過 channel 發(fā)送值,可使用 <- 作為二元運算符。通過 channel 接收值,可使用它作為一元運算符。
ic <- 3 // 向channel中發(fā)送3 work := <-wc // 從channel中接收指針到work如果 channel 是無緩沖的,發(fā)送者會一直阻塞直到有接收者從中接收值。如果是帶緩沖的,只有當(dāng)值被拷貝到緩沖區(qū)且緩沖區(qū)已滿時,發(fā)送者才會阻塞直到有接收者從中接收。接收者會一直阻塞直到 channel 中有值可被接收。
關(guān)閉
close?的作用是保證不能再向 channel 中發(fā)送值。 channel 被關(guān)閉后,仍然是可以從中接收值的。接收操作會獲得零值而不會阻塞。多值接收操作會額外返回一個布爾值,表示該值是否被發(fā)送的。
ch := make(chan string) go func() {ch <- "Hello!"close(ch) }() fmt.Println(<-ch) // 打印 "Hello!" fmt.Println(<-ch) // 不阻塞的打印空值 "" fmt.Println(<-ch) // 再一次打印 "" v, ok := <-ch // v 的值是 "" , ok 的值是 false伴有 range 分句的 for 語句會連續(xù)讀取通過 channel 發(fā)送的值,直到 channel 被關(guān)閉
func main() {var ch <-chan Sushi = Producer()for s := range ch {fmt.Println("Consumed", s)} }func Producer() <-chan Sushi {ch := make(chan Sushi)go func() {ch <- Sushi("海老握り") // Ebi nigirich <- Sushi("鮪とろ握り") // Toro nigiriclose(ch)}()return ch }sushi.go
3.同步
下一個例子中,Publish?函數(shù)返回一個channel,它會把發(fā)送的文本當(dāng)做消息廣播出去。
// 指定時間過期后函數(shù)Publish將會打印文本到標(biāo)準輸出. // 當(dāng)文本被發(fā)布channel將會被關(guān)閉. func Publish(text string, delay time.Duration) (wait <-chan struct{}) {ch := make(chan struct{})go func() {time.Sleep(delay)fmt.Println("BREAKING NEWS:", text)close(ch) // broadcast – a closed channel sends a zero value forever}()return ch }publish2.go
注意我們使用一個空結(jié)構(gòu)的 channel :?struct{}。 這表明該 channel 僅僅用于信號,而不是傳遞數(shù)據(jù)。
你可能會這樣使用該函數(shù)
func main() {wait := Publish("Channels let goroutines communicate.", 5*time.Second)fmt.Println("Waiting for the news...")<-waitfmt.Println("The news is out, time to leave.") }publish2.go
程序?qū)唇o出的順序打印下列三行信息。在信息發(fā)送后,最后一行會立刻出現(xiàn)
$ go run publish2.go Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. The news is out, time to leave.4.死鎖
traffic jam讓我們?nèi)ソ榻B?Publish?函數(shù)中的一個bug。
func Publish(text string, delay time.Duration) (wait <-chan struct{}) {ch := make(chan struct{})go func() {time.Sleep(delay)fmt.Println("BREAKING NEWS:", text)**//close(ch)**}()return ch }這時由?Publish?函數(shù)開啟的 goroutine 打印重要信息然后退出,留下主 goroutine 繼續(xù)等待。
func main() {wait := Publish("Channels let goroutines communicate.", 5*time.Second)fmt.Println("Waiting for the news...")**<-wait**fmt.Println("The news is out, time to leave.") }在某些情況下,程序?qū)⒉粫腥魏芜M展,這種情況被稱為死鎖。
deadlock?是線程之間相互等待而都不能繼續(xù)執(zhí)行的一種情況
在運行時,Go 對于運行時死鎖檢測具有良好支持。但在某種情況下goroutine無法取得任何進展,這時Go程序會提供一個詳細的錯誤信息. 下面就是我們崩潰程序的日志:
Waiting for the news... BREAKING NEWS: Channels let goroutines communicate. fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan receive]: main.main().../goroutineStop.go:11 +0xf6goroutine 2 [syscall]: created by runtime.main.../go/src/pkg/runtime/proc.c:225goroutine 4 [timer goroutine (idle)]: created by addtimer.../go/src/pkg/runtime/ztime_linux_amd64.c:73多數(shù)情況下下,在 Go 程序中很容易搞清楚是什么導(dǎo)致了死鎖。接著就是如何去修復(fù)它了。
5. 數(shù)據(jù)競爭
死鎖可能聽起來很糟糕, 但是真正給并發(fā)編程帶來災(zāi)難的是數(shù)據(jù)競爭。它們相當(dāng)常見,而且難于調(diào)試。
一個?數(shù)據(jù)競爭?發(fā)生在當(dāng)兩個線程并發(fā)訪問相同的變量,同時最少有一個訪問是在寫.
數(shù)據(jù)競爭是沒有規(guī)律的。舉個例子,打印數(shù)字1,嘗試找出它是如何發(fā)生的 — 一個可能的解釋是在代碼之后.
func race() {wait := make(chan struct{})n := 0go func() {**n++** // 一次操作:讀,增長,寫close(wait)}()**n++** // 另一個沖突訪問<-waitfmt.Println(n) // 輸出: 不確定 }datarace.go
兩個goroutines,?g1?和?g2, 在競爭過程中,我們無法知道他們執(zhí)行的順序.下面只是許多可能的結(jié)果性的一種.
- g1?從n變量中讀取值0
- g2?從n變量中讀取值0
- g1?增加它的值從0變?yōu)?
- g1?把它的值把1賦值給n
- g2?增加它的值從0到1
- g2?把它的值把1賦值給n
- 這段程序?qū)蛴的值,它的值為1
"數(shù)據(jù)競爭” 的稱呼多少有些誤導(dǎo),不僅僅是他的執(zhí)行順序無法被設(shè)定,而且也無法保證接下來會發(fā)生的情況。編譯器和硬件時常會為了更好的性能而調(diào)整代碼的順序。如果你仔細觀察一個正在運行的線程,那么你才可能會看到更多細節(jié)。
mid action避免數(shù)據(jù)競爭的唯一方式是同步操作在線程間所有共享的可變數(shù)據(jù)。存在幾種方式,在Go中,可能最多使用 channel 或者 lock。較底層的操作可使用?sync?and?sync/atomic?包,這里不再討論。
在Go中,處理并發(fā)數(shù)據(jù)訪問的首選方式是使用一個 channel,它將數(shù)據(jù)從一個goroutine傳遞到另一個goroutine。有一句經(jīng)典的話:"不要通過共享內(nèi)存來傳遞數(shù)據(jù);而要通過傳遞數(shù)據(jù)來共享內(nèi)存"。
func sharingIsCaring() {ch := make(chan int)go func() {n := 0 // 局部變量只能對當(dāng)前 goroutine 可見n++ch <- n // 數(shù)據(jù)通過 goroutine 傳遞}()n := <-ch // ...從另外一個 goroutine 中安全接受n++fmt.Println(n) // 輸出: 2 }datarace.go
在這份代碼中 channel 充當(dāng)了雙重角色。它作為一個同步點,在不同 goroutine 中傳遞數(shù)據(jù)。發(fā)送的 goroutine 將會等待其它的 goroutine 去接收數(shù)據(jù),而接收的 goroutine 將會等待其他的 goroutine 去發(fā)送數(shù)據(jù)。
Go內(nèi)存模型?- 當(dāng)一個 goroutine 在讀一個變量,另外一個goroutine在寫相同的變量,這個過程實際上是非常復(fù)雜的,但是只要你用 channel 在不同goroutines中共享數(shù)據(jù),那么這個操作就是安全的。
6. 互斥鎖
lock有時通過直接鎖定來同步數(shù)據(jù)比使用 channel 更加方便。為此,Go 標(biāo)準庫提供了互斥鎖sync.Mutex。
要讓這種類型的鎖正確工作,所有對于共享數(shù)據(jù)的操作(包括讀和寫)必須在一個 goroutine 持有該鎖時進行。這一點至關(guān)重要,goroutine 的一次錯誤就足以破壞程序和導(dǎo)致數(shù)據(jù)競爭。
因此你需要為API去設(shè)計一種定制化的數(shù)據(jù)結(jié)構(gòu),并且確保所有同步操作都在內(nèi)部執(zhí)行。在這個例子中,我們構(gòu)建了一種安全易用的并發(fā)數(shù)據(jù)結(jié)構(gòu),AtomicInt,它存儲了單個整型,任何goroutines 都能安全的通過?Add?和?Value?方法訪問數(shù)字。
// AtomicInt 是一種持有int類型的支持并發(fā)的數(shù)據(jù)結(jié)構(gòu)。 // 它的初始化值為0. type AtomicInt struct {mu sync.Mutex // 同一時間只能有一個 goroutine 持有鎖。n int }// Add adds n to the AtomicInt as a single atomic operation. // 原子性的將n增加到AtomicInt中 func (a *AtomicInt) Add(n int) {a.mu.Lock() // 等待鎖被釋放然后獲取。a.n += na.mu.Unlock() // 釋放鎖。 }// 返回a的值. func (a *AtomicInt) Value() int {a.mu.Lock()n := a.na.mu.Unlock()return n }func lockItUp() {wait := make(chan struct{})var n AtomicIntgo func() {n.Add(1) // one accessclose(wait)}()n.Add(1) // 另一個并發(fā)訪問<-waitfmt.Println(n.Value()) // Output: 2 }datarace.go
7. 檢測數(shù)據(jù)競爭
競爭有時候難以檢測。當(dāng)我執(zhí)行這段存在數(shù)據(jù)競爭的程序,它打印55555。再試一次,可能會得到不同的結(jié)果。?sync.WaitGroup是go標(biāo)準庫的一部分;它等待一系列 goroutines 執(zhí)行結(jié)束。
func race() {var wg sync.WaitGroupwg.Add(5)for i := 0; i < 5; **i++** {go func() {**fmt.Print(i)** // 局部變量i被6個goroutine共享wg.Done()}()}wg.Wait() // 等待5個goroutine執(zhí)行結(jié)束fmt.Println() }raceClosure.go
對于輸出?55555?較為合理的解釋是執(zhí)行?i++?操作的 goroutine 在其他 goroutines 打印之前就已經(jīng)執(zhí)行了5次。事實上,更新后的?i?對于其他 goroutines 可見是隨機的。
一個非常簡單的解決辦法是通過使用本地變量作為參數(shù)的方式去啟動另外的goroutine。
func correct() {var wg sync.WaitGroupwg.Add(5)for i := 0; i < 5; i++ {go func(n int) { // 局部變量。fmt.Print(n)wg.Done()}(i)}wg.Wait()fmt.Println() }raceClosure.go
這段代碼是正確的,他打印了期望的結(jié)果,24031。回想一下,在不同 goroutines 中,程序的執(zhí)行順序是亂序的。
我們?nèi)匀豢梢允褂瞄]包去避免數(shù)據(jù)競爭。但是我們需要注意在每個 goroutine 中需要有不同的變量。
func alsoCorrect() {var wg sync.WaitGroupwg.Add(5)for i := 0; i < 5; i++ {n := i // 為每個閉包創(chuàng)建單獨的變量go func() {fmt.Print(n)wg.Done()}()}wg.Wait()fmt.Println() }raceClosure.go
7. 自動競爭檢測
總的來說.我們不可能自動的發(fā)現(xiàn)所有的數(shù)據(jù)競爭。但是 Go(從1.1版本開始) 提供了一個強大的數(shù)據(jù)競爭檢測器?data race detector。
這個工具使用下來非常簡單: 僅僅增加?-race?到?go?命令后。運行上述程序?qū)詣訖z查并且打印出下面的輸出信息。
$ go run -race raceClosure.go Data race: ================== WARNING: DATA RACE Read at 0x00c420074168 by goroutine 6:main.race.func1()../raceClosure.go:22 +0x3fPrevious write at 0x00c420074168 by main goroutine:main.race()../raceClosure.go:20 +0x1bdmain.main()../raceClosure.go:10 +0x2fGoroutine 6 (running) created at:main.race()../raceClosure.go:24 +0x193main.main()../raceClosure.go:10 +0x2f ================== 12355 Correct: 01234 Also correct: 01234 Found 1 data race(s) exit status 66這個工具發(fā)現(xiàn)在程序20行存在數(shù)據(jù)競爭,一個goroutine向某個變量寫值,而22行存在另外一個 goroutine 在不同步的讀取這個變量的值。
注意這個工具只能找到實際執(zhí)行時發(fā)生的數(shù)據(jù)競爭。
8. Select 語句
在 Go 并發(fā)編程中,最后講的一個是?select?語句。它會挑選出一系列通信操作中能夠執(zhí)行的操作。如果任意的通信操作都可執(zhí)行,則會隨機挑選一個并執(zhí)行相關(guān)的語句。否則,如果也沒有默認執(zhí)行語句的話,則會阻塞直到其中的任意一個通信操作能夠執(zhí)行。
這有一個例子,顯示了如何用 select 去隨機生成數(shù)字.
// RandomBits 返回產(chǎn)生隨機位數(shù)的channel func RandomBits() <-chan int {ch := make(chan int)go func() {for {select {case ch <- 0: // 沒有相關(guān)操作語句case ch <- 1:}}}()return ch }randBits.go
更簡單,這里 select 被用于設(shè)置超時。這段代碼只能打印 news 或者 time-out 消息,這取決于兩個接收語句中誰可以執(zhí)行.
select { case news := <-NewsAgency:fmt.Println(news) case <-time.After(time.Minute):fmt.Println("Time out: no news in one minute.") }time.After是 go 標(biāo)準庫的一部分;他等待特定時間過去,然后將當(dāng)前時間發(fā)送到返回的 channel.
9. 最基本的并發(fā)實例
couples多花點時間仔細理解這個例子。當(dāng)你完全理解它,你將會徹底的理解 Go 內(nèi)部的并發(fā)工作機制。
程序演示了單個 channel 同時發(fā)送和接受多個 goroutines 的數(shù)據(jù)。它也展示了 select 語句如何從多個通信操作中選擇執(zhí)行。
func main() {people := []string{"Anna", "Bob", "Cody", "Dave", "Eva"}match := make(chan string, 1) // 給未匹配的元素預(yù)留空間wg := new(sync.WaitGroup)for _, name := range people {wg.Add(1)go Seek(name, match, wg)}wg.Wait()select {case name := <-match:fmt.Printf("No one received %s’s message.\n", name)default:// 沒有待處理的發(fā)送操作.} }// 尋求發(fā)送或接收匹配上名稱名稱的通道,并在完成后通知等待組. func Seek(name string, match chan string, wg *sync.WaitGroup) {select {case peer := <-match:fmt.Printf("%s received a message from %s.\n", name, peer)case match <- name:// 等待其他人接受消息.}wg.Done() }matching.go
實例輸出:
$ go run matching.go Anna received a message from Eva. Cody received a message from Bob. No one received Dave’s message.10. 并行計算
CPUs具有并發(fā)特性應(yīng)用會將一個大的計算劃分為小的計算單元,每個計算單元都會單獨的工作。
多 CPU 上的分布式計算不僅僅是一門科學(xué),更是一門藝術(shù)。
-
每個計算單元執(zhí)行時間大約在100us至1ms之間.如果這些單元太小,那么分配問題和管理子模塊的開銷可能會增大。如果這些單元太大,整個的計算體系可能會被一個小的耗時操作阻塞。很多因素都會影響計算速度,比如調(diào)度,程序終端,內(nèi)存布局(注意工作單元的個數(shù)和 CPU 的個數(shù)無關(guān))。
-
盡量減少數(shù)據(jù)共享的量。并發(fā)寫入是非常消耗性能的,特別是多個 goroutines 在不同CPU上執(zhí)行時。共享數(shù)據(jù)讀操作對性能影響不是很大。
-
數(shù)據(jù)的合理組織是一種高效的方式。如果數(shù)據(jù)保存在緩存中,數(shù)據(jù)的加載和存儲的速度將會大大加快。再次強調(diào),這對寫操作來說是非常重要的。
下面的例子將會顯示如何將多個耗時計算分配到多個可用的 CPU 上。這就是我們想要優(yōu)化的代碼。
type Vector []float64// Convolve computes w = u * v, where w[k] = Σ u[i]*v[j], i + j = k. // Precondition: len(u) > 0, len(v) > 0. func Convolve(u, v Vector) Vector {n := len(u) + len(v) - 1w := make(Vector, n)for k := 0; k < n; k++ {w[k] = mul(u, v, k)}return w }// mul returns Σ u[i]*v[j], i + j = k. func mul(u, v Vector, k int) float64 {var res float64n := min(k+1, len(u))j := min(k, len(v)-1)for i := k - j; i < n; i, j = i+1, j-1 {res += u[i] * v[j]}return res }這個想法很簡單:識別適合大小的工作單元,然后在單獨的 goroutine 中運行每個工作單元. 這就是?Convolve?的并發(fā)版本.
func Convolve(u, v Vector) Vector {n := len(u) + len(v) - 1w := make(Vector, n)// 將w劃分為多個將會計算100us-1ms時間計算的工作單元size := max(1, 1000000/n)var wg sync.WaitGroupfor i, j := 0, size; i < n; i, j = j, j+size {if j > n {j = n}// goroutines只為讀共享內(nèi)存.wg.Add(1)go func(i, j int) {for k := i; k < j; k++ {w[k] = mul(u, v, k)}wg.Done()}(i, j)}wg.Wait()return w }convolution.go
當(dāng)定義好計算單元,通常最好將調(diào)度留給程序執(zhí)行和操作系統(tǒng)。然而,在 Go1.*版本中,你需要指定 goroutines 的個數(shù)。
func init() {numcpu := runtime.NumCPU()runtime.GOMAXPROCS(numcpu) // 盡量使用所有可用的 CPU }Stefan Nilsson
原文發(fā)布時間為:2017年10月22日
本文來自云棲社區(qū)合作伙伴掘金,了解相關(guān)信息可以關(guān)注掘金網(wǎng)站。
總結(jié)
以上是生活随笔為你收集整理的Go并发编程中的那些事[译]的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: biostar handbook: 第一
- 下一篇: 搭建一个简易的https