goroutine并发扫描MySQL表_goroutine 并发之搜索文件内容
golang并發(fā)編程 - 例子解析
February 26, 2013
最近在看《Programming in Go》, 其中關(guān)于并發(fā)編程寫得很不錯, 受益非淺, 其中有一些例子是需要多思考才能想明白的, 所以我打算記錄下來, 強化一下思路
《Programming in Go》在?Chapter 7. Concurrent Programming?里面一共用3個例子來講述并發(fā)編程的3個模式, 第一個是?filter?, 篩選出后綴名和文件大小文件列表, 還算簡單就不說, 然后第二個是升級版, 正則版?filter?, 不同的是他是根據(jù)正則搜索出文件的文本并且列出來. 這個例子我起初看是有點蒙的, 這樣寫是沒錯, 但是為什么要這樣寫, 他的設(shè)計思路是什么, 和其他方法相比他有什么優(yōu)勢, 這些都不清楚, 于是決定好好分析一下. 實際上這個例子實現(xiàn)的功能并不復雜, 所以我的文章實際上是在討論怎么產(chǎn)生出和作者相似的思路.
如果不考慮用 goroutine 的話, 思路其實很簡單:
1. 列出文件列表, 編譯正則.
2. 遍歷文件, 打開并遍歷每行, 如果正則能匹配, 記錄下來.
3. 列出來.
如果用 goroutine , 就會有以下思路:
1. 在得到文件路徑數(shù)組之后, 分發(fā)任務(wù)給N個核.
2. 每個核負責打開文件, 將符合條件的那行文本寫入到 `channel`
3. 主線程等待并接收`channel`的結(jié)果. 顯示出來, 完畢
** 然后下文才是重點 **
1. channel關(guān)閉的時機
在go中, channel 是不會自動關(guān)閉的, 所以需要在我們使用完之后手動去關(guān)閉, 而且如果使用for語法來遍歷channel每次得到的數(shù)據(jù), 如果channel沒有關(guān)閉的話會陷入死循環(huán). 在 goroutine 中會造成 deadlock
for job := range jobs {
fmt.Println(job)
}
如果沒close, 會觸發(fā)dead lock. 因為for...range...會自動阻塞直到讀取到數(shù)據(jù)或者channel關(guān)閉, 沒close的話就會導致整個channel處于睡眠狀態(tài). channel關(guān)閉后, 就不允許寫入(緩沖的數(shù)據(jù)還在, 還可以讀取), 所以, channel 關(guān)閉的時機很重要.
2. 分發(fā)任務(wù)
我所知道任務(wù)分發(fā)方法有兩種:
第一種是固定分配, 如果說我想計算1+2+3+...+100, 然后分成4份, 也就是?1+2+..+25,?...,?...,?86+87+...+100, 然后再將結(jié)果累加起來.
還有一種是搶占式的, 這里需要使用一個隊列, 將所有任務(wù)寫入隊列, 然后開N個goroutine, 每個goroutine從隊列讀取任務(wù)(要確保線程安全), 處理, 完成后再繼續(xù)讀取任務(wù). 不再是固定分配, 自己那份做完了就休息了, 所以看來第二種要好一點.
采用第二種方式的話, 對應(yīng)go的做法, 那就是使用一個channel, 命名為?jobs, 將所有的任務(wù)寫入進去, 寫入完畢之后關(guān)閉這個 channel, 當然, 因為是N核, 系統(tǒng)能同時處理的任務(wù)我們設(shè)置為N個(也就是我們使用了N個goroutine), 那么聲明?jobs?是緩沖區(qū)長度為N的 channel.
Buffered channel?和普通的 channel 的差別是他可以同時容納多個單位數(shù)據(jù), 當緩存的數(shù)據(jù)單位數(shù)量等于 channel 容量的時候, 再執(zhí)行寫入將會阻塞, 否則都是及時處理的.
3. 結(jié)果集
當我們將數(shù)據(jù)處理后, 就需要將結(jié)果收集起來. 需要注意的是, 這些操作不是在主 goroutine 執(zhí)行, 所以我們需要通過 channel 傳遞給主 goroutine . 所以只需要在外部聲明一個名為?results?的 channel . 然后在主 goroutine 通過?for?來顯示, 這時候就會發(fā)現(xiàn)一個問題, 這個?results?關(guān)閉的時機問題. 正確的關(guān)閉時機是寫入所有的?Result?之后. 但是別忘了我們同時開了多個 goroutine , 所以?results?應(yīng)該在?執(zhí)行任務(wù)的 goroutine 完成信號累計到N個?這個時機關(guān)閉. 所以我們再引入一個名叫?done?的 channel 來解決. 每個 goroutine 發(fā)送完 result 后會寫入一次done, 然后我們就可以遍歷 done , 遍歷之后說明全部完成了, 再執(zhí)行顯示.
Result 的數(shù)據(jù)結(jié)構(gòu)
type Result struct {
filenamestringlino int
linestring}
書中的?cgrep1?就是這樣的
func awaitCompletion(done
close(results)
}
但是這樣有可能造成死鎖, 因為書中?results?緩沖區(qū)長度限定為最大1000個, 也就是超過1000個 result 的時候再打算寫入 result 會等待取出 result 后才執(zhí)行, done 也不會寫入, 而?awaitCompletion?是等到所有 goroutine 都完成了才會取出?results, 而且當?result?非常大的時候因為內(nèi)存的緣故也是不可能一次性取出的. 所以就需要在讀取?results?的同時讀取?done, 當讀取?done?次數(shù)大于 N 后關(guān)閉?results, 所以, 因為要在多個 channel 中同時讀取, 所以需要使用?select.
下面是書中的?cgrep3?, 改進版:
func waitAndProcessResults(timeout int64, done
finish:= time.After(time.Duration(timeout))for working := workers; working > 0; {
select {//Blocking
case result :=
case
}for{
select {//Nonblocking
case result :=
default:
return}
}
}
看到這里, 我就有個疑問, 為什么在全部完成之后(done都接收到N個了), 還要再遍歷出?results, 直到讀取不到才算讀取完成呢(我反應(yīng)一向比較慢^_^)? 于是我做了個實驗, 去掉了后面再次循環(huán)的部分, 發(fā)現(xiàn)有時會遺漏掉數(shù)據(jù)(我用4個測試文件...), 證明這段代碼是有用的!!!
我的想法是, 他是在處理完 result, 然后寫入?results, 寫完了才發(fā)送?done, 也就是在收到所有的 done 之后, 所有的數(shù)據(jù)應(yīng)該是已經(jīng)處理完成的. 為了驗證這個想法, 我寫了一下代碼:
for working := workers; working > 0; {
select {//Blocking
case result :=
//received result
case
if working <= 0{
println(len(results))
}
}
}
然后看到輸出的數(shù)是大于0的, 也就是說在接收到全部 done 之后,?results?還有數(shù)據(jù)在緩沖區(qū)中, 然后在看看發(fā)送result?的代碼, 突然就明白了
func doJobs(done chan
job.Do(lineRx)
}done
}
我把寫入和讀取想當然認為一起發(fā)生了, 因為有緩沖區(qū)的緣故, doJobs在發(fā)送進?results?的緩沖區(qū)之后就立刻發(fā)送?done?了, 但是寫入的數(shù)據(jù)有沒有被處理, 是不知道的, 所以在接收到所有?done?之后,?results?緩沖區(qū)還有數(shù)據(jù), 需要再循環(huán)一遍.
附我的代碼一份:
package main
import ("bufio"
"fmt"
"log"
"os"
"regexp"
"runtime")
type Job struct {
filenamestringresults chan
}
type Result struct {
filenamestringlinestringlino int
}var worker = runtime.NumCPU()
func main() {//config cpu number
runtime.GOMAXPROCS(worker)
files:= os.Args[2:]
regex, err := regexp.Compile(os.Args[1])if err !=nil {log.Fatal(err)return}//任務(wù)列表, 并發(fā)數(shù)目為CPU個數(shù)
jobs := make(chan Job,worker)//結(jié)果
results := make(chan Result, minimum(1000,len(files)))
defer close(results)//標記完成
dones := make(chan int,worker)
defer close(dones)
go addJob(files, jobs,results)for i := 0; i < worker; i++{
go doJob(jobs, regex,dones)
}
awaitForCloseResult(dones,results)
}
func addJob(files []string, jobs chan
jobs
}
close(jobs)
}
func doJob(jobs
job.Do(regex)
}
dones
func awaitForCloseResult(dones
working:= 0MyForLable:
for{
select {case result :=
if working >=worker {if rlen := len(results); rlen > 0{
println("----------------------------------")
println("left:",rlen)
println("----------------------------------")for i := 1; i <= rlen; i++{
println(
}
}breakMyForLable
}
}
}
}
func (j*Job) Do(re *regexp.Regexp) {
f, err := os.Open(j.filename)if err !=nil {
println(err)return}
defer f.Close()
b:= bufio.NewReader(f)
lino:= 0
for{
line, _, err := b.ReadLine()if re.Match(line) {
j.results
}if err !=nil {break}
lino+= 1}
}
func minimum(a,b int) int {if a >b {returnb
}returna
}
func println(o...interface{}) {
fmt.Println(o...)
}
轉(zhuǎn)自:http://chenye.org/goroutine-note.html
總結(jié)
以上是生活随笔為你收集整理的goroutine并发扫描MySQL表_goroutine 并发之搜索文件内容的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 当贝k1为什么比x3贵
- 下一篇: js删除mysql记录_(DELETEU