日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 运维知识 > 数据库 >内容正文

数据库

goroutine并发扫描MySQL表_goroutine 并发之搜索文件内容

發(fā)布時(shí)間:2023/12/1 数据库 44 豆豆
生活随笔 收集整理的這篇文章主要介紹了 goroutine并发扫描MySQL表_goroutine 并发之搜索文件内容 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

golang并發(fā)編程 - 例子解析

February 26, 2013

最近在看《Programming in Go》, 其中關(guān)于并發(fā)編程寫得很不錯(cuò), 受益非淺, 其中有一些例子是需要多思考才能想明白的, 所以我打算記錄下來(lái), 強(qiáng)化一下思路

《Programming in Go》在?Chapter 7. Concurrent Programming?里面一共用3個(gè)例子來(lái)講述并發(fā)編程的3個(gè)模式, 第一個(gè)是?filter?, 篩選出后綴名和文件大小文件列表, 還算簡(jiǎn)單就不說(shuō), 然后第二個(gè)是升級(jí)版, 正則版?filter?, 不同的是他是根據(jù)正則搜索出文件的文本并且列出來(lái). 這個(gè)例子我起初看是有點(diǎn)蒙的, 這樣寫是沒(méi)錯(cuò), 但是為什么要這樣寫, 他的設(shè)計(jì)思路是什么, 和其他方法相比他有什么優(yōu)勢(shì), 這些都不清楚, 于是決定好好分析一下. 實(shí)際上這個(gè)例子實(shí)現(xiàn)的功能并不復(fù)雜, 所以我的文章實(shí)際上是在討論怎么產(chǎn)生出和作者相似的思路.

如果不考慮用 goroutine 的話, 思路其實(shí)很簡(jiǎn)單:

1. 列出文件列表, 編譯正則.

2. 遍歷文件, 打開并遍歷每行, 如果正則能匹配, 記錄下來(lái).

3. 列出來(lái).

如果用 goroutine , 就會(huì)有以下思路:

1. 在得到文件路徑數(shù)組之后, 分發(fā)任務(wù)給N個(gè)核.

2. 每個(gè)核負(fù)責(zé)打開文件, 將符合條件的那行文本寫入到 `channel`

3. 主線程等待并接收`channel`的結(jié)果. 顯示出來(lái), 完畢

** 然后下文才是重點(diǎn) **

1. channel關(guān)閉的時(shí)機(jī)

在go中, channel 是不會(huì)自動(dòng)關(guān)閉的, 所以需要在我們使用完之后手動(dòng)去關(guān)閉, 而且如果使用for語(yǔ)法來(lái)遍歷channel每次得到的數(shù)據(jù), 如果channel沒(méi)有關(guān)閉的話會(huì)陷入死循環(huán). 在 goroutine 中會(huì)造成 deadlock

for job := range jobs {

fmt.Println(job)

}

如果沒(méi)close, 會(huì)觸發(fā)dead lock. 因?yàn)閒or...range...會(huì)自動(dòng)阻塞直到讀取到數(shù)據(jù)或者channel關(guān)閉, 沒(méi)close的話就會(huì)導(dǎo)致整個(gè)channel處于睡眠狀態(tài). channel關(guān)閉后, 就不允許寫入(緩沖的數(shù)據(jù)還在, 還可以讀取), 所以, channel 關(guān)閉的時(shí)機(jī)很重要.

2. 分發(fā)任務(wù)

我所知道任務(wù)分發(fā)方法有兩種:

第一種是固定分配, 如果說(shuō)我想計(jì)算1+2+3+...+100, 然后分成4份, 也就是?1+2+..+25,?...,?...,?86+87+...+100, 然后再將結(jié)果累加起來(lái).

還有一種是搶占式的, 這里需要使用一個(gè)隊(duì)列, 將所有任務(wù)寫入隊(duì)列, 然后開N個(gè)goroutine, 每個(gè)goroutine從隊(duì)列讀取任務(wù)(要確保線程安全), 處理, 完成后再繼續(xù)讀取任務(wù). 不再是固定分配, 自己那份做完了就休息了, 所以看來(lái)第二種要好一點(diǎn).

采用第二種方式的話, 對(duì)應(yīng)go的做法, 那就是使用一個(gè)channel, 命名為?jobs, 將所有的任務(wù)寫入進(jìn)去, 寫入完畢之后關(guān)閉這個(gè) channel, 當(dāng)然, 因?yàn)槭荖核, 系統(tǒng)能同時(shí)處理的任務(wù)我們?cè)O(shè)置為N個(gè)(也就是我們使用了N個(gè)goroutine), 那么聲明?jobs?是緩沖區(qū)長(zhǎng)度為N的 channel.

Buffered channel?和普通的 channel 的差別是他可以同時(shí)容納多個(gè)單位數(shù)據(jù), 當(dāng)緩存的數(shù)據(jù)單位數(shù)量等于 channel 容量的時(shí)候, 再執(zhí)行寫入將會(huì)阻塞, 否則都是及時(shí)處理的.

3. 結(jié)果集

當(dāng)我們將數(shù)據(jù)處理后, 就需要將結(jié)果收集起來(lái). 需要注意的是, 這些操作不是在主 goroutine 執(zhí)行, 所以我們需要通過(guò) channel 傳遞給主 goroutine . 所以只需要在外部聲明一個(gè)名為?results?的 channel . 然后在主 goroutine 通過(guò)?for?來(lái)顯示, 這時(shí)候就會(huì)發(fā)現(xiàn)一個(gè)問(wèn)題, 這個(gè)?results?關(guān)閉的時(shí)機(jī)問(wèn)題. 正確的關(guān)閉時(shí)機(jī)是寫入所有的?Result?之后. 但是別忘了我們同時(shí)開了多個(gè) goroutine , 所以?results?應(yīng)該在?執(zhí)行任務(wù)的 goroutine 完成信號(hào)累計(jì)到N個(gè)?這個(gè)時(shí)機(jī)關(guān)閉. 所以我們?cè)僖胍粋€(gè)名叫?done?的 channel 來(lái)解決. 每個(gè) goroutine 發(fā)送完 result 后會(huì)寫入一次done, 然后我們就可以遍歷 done , 遍歷之后說(shuō)明全部完成了, 再執(zhí)行顯示.

