vitess源码阅读笔记cache系列之用go实现通用资源池
更新至2012.4.8的vitess代碼
?
新的代碼增加了同步用的條件變量,沒有空閑資源時的排隊不再使用channel來同步(使用其它編程語言的同學可以方便的移植這個代碼了),
轉而使用condition variable。不再使用mu.RLock,統一使用Lock,不再糾結。?整體代碼清晰了許多。
為了進一步提高性能和代碼復用,vitess還提供了通用的池管理,RoundRobin.go中實現了通用的資源池,方便管理池內資源總數,超時。
先上第一道菜:RoundRobin在整個vitess架構中扮演的什么角色? 個人覺得RoundRobin的重要性在vitess中怎么著也能算個丐幫的幾個長老之一吧,作為其它幾個pool的核心基礎 如cache_pool.go就是RoundRobin + memcache client實現的,而conn_pool.go則是用RoundRobin實現的連接池。 先看看RoundRobin.go的基本原理,老規矩,還是從數據結構入手 //?Factory?is?a?function?that?can?be?used?to?create?a?resource.type?Factory?func()?(Resource,?error)????????//工廠方法,資源創建函數,如connection
//?Every?resource?needs?to?suport?the?Resource?interface.??
type?Resource?interface?{????
????Close()
????IsClosed()?bool
}
這里要注意的是使用RoundRobin資源池來管理的對象必須實現Resouce接口 。
為什么要實現這兩個接口呢?,因為資源池需要知道如何Close超時的資源,
以及哪些資源是空閑的,哪些已經關閉了。
//?RoundRobin?allows?you?to?use?a?pool?of?resources?in?a?round?robin?fashion.type?RoundRobin?struct?{
????//?mu?controls?resources?&?factory
????//?Use?Lock?to?modify,?RLock?otherwise
????mu????????sync.RWMutex????????
????resources?chan?fifoWrapper?//用chan來模擬一個FIFO的隊列,先來先服務
????factory???Factory
????//?Use?sync/atomic?to?access?the?following?vars
????size????????int64????????????????//LQ:?池的總大小
????waitCount???int64????????????????//LQ:?還有多少人在等池內出現空閑資源
????waitTime????int64????????????????//LQ:?等待空閑資源總共花了多少時間
????idleTimeout?int64????????????????//LQ:?最多允許資源空閑多長時間,超過則close空閑資源
}
type?fifoWrapper?struct?{
????resource?Resource
????timeUsed?time.timeUsed?????????????//LQ:?用于控制超時,初始值為資源上次進入池內的時間(見Put函數)
}
抱歉都快到電視劇的第N集了才看到golang里面的重量級組件channel,簡單的理解可以認為channel是個
支持生產者消費者模型的同步隊列,而且是個不需要銷毀的隊列,golang會自動將不再使用的channel當垃圾回收掉。
其實池也是生產者消費者模型。所以需要外部提供生產資源的方法,也就是上面的Factory接口。這里的可以簡單理解為 c語言里面的函數指針。 RoundRobin的實現邏輯是這樣的,如果池內有資源,則可以Get成功,如果沒有,但還有容量,則用Factory創建一個資源。 如果池內已經沒有空閑資源,則傻等,知道有空閑資源可用位置。為了了解資源池的運作狀態,還記錄了等待的客戶有多少。 總共等了多少時間,有了這些就可以方便的評估RoundRobin的效果。源代碼也是按照這個思路來寫的。不多說,上代碼,詳細內容見標注的代碼注釋 ??
//?Get?will?return?the?next?available?resource.?If?none?is?available,?and?capacity//?has?not?been?reached,?it?will?create?a?new?one?using?the?factory.?Otherwise,
//?it?will?indefinitely?wait?till?the?next?resource?becomes?available.
func?(self?*RoundRobin)?Get()?(resource?Resource,?err?error)?{
????????return?self.get(true)
}
//?TryGet?will?return?the?next?available?resource.?If?none?is?available,?and?capacity
//?has?not?been?reached,?it?will?create?a?new?one?using?the?factory.?Otherwise,
//?it?will?return?nil?with?no?error.
func?(self?*RoundRobin)?TryGet()?(resource?Resource,?err?error)?{
????????return?self.get(false)
}
func?(self?*RoundRobin)?get(wait?bool)?(resource?Resource,?err?error)?{
????????self.mu.Lock()
????????defer?self.mu.Unlock()
????????//?Any?waits?in?this?loop?will?release?the?lock,?and?it?will?be
????????//?reacquired?before?the?waits?return.
????????for?{
????????????????select?{
????????????????case?fw?:=?<-self.resources:
????????????????????????//?Found?a?free?resource?in?the?channel
????????????????????????if?self.idleTimeout?>?0?&&?fw.timeUsed.Add(self.idleTimeout).Sub(time.Now())?<?0?{
????????????????????????????????//?resource?has?been?idle?for?too?long.?Discard?&?go?for?next.
????????????????????????????????go?fw.resource.Close()
????????????????????????????????self.size--
????????????????????????????????continue
????????????????????????}
????????????????????????return?fw.resource,?nil
????????????????default:
????????????????????????//?resource?channel?is?empty
????????????????????????if?self.size?>=?int64(cap(self.resources))?{
????????????????????????????????//?The?pool?is?full
????????????????????????????????if?wait?{
????????????????????????????????????????start?:=?time.Now()
????????????????????????????????????????self.available.Wait() //沒有空閑資源了,等著吧,不如上一版本的代碼自然啊
????????????????????????????????????????self.recordWait(start)
????????????????????????????????????????continue
????????????????????????????????}
????????????????????????????????return?nil,?nil
????????????????????????}
????????????????????????//?Pool?is?not?full.?Create?a?resource.
????????????????????????if?resource,?err?=?self.waitForCreate();?err?==?nil?{
????????????????????????????????//?Creation?successful.?Account?for?this?by?incrementing?size.
????????????????????????????????self.size++
????????????????????????}
????????????????????????return?resource,?err
????????????????}
????????}
????????panic("unreachable")
}
func?(self?*RoundRobin)?recordWait(start?time.Time)?{
????????self.waitCount++
????????self.waitTime?+=?time.Now().Sub(start)
}
//LQ:?這里的increment和decrement應該是多余的,沒看明白作者是什么目的,和驚群有啥關系
//為了避免self.factory()比較耗時,執行self.factory時unlock還是有必要的
func?(self?*RoundRobin)?waitForCreate()?(resource?Resource,?err?error)?{
????????//?Prevent?thundering?herd:?increment?size?before?creating?resource,?and?decrement?after.
????????self.size++
????????self.mu.Unlock()
????????defer?func()?{
????????????????self.mu.Lock()
????????????????self.size--
????????}()
????????return?self.factory()
}
??
在代碼注釋中可以看到,為了避免驚群效應,這里采用的方式是先increment,本人也不太明白,為什么這樣能避免驚群效應。
還請熟悉的朋友不吝賜教。看完了Get發現排隊等待是那么的自然,一行代碼的事情。再來看Put函數,我們會發現喚醒也是那么的簡潔。
?//?Put?will?return?a?resource?to?the?pool.?You?MUST?return?every?resource?to?the?pool,
//?even?if?it's?closed.?If?a?resource?is?closed,?Put?will?discard?it.?Thread?synchronization
//?between?Close()?and?IsClosed()?is?the?caller's?responsibility.
func?(self?*RoundRobin)?Put(resource?Resource)?{
????????self.mu.Lock()
????????defer?self.mu.Unlock()
????????defer?self.available.Signal()????????//LQ:?排隊的兄弟該醒醒了
????????if?self.size?>?int64(cap(self.resources))?{
????????????????go?resource.Close()
????????????????self.size--
????????}?else?if?resource.IsClosed()?{
????????????????self.size--
????????}?else?{
????????????????self.resources?<-?fifoWrapper{resource,?time.Now()}
????????}
}
??
接下來的系列將會分析其它依賴RoundRobin的幾個池的實現,然后分析vitess的各種功能是如何實現的。休息,休息一會兒。?
?
轉載于:https://www.cnblogs.com/gongaut/archive/2012/04/06/2434648.html
總結
以上是生活随笔為你收集整理的vitess源码阅读笔记cache系列之用go实现通用资源池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Timus 1018 树形DP
- 下一篇: 如何使用jquery的Highchart