日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

由浅入深剖析go channel

發布時間:2024/1/23 编程问答 46 豆豆
生活随笔 收集整理的這篇文章主要介紹了 由浅入深剖析go channel 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

channel 是 goroutine 之間通信的一種方式,可以類比成 Unix 中的進程的通信方式管道。

CSP 模型

在講 channel 之前,有必要先提一下 CSP 模型,傳統的并發模型主要分為 Actor 模型和 CSP 模型,CSP 模型全稱為 communicating sequential processes,CSP 模型由并發執行實體(進程,線程或協程),和消息通道組成,實體之間通過消息通道發送消息進行通信。和 Actor 模型不同,CSP 模型關注的是消息發送的載體,即通道,而不是發送消息的執行實體。關于 CSP 模型的更進一步的介紹,有興趣的同學可以閱讀論文 Communicating Sequential Processes,Go 語言的并發模型參考了 CSP 理論,其中執行實體對應的是 goroutine, 消息通道對應的就是 channel。

channel 介紹

channel提供了一種通信機制,通過它,一個goroutine可以向另一個goroutine發送消息。channel本身還需關聯了一個類型,也就是channel可以發送數據的類型。例如:發送int類型信息的channel寫作chan int。

channel 創建

channel使用內置的make函數創建,下面聲明了一個chan int類型的channel:

ch := make(chan int)

c和map類似,make創建了一個底層數據結構體的引用,當賦值或參數傳遞時,只是拷貝了一個channel引用,指向相同的channel對象。和其他引用類型一樣,channel的空值為nil。使用==可以對類型相同的channel進行比較,只是指向相同對象或同為nil時,才返回true。

channel的讀寫操作

ch := make(chan int)// write to channel ch <- x// read from channel x <- ch// another way to read x = <- ch

channel一定要初始化后才能進行讀寫操作,否則會永久阻塞。

關閉channel

golang提供了內置的close函數對channel進行關閉操作。

ch := make(chan int) close(ch)

有關channel的關閉,你需要注意以下事項:

  • 關閉一個為初始化(nil)的channel會產生panic
  • 重復關閉同一個channel會產生panic
  • 向一個已關閉的channel中發送消息會產生panic
  • 從已關閉的channel讀取消息不會產生panic,且能讀出channel中還未被讀取的消息,若消息均已讀出,則會讀到類型的零值。從一個已關閉的channel中讀取消息永遠不會阻塞,并且會犯一個false的ok-idiom,可以用它來判斷channel是否關閉
  • 關閉channel會產生一個廣播機制,所有向channel讀取消息的goroutine都會收到消息
ch := make(chan int, 10) ch <- 11 ch <- 12close(ch)for x := range ch {fmt.Println(x) }x, ok := <- ch fmt.Println(x, ok)------- output:11 12 0 false

channel的類型

channel分為不帶緩存的channel和帶緩存的channel。

無緩存的channel

從無緩存的channle中讀取消息會阻塞,直到有goroutine向該channel中發送消息;同理,向無緩存的channel中發送消息也會阻塞,直到有goroutine從channel中讀取消息。

通過無緩存的channel進行通信時,接收者收到數據happens before發送者goroutine喚醒

有緩存的channel

有緩存的channel的聲明方式為指定make函數的第二個參數,該參數為channel緩存的容量

ch := make(chan int, 10)

有緩存的channel類似一個阻塞隊列(采用環形數組實現)。當緩存未滿時,向channel中發送消息時不會阻塞,當緩存滿時,發送操作將被阻塞,直到有其他goroutine從中讀取消息;相應的,當channel中消息不為空時,讀取消息不會出現阻塞,當channel為空時,讀取操作會造成阻塞,直到有goroutine向channel中寫入消息。

ch := make(chan int, 3)// blocked, read from empty buffered channel <- ch ch := make(chan int, 3) ch <- 1 ch <- 2 ch <- 3// blocked, send to full buffered channel ch <- 4

