日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Go Concurrency Patterns: Pipelines and cancellation

發布時間:2024/7/23 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Go Concurrency Patterns: Pipelines and cancellation 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

原文地址: https://blog.golang.org/pipelines

簡介

Go 語言提供的并發原語使得可以很方便的構建數據流 pipeline,使用這樣的 pipeline 可以高效的利用 I/O 和多 cpu 的優勢. 這篇文章我們將展示如何構建并使用 pipeline.

什么是 pipeline ?

在 go 語言中沒有正式的定義什么是 pipeline. 它只是眾多并發程序類型中的一種. 非正式的說,pipeline 是一系列通過 channel 聯系起來的 stage. 每個 stage 包含多個執行相同功能的 goroutine. 在每個 stage 中, goroutine 執行以下操作:

  • 從輸入 channel 中讀取數據
  • 處理數據,產生新的數據
  • 將數據發送到輸出 channel

除了第一個和最后一個 stage,每個 stage 可以擁有任意數量的 輸入channel 和 輸出channel。 第一個和最后一個 stage 只能有一個輸入channel一個輸出channel. 第一個 stage 也被稱為 Source 或 Producer, 最后一個 stage 被稱為 Sink 或 Consumer

接下來,我們通過一個簡單的示例來說明.

平方數

假設我們的 pipeline 有三個 stage.

第一個 stage 是 gen, 用來將與一組數字轉化為一個 channel.

func gen(nums ...int) <-chan int {out := make(chan int)go func() {for _, n := range nums {out <- n}close(out)}()return out }

第二個 stage 是 sq, 從 輸入channel 中接收數字,計算數字的平方數,并將數字寫入輸出channel中.

func sq(in <-chan int) <-chan int {out := make(chan int)go func() {for n := range in {out <- n * n}close(out)}()return out }

main 函數中建立該 pipeline,并運行最后最后一個 stage. 最后一個 stage 從第二個 stage 中接收平方數,并將接收到的數據打印出來.