Result 的數(shù)據(jù)結(jié)構(gòu)

type Result struct {

filenamestringlino int

linestring}

書中的?cgrep1?就是這樣的

func awaitCompletion(done

close(results)

}

但是這樣有可能造成死鎖, 因?yàn)闀?results?緩沖區(qū)長(zhǎng)度限定為最大1000個(gè), 也就是超過(guò)1000個(gè) result 的時(shí)候再打算寫入 result 會(huì)等待取出 result 后才執(zhí)行, done 也不會(huì)寫入, 而?awaitCompletion?是等到所有 goroutine 都完成了才會(huì)取出?results, 而且當(dāng)?result?非常大的時(shí)候因?yàn)閮?nèi)存的緣故也是不可能一次性取出的. 所以就需要在讀取?results?的同時(shí)讀取?done, 當(dāng)讀取?done?次數(shù)大于 N 后關(guān)閉?results, 所以, 因?yàn)橐诙鄠€(gè) channel 中同時(shí)讀取, 所以需要使用?select.

下面是書中的?cgrep3?, 改進(jìn)版:

func waitAndProcessResults(timeout int64, done

finish:= time.After(time.Duration(timeout))for working := workers; working > 0; {

select {//Blocking

case result :=

case

}for{

select {//Nonblocking

case result :=

default:

return}

}

}

看到這里, 我就有個(gè)疑問(wèn), 為什么在全部完成之后(done都接收到N個(gè)了), 還要再遍歷出?results, 直到讀取不到才算讀取完成呢(我反應(yīng)一向比較慢^_^)? 于是我做了個(gè)實(shí)驗(yàn), 去掉了后面再次循環(huán)的部分, 發(fā)現(xiàn)有時(shí)會(huì)遺漏掉數(shù)據(jù)(我用4個(gè)測(cè)試文件...), 證明這段代碼是有用的!!!

我的想法是, 他是在處理完 result, 然后寫入?results, 寫完了才發(fā)送?done, 也就是在收到所有的 done 之后, 所有的數(shù)據(jù)應(yīng)該是已經(jīng)處理完成的. 為了驗(yàn)證這個(gè)想法, 我寫了一下代碼:

for working := workers; working > 0; {

select {//Blocking

case result :=

//received result

case

if working <= 0{

println(len(results))

}

}

}

然后看到輸出的數(shù)是大于0的, 也就是說(shuō)在接收到全部 done 之后,?results?還有數(shù)據(jù)在緩沖區(qū)中, 然后在看看發(fā)送result?的代碼, 突然就明白了

func doJobs(done chan

job.Do(lineRx)

}done

}

我把寫入和讀取想當(dāng)然認(rèn)為一起發(fā)生了, 因?yàn)橛芯彌_區(qū)的緣故, doJobs在發(fā)送進(jìn)?results?的緩沖區(qū)之后就立刻發(fā)送?done?了, 但是寫入的數(shù)據(jù)有沒(méi)有被處理, 是不知道的, 所以在接收到所有?done?之后,?results?緩沖區(qū)還有數(shù)據(jù), 需要再循環(huán)一遍.

附我的代碼一份:

package main

import ("bufio"

"fmt"

"log"

"os"

"regexp"

"runtime")

type Job struct {

filenamestringresults chan

}

type Result struct {

filenamestringlinestringlino int

}var worker = runtime.NumCPU()

func main() {//config cpu number

runtime.GOMAXPROCS(worker)

files:= os.Args[2:]

regex, err := regexp.Compile(os.Args[1])if err !=nil {log.Fatal(err)return}//任務(wù)列表, 并發(fā)數(shù)目為CPU個(gè)數(shù)

jobs := make(chan Job,worker)//結(jié)果

results := make(chan Result, minimum(1000,len(files)))

defer close(results)//標(biāo)記完成

dones := make(chan int,worker)

defer close(dones)

go addJob(files, jobs,results)for i := 0; i < worker; i++{

go doJob(jobs, regex,dones)

}

awaitForCloseResult(dones,results)

}

func addJob(files []string, jobs chan

jobs

}

close(jobs)

}

func doJob(jobs

job.Do(regex)

}

dones

func awaitForCloseResult(dones

working:= 0MyForLable:

for{

select {case result :=

if working >=worker {if rlen := len(results); rlen > 0{

println("----------------------------------")

println("left:",rlen)

println("----------------------------------")for i := 1; i <= rlen; i++{

println(

}

}breakMyForLable

}

}

}

}

func (j*Job) Do(re *regexp.Regexp) {

f, err := os.Open(j.filename)if err !=nil {

println(err)return}

defer f.Close()

b:= bufio.NewReader(f)

lino:= 0

for{

line, _, err := b.ReadLine()if re.Match(line) {

j.results

}if err !=nil {break}

lino+= 1}

}

func minimum(a,b int) int {if a >b {returnb

}returna

}

func println(o...interface{}) {

fmt.Println(o...)

}

轉(zhuǎn)自:http://chenye.org/goroutine-note.html

總結(jié)

以上是生活随笔為你收集整理的goroutine并发扫描MySQL表_goroutine 并发之搜索文件内容的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。