通過len函數可以獲得chan中的元素個數,通過cap函數可以得到channel的緩存長度。

channel的用法

goroutine通信

看一個effective go中的例子:

c := make(chan int) // Allocate a channel// Start the sort in a goroutine; when it completes, signal on the channel. go func() {list.Sort()c <- 1 // Send a signal; value does not matter. }()doSomethingForAWhile() <-c

主goroutine會阻塞,直到執行sort的goroutine完成。

range遍歷

channel也可以使用range取值,并且會一直從channel中讀取數據,直到有goroutine對改channel執行close操作,循環才會結束。

// consumer worker ch := make(chan int, 10) for x := range ch {fmt.Println(x) }

等價于

for {x, ok := <- chif !ok {break}fmt.Println(x) }

配合select使用

select用法類似于IO多路復用,可以同時監聽多個channel的消息狀態,看下面的例子

select {case <- ch1;...case <- ch2;...case ch3 <- 10;...default:... }
  • select可以同時監聽多個channel的寫入或讀取
  • 執行select時,若只有一個case通過(不阻塞),則執行這個case塊
  • 若有多個case通過,則隨機挑選一個case執行
  • 若所有case均阻塞,且定義了default模塊,則執行default模塊。若未定義default模塊,則select語句阻塞,直到有case被喚醒。
  • 使用break會跳出select塊。

1.設置超時時間

ch := make(chan struct{})// finish task while send msg to ch go doTask(ch)timeout := time.After(5 * time.Second) select {case <- ch:fmt.Println("task finished.")case <- timeout:fmt.Println("task timeout.") }

2.quite channel

有一些場景中,一些worker goroutine需要一直循環處理信息,直到收到quit信號