func main() {// Set up the pipeline.c := gen(2, 3)out := sq(c)// Consume the output.fmt.Println(<-out) // 4fmt.Println(<-out) // 9 }

因為 gen 的輸入channel 和輸出 channel具有相同的輸入和輸出類型,因此我們可以重復的使用他們任意次.

我們可以將 main 方法重寫為如下形式:

func main() {// Set up the pipeline and consume the output.for n := range sq(sq(gen(2, 3))) {fmt.Println(n) // 16 then 81} }

扇入,扇出

多個函數可以從一個channel中讀取數據,直到這個channel關閉,這叫做 扇出(fan-out). 通過這種方式,我們可以將一些列任務分派給多個 woker,這些 worker 可以在多個 CPU 上執行或者進行 I/O 操作.

一個函數可以從多個輸入 channel 中讀取并處理數據,直到所有的 channel 被關閉. 并將輸出寫入到同一個輸出channel 上,處理完數據后關閉輸出 channel. 這叫做 扇入(fan-in).

舉個例子,我們可以運行兩個 sq 方法,這兩個方法均從同一個輸入 channel 上讀取數據. 這里我們再引入另外一個方法 merge, 該方法用于將兩個 sq 的輸出整合到通過一個輸出channel中.

func main() {in := gen(2, 3)// Distribute the sq work across two goroutines that both read from in.c1 := sq(in)c2 := sq(in)// Consume the merged output from c1 and c2.for n := range merge(c1, c2) {fmt.Println(n) // 4 then 9, or 9 then 4} } func merge(cs ...<-chan int) <-chan int {var wg sync.WaitGroupout := make(chan int)// Start an output goroutine for each input channel in cs. output// copies values from c to out until c is closed, then calls wg.Done.output := func(c <-chan int) {for n := range c {out <- n}wg.Done()}wg.Add(len(cs))for _, c := range cs {go output(c)}// Start a goroutine to close out once all the output goroutines are// done. This must start after the wg.Add call.go func() {wg.Wait()close(out)}()return out }

盡快停止

截至目前,我們將所有的 pipeline 函數設計為如下模式:

  • 當前 stage 應該關閉 輸出channel,當我們處理完了所有的輸入數據,并且所有的輸出數據已經發送到了 輸出channel 之后.
  • 當前 stage 應該持續接收數據直到 輸入channel 被關閉.

這樣設計使得我們可以再接收stage 中使用 range 循環來處理所有的數據,當所有數據被處理并發送到輸出channel之后,我們的循環為自動退出.

但是在真實情況下,我們往往不會接收從輸入channel中接收所有的數據. 有時,我們僅僅需要讀取輸入數據的一個子集便可以繼續往下進行了. 更通常的情況下,stage 提前退出,因為上流 stage 發生了錯誤. 在這種情況下,我們不應該等待所有的數據到來,并且我們希望上流 stage 直接退出而不是繼續產生哪些我們已經不在需要的數據.

在我們的例子中,如果當前 stage 無法正確的處理所有的 輸入數據,那么上流嘗試繼續發送數據到 stage 會被永久的阻塞住.

// Consume the first value from the output.out := merge(c1, c2)fmt.Println(<-out) // 4 or 9return// Since we didn't receive the second value from out,// one of the output goroutines is hung attempting to send it.

這會導致資源泄露. goroutine 會消耗內存和運行時資源, goroutine 堆棧中的對該 channel 的引用會阻止垃圾回收器回收該 channel 所占的資源,直到它自己退出.

我們需要我們 pipeline 中的上流 stage 總是能自動退出即使下流 stage 無法接收該stage 所產生的所有數據. 一種方案是給輸出channel設置 buffer. buffer 中可以保存指定數量的數據,只要buffer沒有滿,往這樣的channel 中發送數據的操作總是能立馬返回.

c := make(chan int, 2) // buffer size 2 c <- 1 // succeeds immediately c <- 2 // succeeds immediately c <- 3 // blocks until another goroutine does <-c and receives 1

如果我們在創建一個輸出channel的時候,便直到需要發送多少數據,那么使用 buffer 會簡化我們的代碼.

func gen(nums ...int) <-chan int {// 這里,對于每個輸入數字,我們均會產生一個輸出,// 因此我們便可以將輸出 channel 的buffer 大小設置為輸入 nums 的大小// 這樣我們往 out channel 中發送數據的操作永遠不會阻塞當前方法out := make(chan int, len(nums))for _, n := range nums {out <- n}close(out)return out }

另外一種方案是,下流 stage 通知上流stage,它已經停止接收數據了.

取消接收

當我們在 main 方法中決定不再從 out channel 中接收數據,直接退出的時候,我們必須通知上流 stage,我們已經不再從該 channel 中接受數據了. 我們可以通過一個 done channel 來實現.

func main() {in := gen(2, 3)// Distribute the sq work across two goroutines that both read from in.c1 := sq(in)c2 := sq(in)// 因為當前 stage 有兩個上流 channel,因此我們將 done 的 buffer 大小初始化為 2done := make(chan struct{}, 2)out := merge(done, c1, c2)fmt.Println(<-out) // 4 or 9// Tell the remaining senders we're leaving.done <- struct{}{}done <- struct{}{} }

上流 stage 需要做如下修改:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {var wg sync.WaitGroupout := make(chan int)// Start an output goroutine for each input channel in cs. output// copies values from c to out until c is closed or it receives a value// from done, then output calls wg.Done.output := func(c <-chan int) {for n := range c {// 這里使用 select 語句代替原先的單純發送數據的操作// 以便當下流 stage 停止接收,往 done channel 上發送停止接收的信號select {case out <- n:// 當我們在 main 方法中往 done channel 發送數據后,我們便會在這里接收到該數據// 我們便可以結束當前 stage 了case <-done: }}wg.Done()}// ... the rest is unchanged ... }

這種方法存在一個問題,那就是對于每個下流 stage,都得知道上流 stage 的數量,這樣我們才能確定 done channel 的大小. 這看起來并不是一個優雅的解決方案.

我們需要一種解決方案,這個解決方案不需要知道上流和下流的 stage 數量.

在 go 中,我們可以通過關閉 channel 來實現. 因為試圖從一個已經關閉的 channel 上接收數據總是會直接返回,返回值是一個對應數據類型的 zero 值.

這意味著,我們只需要在 main 函數中關閉 done channel,然后所有嘗試從 done 中接收信號的上流stage 都會收到一個零值,這樣他們便可以直接退出了.

修改 main 函數,使用這種方案. 我們需要給每個上流 stage 增加一個done channel 參數,這樣,當 在main 中,我們關閉 done 之后,所有上流 stage 都能收到信號,并退出. 上流stage 的實現類似與 merge 的實現,略.

func main() {// Set up a done channel that's shared by the whole pipeline,// and close that channel when this pipeline exits, as a signal// for all the goroutines we started to exit.done := make(chan struct{}) // 注意,這里 done 不要 bufferdefer close(done) // 使用 defer,在 main 函數退出時,該 channel 會被關閉in := gen(done, 2, 3)// Distribute the sq work across two goroutines that both read from in.c1 := sq(done, in)c2 := sq(done, in)// Consume the first value from output.out := merge(done, c1, c2)fmt.Println(<-out) // 4 or 9// done will be closed by the deferred call. }

計算文件 MD5 checksum

接下來,我們看一個更加真實的例子.

MD5 經常被用來計算文件的 checksum. md5sum 命令可以輸出一組文件的 checksum.

% md5sum *.go d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go

在這個例子中,我們來實現 md5sum 命令. 不同的是我們的md5sum 命令接收一個目錄,輸出這個目錄下所有文件的 checksum,按照路徑排序.

func main() {// Calculate the MD5 sum of all files under the specified directory,// then print the results sorted by path name.m, err := MD5All(os.Args[1])if err != nil {fmt.Println(err)return}var paths []stringfor path := range m {paths = append(paths, path)}sort.Strings(paths)for _, path := range paths {fmt.Printf("%x %s\n", m[path], path)} }

MD5All 的實現如下

// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) {m := make(map[string][md5.Size]byte)err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}data, err := ioutil.ReadFile(path)if err != nil {return err}m[path] = md5.Sum(data)return nil})if err != nil {return nil, err}return m, nil }

并行化計算 MD5 checksum

在這節中,我們將 MD5All 拆分為兩個有兩個 stage 的 pipeline. 第一個stage sumFiles 遍歷文件目錄,計算文件 checksum,并將結果發送到輸出 channel 中, 計算結果的類型為 result.

type result struct {path stringsum [md5.Size]byteerr error } func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {// For each regular file, start a goroutine that sums the file and sends// the result on c. Send the result of the walk on errc.c := make(chan result)errc := make(chan error, 1)// 主線程開啟一個 goroutine, 在goroutine 中遍歷文件,并計算checksum,將結果輸出到 c channel,如果發生錯誤,將錯誤信息發送到 errc channelgo func() {var wg sync.WaitGrouperr := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}wg.Add(1)// 為每個文件使用一個單獨的 goroutine 來計算文件 checksumgo func() {data, err := ioutil.ReadFile(path)// 嘗試往 channel c 中發送計算結果,如果發送操作被阻塞且 done 已經被關閉// select 語句便會進入 done 對應的 case,程序得以繼續往下進行select {case c <- result{path, md5.Sum(data), err}:case <-done:}wg.Done()}()// Abort the walk if done is closed.select {case <-done:return errors.New("walk canceled")default:return nil}})// Walk has returned, so all calls to wg.Add are done. Start a// goroutine to close c once all the sends are done.// 等待所有計算文件 checksum 的 goroutine 退出go func() { wg.Wait()close(c) // 結束時,關閉 channel c}()// No select needed here, since errc is buffered.errc <- err}()return c, errc }

MD5All 用來接收 checksum 或者 sumfiles 中發生的錯誤.

func MD5All(root string) (map[string][md5.Size]byte, error) {// MD5All closes the done channel when it returns; it may do so before// receiving all the values from c and errc.done := make(chan struct{})defer close(done)c, errc := sumFiles(done, root)m := make(map[string][md5.Size]byte)// 從 c 上讀取數據,無論 sumFiles 是否正常結束,// range c 都確保我們不會阻塞在這個 for 循環處for r := range c {if r.err != nil {return nil, r.err}m[r.path] = r.sum}// 檢查是否發生錯誤if err := <-errc; err != nil {return nil, err}return m, nil }

限制并行數量

在上一節中,我們給每個文件創建一個 goroutine 用來計算文件的 MD5 checksum. 這里有一個問題,如果某個目錄下有很多文件,那么我們便需要創建大量個 goroutine,這可能會超出實際的物理內存大小.

我們可以通過限制并行處理的文件數量來解決這個問題. 這里,我們通過創建指定數量的 goroutine 來讀取文件. 此時,我們的 pipeline 就需要有三個stage 了: 遍歷文件目錄,讀取數據并計算 MD5 checksum, 收集計算結果.

第一個 stage walkFiles 讀取文件并將結果寫入輸出 channel 中

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {paths := make(chan string)errc := make(chan error, 1)go func() {// Close the paths channel after Walk returns.defer close(paths)// No select needed for this send, since errc is buffered.errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}select {case paths <- path:case <-done:return errors.New("walk canceled")}return nil})}()return paths, errc }

第二個 stage 啟用指定數量個 goroutine 執行 digester 方法. 這個 goroutine 從 paths channel 中讀取文件路徑并計算 MD5 checksum,將結果輸出到 channel c 上

// 注意,這里我們不關閉 channel c,因為我們有多個 goroutine 往 c 中發送數據 func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {for path := range paths {data, err := ioutil.ReadFile(path)select {case c <- result{path, md5.Sum(data), err}:case <-done:return}} } // Start a fixed number of goroutines to read and digest files.c := make(chan result)var wg sync.WaitGroupconst numDigesters = 20wg.Add(numDigesters)for i := 0; i < numDigesters; i++ {go func() {digester(done, paths, c)wg.Done()}()}go func() {wg.Wait()close(c)}()

最后一個 stage 從 channel c 上接收計算結果或者錯誤信息.

m := make(map[string][md5.Size]byte)for r := range c {if r.err != nil {return nil, r.err}m[r.path] = r.sum}// Check whether the Walk failed.if err := <-errc; err != nil {return nil, err}return m, nil

END!!!

總結

以上是生活随笔為你收集整理的Go Concurrency Patterns: Pipelines and cancellation的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 国产高潮国产高潮久久久 | 免费观看成年人视频 | 911精品国产一区二区在线 | 青娱乐毛片 | 亚欧美一区二区三区 | 秋霞成人午夜鲁丝一区二区三区 | 欧美顶级metart裸体全部自慰 | 日韩欧美综合一区 | 少妇高潮淫片免费观看 | 美女张开双腿让男人捅 | 国产一卡二卡在线 | 国产欧美精品 | а天堂中文在线官网 | 日本人妻熟妇久久久久久 | 日韩一卡二卡在线 | 日韩精品在线观看视频 | 刘亦菲国产毛片bd | av电影在线观看不卡 | 久久露脸国语精品国产 | 日韩高清一级 | 天天干天天干 | 2020亚洲天堂 | a级欧美| 69av在线播放 | 操人网 | 免费观看一区二区三区 | 欧美少妇18p| 91官网入口| 91视频在线观看视频 | 日韩精品一区二区三区久久 | 欧美无砖区 | 国产成人亚洲综合a∨婷婷 台湾a级片 | av在线不卡一区 | 97在线免费观看视频 | 日本午夜视频在线观看 | 福利在线免费观看 | 成人性免费视频 | 97视频在线观看免费 | 欧美资源在线观看 | 人人爽久久涩噜噜噜网站 | 中国av片 | 一区二区精品区 | 婷婷色激情 | 青青在线视频 | 青青青青草| 无码一区二区波多野结衣播放搜索 | av在线不卡一区 | 92精品| 亚欧成人 | 亚洲欧美日韩在线播放 | 九热在线视频 | 亚洲小视频网站 | 成年人在线免费观看网站 | 亚洲国产精品狼友在线观看 | 黑丝美女一区二区 | 国产精品视频一二三区 | 国产精品中文字幕在线观看 | 成人欧美一级特黄 | 激情自拍偷拍 | 久久久久久久久久久久久女过产乱 | 懂色av成人一区二区三区 | 亚洲一区中文 | 日本久久伊人 | 国产精品视频99 | 黄色网址进入 | 日韩激情视频一区二区 | 免费黄色网址在线观看 | 亚洲福利视频一区二区 | 久久五月天综合 | 国产露脸91国语对白 | 黄色天堂av | 国产精品免费一区二区区 | 亚洲网站免费看 | 亚洲激情一区二区三区 | 久久桃色 | 美日韩丰满少妇在线观看 | 97色伦影院| 亚洲一区二区麻豆 | 91国内精品久久久久 | 亲子乱对白乱都乱了 | 夜久久久| 精东av在线| 国产日韩av一区二区 | 亚洲欧美自拍一区 | 97伊人| 波多野结衣中文字幕一区二区 | 国语对白在线观看 | 国产视频精品视频 | 热久久亚洲 | 欧美日韩国产一区二区在线观看 | 人人搞人人干 | 播放黄色一级片 | 影音先锋男人资源网站 | 日韩不卡一二三区 | 国产美女一区 | 国产剧情一区 | 999免费视频 | 亚洲一级二级片 | 亚洲美女视频在线 |