e盾服务端源码_gRPC服务注册发现及负载均衡的实现方案与源码解析
今天聊一下gRPC的服務發現和負載均衡原理相關的話題,不同于Nginx、Lvs或者F5這些服務端的負載均衡策略,gRPC采用的是客戶端實現的負載均衡。什么意思呢,對于使用服務端負載均衡的系統,客戶端會首先訪問負載均衡的域名/IP,再由負載均衡按照策略分發請求到后端具體某個服務節點上。而對于客戶端的負載均衡則是,客戶端從可用的后端服務節點列表中根據自己的負載均衡策略選擇一個節點直連后端服務器。
Etcd軟件包的naming組件里提供了一個命名解析器(naming resolver)結合gRPC本身自帶的RoundRobin 輪詢調度負載均衡器,讓使用者能方便地搭建起一套服務注冊/發現和負載均衡體系。如果輪詢調度滿足不了調度需求或者不想使用Etcd作為服務的注冊中心和命名解析器的話,可以通過寫代碼實現gRPC定義的Resolver和Balancer接口來滿足系統的自定義需求。
本文引用的源碼對應的版本為:gRPC v1.2.x、 Etcd v3.3如果你對gRPC和Etcd還不了解,可以先看看我很早之前寫的gRPC入門和Etcd入門 系列的文章。
gRPC服務注冊發現
先來簡單的說明一下用Etcd實現服務注冊和發現的原理。服務注冊和發現這個流程可以用下面這個示意圖簡單描述出來:
上圖的服務A包含了兩個節點,服務在節點上啟動后,會以包含服務名加節點IP的唯一標識作為Key(比如/service/a/114.128.45.117),服務節點IP和端口信息作為值存儲到Etcd上。這些Key都是帶租約的Key,需要我們的服務自己去定期續租,一旦服務節點本身宕掉,比如node2上的服務宕掉,無法完成續租后,那么它對應的Key:/service/a/114.128.45.117 就會過期,客戶端也就無法再從Etcd上獲取到這個服務節點的信息了。
與此同時客戶端也會利用Etcd的Watch功能監聽以/servive/a為前綴的所有Key的變化,如果有新增或者刪除節點Key的事件發生Etcd都會通過WatchChan發送給客戶端,WatchChan在編程語言上的實現就是Go的Channel。
服務注冊
關于Etcd的服務注冊,官方提供的軟件包里并沒有提供統一的注冊函數供調用。那么我們在新增服務節點后怎么把節點的信息存儲到Etcd上并通知給命名解析器呢?在Etcd源碼包的naming/grpc.go里可以發現提供了一個Update方法,這個Update既能執行添加也能執行刪除操作:
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {switch nm.Op {case naming.Add:var v []byteif v, err = json.Marshal(nm); err != nil {return status.Error(codes.InvalidArgument, err.Error())}_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)case naming.Delete:_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)default:return status.Error(codes.InvalidArgument, "naming: bad naming op")}return err }服務在啟動完成后可以通過Update方法把自己的服務地址和端口Put到自定義的target為前綴的key里,針對上面圖示里的例子,變量target就應該是我們定義的服務名/service/a。一般在具體實踐里都是自己根據系統的需求封裝Update方法完成服務注冊,以及服務節點Key在Etcd上的定期續租,這塊每個公司的實踐都不一樣,我就不放具體的代碼了,一般續租都是通過Etcd租約里的KeepAlive方法實現的(Lease.KeepAlive)。
服務發現
在注冊完新節點、或者是原來的節點停掉后,客戶端是怎么知道的呢?這塊就需要命名解析器Resolver來幫助實現了,Resolver的作用可以理解為從一個字符串映射到一組IP端口等信息。
gRPC對Resolver的接口定義如下:
type Resolver interface {// Resolve creates a Watcher for target.Resolve(target string) (Watcher, error) }命名解析器的Resolve方法會返回一個Watcher,這個Watcher可以監聽命名解析器發來的target(類似上面例子里說的與服務名相對應的Key)對應的后端服務器地址信息變化,通知Balancer對自己維護的地址進行動態地增刪。
Watcher接口的定義如下:
//源碼地址 https://github.com/grpc/grpc-go/blob/v1.2.x/naming/naming.go type Watcher interface {Next() ([]*Update, error)// Close closes the Watcher.Close() }Etcd為這兩個接口都提供了實現:
// 源碼地址:https://github.com/etcd-io/etcd/blob/release-3.3/clientv3/naming/grpc.go// GRPCResolver 實現了grpc的naming.Resolver接口 type GRPCResolver struct {// Client is an initialized etcd client.Client *etcd.Client }func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {ctx, cancel := context.WithCancel(context.Background())w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}return w, nil }// 實現了grpc的naming.Watcher接口 type gRPCWatcher struct {c *etcd.Clienttarget stringctx context.Contextcancel context.CancelFuncwch etcd.WatchChanerr error }func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {if gw.wch == nil {// first Next() returns all addressesreturn gw.firstNext()}// process new events on target/*wr, ok := <-gw.wchif !ok {...updates := make([]*naming.Update, 0, len(wr.Events))for _, e := range wr.Events {var jupdate naming.Updatevar err errorswitch e.Type {case etcd.EventTypePut:err = json.Unmarshal(e.Kv.Value, &jupdate)jupdate.Op = naming.Addcase etcd.EventTypeDelete:err = json.Unmarshal(e.PrevKv.Value, &jupdate)jupdate.Op = naming.Deletedefault:continue}if err == nil {updates = append(updates, &jupdate)}}return updates, nil }func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {// 獲取前綴為gw.target的所有Key的值,放到現有數組里resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())if gw.err = err; err != nil {return nil, err}updates := make([]*naming.Update, 0, len(resp.Kvs))for _, kv := range resp.Kvs {var jupdate naming.Updateif err := json.Unmarshal(kv.Value, &jupdate); err != nil {continue}updates = append(updates, &jupdate)}opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}// watch 監聽這些Key的變化,包括前綴相同的新Key的加入gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)return updates, nil }func (gw *gRPCWatcher) Close() { gw.cancel() }這部分GRPCResolver和gRPCWatcher類型的每個方法的功能和起到的作用都和RoundRobin這個gRPC Balancer結合地比較緊密,我準備放到下面和負載均衡的源碼實現一起說明。
負載均衡
首先我們來看一下gRPC對負載均衡的接口定義:
type Balancer interface {Start(target string, config BalancerConfig) errorUp(addr Address) (down func(error))Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)Notify() <-chan []Address// Close shuts down the balancer.Close() error }在gRPC 客戶端與服務端之間建立連接時調用的Dail方法里可以用WithBalancer方法在DiaplOption里指定負載均衡組件:
client, err := etcd.Client()...resolver := &naming.GRPCResolver{Client: client}b := grpc.RoundRobin(resolver)opt0 := grpc.WithBalancer(b)grpc.Dial(target, opt0 , opt1, ...) // 后面省略了上面的例子使用了gRPC自帶的Balancer實現RoundRobin,RoundRobin除了實現了Balancer接口外自己內置了Resolver用來從名字獲取其后綁定的IP信息以及服務的更新事件(增加刪除服務節點這些事件) 。上面的例子里給RoundRobin指定了Etcd提供的name.GRPCResolver做為它的命名解析器,這個命名解析器就是上一節說的Etcd軟件包里提供的gRPCnaming.Resolver接口實現。
RoundRobin
下面我們研究一下gRPC包里提供的RoundRobin代碼實現,主要關注負載均衡和利用Resolver進行服務發現及節點更新這兩個功能的代碼實現原理
RoundRobin結構體定義如下:
// 源碼在:https://github.com/grpc/grpc-go/blob/v1.2.x/balancer.go type roundRobin struct {r naming.Resolverw naming.Watcheraddrs []*addrInfo // 客戶端可以嘗試連接的所有地址mu sync.MutexaddrCh chan []Address // 用于通知gRPC內部的,客戶端可連接地址的信道next int // index of the next address to return for Get()waitCh chan struct{} // the channel to block when there is no connected address availabledone bool // The Balancer is closed. }- r是命名解析器,可以定義自己的命名解析器,如Etcd命名解析器。如果r為nil,那么Dial中參數target將直接作為可請求地址添加到addrs中。
- w是命名解析器Resolve方法返回的watcher,該watcher可以監聽命名解析器發來的地址信息變化,通知roundRobin對addrs中的地址進行動態的增刪。
- addrs是從命名解析器獲取地址信息數組,數組中每個地址不僅有地址信息,還有gRPC與該地址是否已經創建了ready狀態的連接的標記。
- addrCh是地址數組的Channel,該Channel會在每次命名解析器發來地址信息變化后,將所有地址更新通知到gRPC內部的lbWatcher,lbWatcher是統一管理地址連接狀態的協程,負責新地址的連接與被刪除地址的關閉操作。
- next是roundRobin的Index,即輪詢調度遍歷到addrs數組中的哪個位置了。
- waitCh是當addrs中地址為空時,grpc調用Get()方法希望獲取到一個到target的連接,如果設置了gRPC的failfast為false,那么Get()方法會阻塞在此Channel上,直到有ready的連接。
啟動RoundRobin
啟動RoundRobin就是實現Balancer接口的Start方法,該方法是由一開始通過grpc.WithBalancer把負載均衡器指定給的BalancerWrapperBuilder在創建BalancerWrapper時觸發的:
func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {// 這里觸發Balancer的Start方法bwb.b.Start(opts.Target.Endpoint, BalancerConfig{DialCreds: opts.DialCreds,Dialer: opts.Dialer,})_, pickfirst := bwb.b.(*pickFirst)bw := &balancerWrapper{......}cc.UpdateBalancerState(connectivity.Idle, bw)go bw.lbWatcher() // 監聽Balancer 通知過來的地址變化return bw }Start方法其主要功能就是通過RoundRobin的命名解析器的Resolve方法拿到監聽命名解析器后端變化的Watcher。與此同時還會新建一個addrChan用于向gRPC內部的lbWatcher推送Watcher監聽到的地址變化。
func (rr *roundRobin) Start(target string, config BalancerConfig) error {rr.mu.Lock()defer rr.mu.Unlock()if rr.done {return ErrClientConnClosing}if rr.r == nil {// 如果沒有解析器,那么直接將target加入addrs地址數組rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})return nil}// Resolve接口會返回一個watcher,watcher可以監聽解析器的地址變化w, err := rr.r.Resolve(target)if err != nil {return err}rr.w = w// 創建一個channel,當watcher監聽到地址變化時,通知grpc內部lbWatcher去連接該地址rr.addrCh = make(chan []Address, 1)// go 創建新協程監聽watcher,監聽地址變化。go func() {for {if err := rr.watchAddrUpdates(); err != nil {return}}}()return nil }創建完addrCh后在Start方法最后會開啟一個goroutine,這個goroutine會不停地循環調用watchAddrUpdates查詢是否有命名解析器的Watcher傳遞過來的更新。
監聽服務端地址的更新
在watchAddrUpdates方法里就是通過上面Start方法里創建的Resolver Watcher的Next方法來監聽Etcd上后端服務節點的更新,這個Watcher的實現就是上面服務發現章節里說的Etcd軟件包里提供的gRPCWatcher類型,它的Next方法里會去通過監聽Etcd上由服務名組成的Key的變化,然后在這里把這些信息傳遞給上面Start方法里創建好的addrChan通道。
func (rr *roundRobin) watchAddrUpdates() error {// watcher的next方法會阻塞,直至有地址變化信息過來,updates即為變化信息updates, err := rr.w.Next()if err != nil {return err}// 對于addrs地址數組的操作,顯然是要加鎖的,因為有多個goroutine在同時操作rr.mu.Lock()defer rr.mu.Unlock()for _, update := range updates {addr := Address{Addr: update.Addr,Metadata: update.Metadata,}switch update.Op {case naming.Add://對于新增類型的地址,注意這里不會重復添加。var exist boolfor _, v := range rr.addrs {if addr == v.addr {exist = truebreak}}if exist {continue}rr.addrs = append(rr.addrs, &addrInfo{addr: addr})case naming.Delete://對于刪除的地址,直接在addrs中刪除就行了for i, v := range rr.addrs {if addr == v.addr {copy(rr.addrs[i:], rr.addrs[i+1:])rr.addrs = rr.addrs[:len(rr.addrs)-1]break}}default:grpclog.Errorln("Unknown update.Op ", update.Op)}}// 這里復制了整個addrs地址數組,然后丟到addrCh channel中通知grpc內部lbWatcher,// lbWatcher會關閉刪除的地址,連接新增的地址。// 連接ready后會有專門的goroutine調用Up方法修改addrs中地址的狀態。open := make([]Address, len(rr.addrs))for i, v := range rr.addrs {open[i] = v.addr}if rr.done {return ErrClientConnClosing}select {case <-rr.addrCh:default:}rr.addrCh <- openreturn nil }建立連接
Up方法是gRPC內部負載均衡的watcher調用的,該watcher會讀全局的連接狀態隊列,改變RoundRobin維護的連接列表的里連接的狀態 (會有單獨的goroutine向目標服務發起連接嘗試,嘗試成功后才會把連接對象的連接狀態改為connected),如果是已連接狀態的連接 ,會調用Up方法來改變addrs地址數組中該地址的狀態為已連接。
func (rr *roundRobin) Up(addr Address) func(error) {rr.mu.Lock()defer rr.mu.Unlock()var cnt int//將地址數組中的addr置為已連接狀態,這樣這個地址就可以被client使用了。for _, a := range rr.addrs {if a.addr == addr {if a.connected {return nil}a.connected = true}if a.connected {cnt++}}// 當有一個可用地址時,之前可能是0個,可能要很多client阻塞在獲取連接地址上,這里通知所有的client有可用連接啦。// 為什么只等于1時通知?因為可用地址數量>1時,client是不會阻塞的。if cnt == 1 && rr.waitCh != nil {close(rr.waitCh)rr.waitCh = nil}//返回禁用該地址的方法return func(err error) {rr.down(addr, err)} }關閉連接
關閉連接使用的是Down方法,這個方法就簡單, 直接找到addr置為不可用就行了。
func (rr *roundRobin) down(addr Address, err error) {rr.mu.Lock()defer rr.mu.Unlock()for _, a := range rr.addrs {if addr == a.addr {a.connected = falsebreak}} }客戶端獲取連接
客戶端在調用gRPC具體Method的Invoke方法里,會去RoundRobin的連接池addrs里獲取連接,如果addrs為空,或者addrs里的地址都不可用,Get()方法會返回錯誤。但是如果設置了failfast = false,Get()方法會阻塞在waitCh這個通道上,直至Up方法給到通知,然后輪詢調度可用的地址。
func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {var ch chan struct{}rr.mu.Lock()if rr.done {rr.mu.Unlock()err = ErrClientConnClosingreturn}if len(rr.addrs) > 0 {// addrs的長度可能變化,如果next值超出了,就置為0,從頭開始調度。if rr.next >= len(rr.addrs) {rr.next = 0}next := rr.next//遍歷整個addrs數組,直到選出一個可用的地址for {a := rr.addrs[next]// next值加一,當然是循環的,到len(addrs)后,變為0next = (next + 1) % len(rr.addrs)if a.connected {addr = a.addrrr.next = nextrr.mu.Unlock()return}if next == rr.next {// 遍歷完一圈了,還沒找到,走下面邏輯break}}}if !opts.BlockingWait { //如果是非阻塞模式,如果沒有可用地址,那么報錯if len(rr.addrs) == 0 {rr.mu.Unlock()err = status.Errorf(codes.Unavailable, "there is no address available")return}// Returns the next addr on rr.addrs for failfast RPCs.addr = rr.addrs[rr.next].addrrr.next++rr.mu.Unlock()return}// Wait on rr.waitCh for non-failfast RPCs.// 如果是阻塞模式,那么需要阻塞在waitCh上,直到Up方法給通知if rr.waitCh == nil {ch = make(chan struct{})rr.waitCh = ch} else {ch = rr.waitCh}rr.mu.Unlock()for {select {case <-ctx.Done():err = ctx.Err()returncase <-ch:rr.mu.Lock()if rr.done {rr.mu.Unlock()err = ErrClientConnClosingreturn}if len(rr.addrs) > 0 {if rr.next >= len(rr.addrs) {rr.next = 0}next := rr.nextfor {a := rr.addrs[next]next = (next + 1) % len(rr.addrs)if a.connected {addr = a.addrrr.next = nextrr.mu.Unlock()return}if next == rr.next {// 遍歷完一圈了,還沒找到,可能剛Up的地址被down掉了,重新等待。break}}}// The newly added addr got removed by Down() again.if rr.waitCh == nil {ch = make(chan struct{})rr.waitCh = ch} else {ch = rr.waitCh}rr.mu.Unlock()}} }總結
整個gRPC基于Etcd實現服務注冊/發現以及負載均衡的流程和關鍵的源碼實現就梳理完了,其實源碼實現的細節遠比我這里列舉的要復雜,這篇文章的目的也是希望能記錄下一學習和實踐gRPC的負載均衡和服務解析時的一些關鍵路徑。另外需要注意的是本文里使用的是gRPC v1.2.x的代碼,在1.3版本后官方包重新調整了目錄和包名,與本文里列舉的源碼以及Balancer的使用上都會有些出入,不過原理還是大致一樣的,只不過每一版都一直在此基礎上演進。
看到這里了,如果喜歡我的文章可以幫我點個贊,我會每周通過技術文章分享我的所學所見和第一手實踐經驗,感謝你的支持。微信搜索關注公眾號「網管叨bi叨」第一時間獲取我的文章推送。總結
以上是生活随笔為你收集整理的e盾服务端源码_gRPC服务注册发现及负载均衡的实现方案与源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎样用python批量处理文件夹_pyt
- 下一篇: 用户登录查全表好还是用用户名好_外贸人/