msgCh := make(chan struct{}) quitCh := make(chan struct{}) for {select {case <- msgCh:doWork()case <- quitCh:finish()return }

單向channel

即只可寫入或只可讀的channel,事實上channel只讀或只寫都沒有意義,所謂的單向channel其實只是聲明時用,比如

func foo(ch chan<- int) <-chan int {...}

chan<-int表示一個只可寫入的channel,<-chan int表示一個只可讀取的cahnel。上面這個函數約定了foo內只能從向ch中寫入數據,返回只一個只能讀取的channel,雖然使用普通的channel也沒有問題,但這樣在方法聲明時約定可以防止channel被濫用,這種預防機制發生再編譯期間。

channel源碼分析

channel的主要實現在src/runtime/chan.go中,一下源碼均基于go 1.9.2。源碼閱讀時為了更好理解channel特性,幫助正確合理的使用 channel,閱讀代碼的過程可以回憶前面章節的 channel 特性。

channel類結構

channel相關類定義如下:

// channel類型定義 type hchan struct {// channel中的元素數量,lenqcount uint // total data in the queue// channel的大小, capdataqsiz uint //size of the circular queue// channel的緩沖區,環形數組實現buf unsafe.Pointer // points to an array of dataqsiz elements// 單個元素的大小elemsize uint16// closed 標志位closed uint32// 元素的類型elemtype *_type // element type// send 和receive的索引,用于實現環形數組隊列sendx uint //send indexrecvx uint //receive index// recv goroutine 等待隊列recvq waitq // list of recv waiters// send goroutine 等待隊列sendq waitq // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex }// 等待隊列的鏈表實現 type waitq struct {first *sudoglast *sudog }// in src/runtime/runtime2.go // 對G的封裝 type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops.g *gselectdone *uint32 // CAS to 1 to win select race (may point to stack)next *sudogprev *sudogelem unsafe.Pointer // data element(may point to stack)// The following fields are never accessed concurrently.// For channels, waitlink is only accessed by g.// For semaphores, all fields (including the ones above)// are only accessed when holding a semaRoot lock.acquiretime int64releasetime int64ticket uint32parent *sudog // semaRoot binary treewaitlink *sudog // g.waiting list or semaRootwaittail *sudog //semaRootc *hchan // channel }

可以看到,channel的主要組成有:一個環形數組實現的隊列,用于存儲消息元素;兩個鏈表實現的goroutine等待隊列,用于存儲阻塞在recv和send操作上的goroutine;一個互斥鎖,用于各個屬性變動的同步。

channel make實現

func makchan(t *chantype, size int64) *hchan {elem := t.elem// compiler checks this but be safe.if elem.size >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.align > maxAlign {throw("makechan: bad alignment")}if size < 0 || int64(uintpr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) {panic(plainError("makechan: size out of range"))}var c *hchanif elem.kind&kindNoPointers != 0 || size == 0 {// case 1: channel 不含有指針// case 2: size == 0, 即無緩沖 channel// Allocate memory in one call.// Hchan does not contain pointers interesting for GC in this case:// buf pointers into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.// 在堆上分配連續的空間用作channelc = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))if size > 0 && elem.size != 0 {c.buf = add(unsafe.Pointer(c), hchanSize)} else {// race detector uses this location for synchronization// Also prevents us from pointing beyond the allocation(see issue 9401).c.buf = unsafe.Pointer(c)}} else {// 有緩沖channel初始化c = new(hchan)// 堆上分配buf內存c.buf = newarray(elem, int(size))}c.elemsize = uint16(elem.size)c.elemtype = elemc.dataqsiz = uint(size)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsize=", size, "\n")} return c }

make的過程還比較簡單,需要注意一點的是當元素不含指針的時候,會將整個hchan分配成一個連續的空間。

channel send

// entry point for c <- x from compiled code // go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, getcallerpc(unsafe.Pointer(&c))) }/** generic single channel send/recv* If block is not nil,* then the protocol will not* sleep but return if it could* not complete.** sleep can wake up with g.param == nil* when a channel involved in the sleep has* been closed. it is easiest to loop and re-run* the operation; we'll see that it's now closed.*/func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 前面章節說到的,當channel未初始化或為nil時,向其中發送數據將會永久阻塞if c == nil {if !block {return false}// gopark 會使當前goroutine休眠,并通過unlockf喚醒,但是此時傳入的unlockf為nil,因此,goroutine會一直休眠gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)throw("unreachable")}if debugChan {print("chansend: chan=", c, "\n")}if raceenabled {racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not closed, we observe that the channel is// not ready for sending. Each of these observations is a single word-sized read// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).// Because a closed channel cannot transition from 'ready for sending' to// 'not ready for sending', even if the channel is closed between the two observations,// they imply a moment between the two when the channel was both not yet closed// and not ready for sending. We behave as if we observed the channel at that moment,// and report that the send cannot proceed.//// It is okay if the reads are reordered here: if we observe that the channel is not// ready for sending and then observe that it is not closed, that implies that the// channel wasn't closed during the first observation.if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {return false}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}//獲取同步鎖lock(&c.lock)//之前章節提過,向已經關閉的channel發送消息會產生panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// CASE1: 當有goroutine在recv隊列上等待時,跳過緩存隊列,將消息直接發給receiver goroutineif sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// CASE2: 緩存隊列未滿,則將消息復制到緩存隊列上if c.qcount < c.dataqsiz {//Space is available in the channel buffer. Enqueue the element to send.qp := chanbuf(c, c.sendx)if raceenabled {raceacquire(qp)racerelease(qp)}typedmemmove(c.elemtype, qp, ep)c.sendx++if c.sendx == c.dataqsiz {c.sendx = 0}c.qcoun++unlock(&c.lock)return true}if !block {unlock(&c.lock)return false}// CASE3: 緩存隊列已滿,將goroutine加入send隊列// 初始化 sudog// Block on the channel.Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.selectdone = nilmysg.c = cgp.waiting = mysggp.param = nil// 加入隊列c.sendq.enqueue(mysg)// 休眠goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)// 喚醒 goroutine// someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif gp.param == nil {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}gp.param = nilif mysg.releasetime >0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nilreleaseSudog(mysg)return true }

從send代碼中可以看到,之前章節提到的一些特性都在代碼中有所體現

send有以下幾種情況:

  • 有goroutine阻塞在channel recv隊列上,此時緩存隊列為空,直接將消息發給receiver goroutine,只產生一次復制
  • 當channel緩存隊列有剩余空間時,將數據放到隊列里,等待接收,接收后總共產生兩次復制
  • 當channel緩存隊列已滿時,將當前goroutine加入send隊列并阻塞。

channel receive

//entry points for <- c from compiled code //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true) }//go:nosplit func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {_, received = chanrecv(c, elem, true)return }// chanrecv receives on channel c and writes the received data to ep. // ep may be nil, in which case received data is ignored. // If block == false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). // A non-nil ep must point to the heap or the caller's stack. func chanrecv(c *hchan, ep unsafe.Pointer, block bool)(selected, received bool) {// raceenabled: don't need to check ep, as it is always on the stack// or is new memory allocated by reflect.if debugChan {print("chanrecv: chan=", c, "\n")}// 從nil的channel中接收消息,永久阻塞if c == nil {if !block {return}gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)throw("unreachable")}// Fast path: check for failed non-blocking operation without acquiring the lock.//// After observing that the channel is not ready for receiving, we observe that the// channel is not closed. Each of these observations is a single word-sized read// (first c.sendq.first or c.qcount, and second c.closed).// Because a channel cannot be reopened, the later observation of the channel// being not closed implies that it was also not closed at the moment of the// first observation. We behave as if we observed the channel at that moment// and report that the receive cannot proceed.//// The order of operations is important here: reversing the operations can lead to// incorrect behavior when racing with a close.if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&atomic.Load(&c.closed) == 0 {return}var t0 int64if blockprofilerate > 0 {t0 = cputicks()}lock(&c.lock)// CASE1: 從已經close且為空的channel recv數據,返回控制if c.closed != 0 && c.qcount == 0 {if raceenabled {raceacquire(unsafe.Pointer(c))}unlock(&c.lock) if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// CASE2: send隊列不為空// CASE2.1: 緩存隊列為空,直接從sender recv元素// CASE2.2: 緩存隊列不為空,此時只有可能是緩存隊列已滿,從隊列頭取出元素,并喚醒sender將元素寫入緩存隊列尾部。由于是環形隊列,因此,隊列滿時只需要將隊列頭賦值給receiver,同時將sender元素復制到該位置,并移動隊列頭尾索引,不需要移動隊列元素if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender, If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queu// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}// CASE3: 緩存隊列不為空,直接從隊列去元素,移動頭索引if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {raceacquire(qp)racerelease(qp)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}if !block {unlock(&c.lock)return false, false}// CASE4: 緩存隊列為空,將goroutine加入recv隊列,并阻塞// no sender available: block on this channelgp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.selectdone = nilmysg.c = cgp.param = nilc.recvq.enqueue(mysg)goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}closed := gp.param == nilgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, !closed }

channel close

func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}lock(&c.lock)// 重復close,產生panicif c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc(unsafe.Pointer(&c))racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))racerelease(unsafe.Pointer(c))}c.closed = 1var glist *g//喚醒所有 receiver// release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, unsafe.Pointer(c))}gp.schedlink.set(glist)glist = gp}// 喚醒所有sender,并產生panic// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = nilif raceenabled {raceacquireg(gp, unsafe.Pointer(c))}gp.schedlink.set(glist)glist = gp}unlock(&c.lock)// Ready all Gs now that we've dropped the channel lock.for glist != nil {gp := glistglist = glist.schedlink.ptr()gp.schedlink = 0goready(gp, 3)} }

?

總結

以上是生活随笔為你收集整理的由浅入深剖析go channel的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。