日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java如何通过grpc连接etcd_grpc通过 etcd 实现服务发现与注册-源码分析

發布時間:2023/12/2 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java如何通过grpc连接etcd_grpc通过 etcd 实现服务发现与注册-源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

介紹

下面介紹 jupiter-0.2.7 版本中 grpc 通過 etcd 實現服務發現與注冊。

服務發現與注冊的實現解析

服務注冊

服務注冊的流程圖:

etcd的服務注冊代碼模塊在 jupiter/pkg/registry/etcdv3 中。

下面讓我們來看看實際的代碼

// Registry register/unregister service

// registry impl should control rpc timeout

type Registry interface {

RegisterService(context.Context, *server.ServiceInfo) error

UnregisterService(context.Context, *server.ServiceInfo) error

ListServices(context.Context, string, string) ([]*server.ServiceInfo, error)

WatchServices(context.Context, string, string) (chan Endpoints, error)

io.Closer

}

復制代碼

在 pkg/registry/registry.go 中定義了注冊服務對象的接口。不同的服務只要實現了這些接口,jupiter 就能使用。

首先我們來看看注冊方法

// RegisterService register service to registry

func (reg *etcdv3Registry) RegisterService(ctx context.Context, info *server.ServiceInfo) error {

err := reg.registerBiz(ctx, info)

...

}

// 業務信息注冊

func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.ServiceInfo) error {

...

// 提交信息到 etcd

_, err := reg.client.Put(readCtx, key, val, opOptions...)

...

}

復制代碼

這里主要的部分是 reg.client.Put()? 將服務信息提交到 etcd 中。其中的租約機制我會在之后單獨寫一篇文章介紹。這里主要還是關注如何注冊。

源碼中還有個 registerMetric()?方法,這個方法的目的是將服務信息在提交到etcd的 prometheus 前綴目錄下,用于服務監控,用的也是 client.Put() 方法。這里具體就不展示代碼了,感興趣的同學可以去源碼庫中查看。

服務退出

// 刪除服務

func (reg *etcdv3Registry) unregister(ctx context.Context, key string) error {

...

// 刪除服務信息

_, err := reg.client.Delete(ctx, key)

...

}

復制代碼

這里通過 client.Delete()? 方法將服務信息從 etcd 中刪除掉。

獲取服務列表

// ListServices list service registered in registry with name `name`

func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) {

// 服務信息key的前綴

target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme)

// 獲取相關前綴的所有信息

getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix())

...

}

復制代碼

通過 client.Get()? 方法獲取到相同前綴的服務信息。

服務信息變動監控

// WatchServices watch service change event, then return address list

func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) {

prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name)

// 通過etcd客戶端創建一個監控通道

watch, err := reg.client.WatchPrefix(context.Background(), prefix)

if err != nil {

return nil, err

}

...

xgo.Go(func() {

// 不斷接收etcd發送過來的變動事件

for event := range watch.C() {

switch event.Type {

case mvccpb.PUT:

updateAddrList(al, prefix, scheme, event.Kv)

case mvccpb.DELETE:

deleteAddrList(al, prefix, scheme, event.Kv)

}

out := al.DeepCopy()

fmt.Printf("al => %p\n", al.Nodes)

fmt.Printf("snapshot => %p\n", out.Nodes)

select {

// 將更新后的服務信息發送出去,接收方是 resolver

case addresses

default:

xlog.Warnf("invalid")

}

}

})

// 返回一個地址通道,用于傳遞

return addresses, nil

}

復制代碼

WatchServices()? 方法主要是監控信息的變動事件,并將變動后的服務信息重新返回給 resolver。具體思路是通過 etcdClient.Watch()? 方法創建一個監控通道,然后放入一個 goroutine來不斷接收 etcd 推送過來的事件,維護本地的服務信息,并通過 resolver 最終返回到 grpclb 負載均衡器進行服務地址信息的更新。

服務發現

服務發現流程圖:

grpc 的 resolver 模塊定義了兩個接口

// Builder creates a resolver that will be used to watch name resolution updates.

type Builder interface {

Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)

Scheme() string

}

// Resolver watches for the updates on the specified target.

// Updates include address updates and service config updates.

type Resolver interface {

ResolveNow(ResolveNowOptions)

Close()

}

復制代碼

首先我們來看看 Builder 接口的具體實現

type baseBuilder struct {

name string

reg? registry.Registry

}

// Build ...

func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {

endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc")

if err != nil {

return nil, err

}

var stop = make(chan struct{})

xgo.Go(func() {

for {

select {

case endpoint :=

var state = resolver.State{

Addresses: make([]resolver.Address, 0),

...

}

for _, node := range endpoint.Nodes {

...

state.Addresses = append(state.Addresses, address)

}

cc.UpdateState(state)

case

return

}

}

})

return &baseResolver{

stop: stop,

}, nil

}

復制代碼

這里Build 方法主要是通過 Registry 模塊獲得監控服務通道,然后將更新的服務信息再更新到 grpcClient 中去,保證 grpcClient 的負載均衡器的服務地址永遠都是最新的。

如何將Builder的具體實現注冊到 grpc 中

import "google.golang.org/grpc/resolver"

// Register ...

func Register(name string, reg registry.Registry) {

resolver.Register(&baseBuilder{

name: name,

reg:? reg,

})

}

復制代碼

將 Registry模塊注入到 Builder 對象中,然后注入到 grpc 的 resolver 模塊中去。這樣 grpcClient 在實際運行中就會調用 etcd 的服務發現功能了。

grpc 如何使用服務與發現的源碼解析

這里在介紹一下jupiter框架在實際項目中如何使用服務發現與注冊。

服務注冊

func (app *Application) startServers() error {

var eg errgroup.Group

// start multi servers

for _, s := range app.servers {

s := s

eg.Go(func() (err error) {

_ = app.registerer.RegisterService(context.TODO(), s.Info())

defer app.registerer.UnregisterService(context.TODO(), s.Info())

...

})

}

return eg.Wait()

}

eng := engine.NewEngine()

eng.SetRegistry(compound_registry.New(

etcdv3_registry.StdConfig("default").Build(),

))

復制代碼

在框架的 Application 模塊中已經實現了服務的自動注冊與刪除。一般使用框架時不需要再調用。項目使用中只需要在創建 Application 對象時,將注冊中心信息注入即可。

服務發現

// 服務發現需要初始化,拿到etcd中服務的信息

func (eng *Engine) initResolver() error {

resolver.Register("etcd", etcdv3.StdConfig("default").Build())

return nil

}

復制代碼

服務發現也是類型的將注冊中心信息注入即可。

文章系列

總結

以上是生活随笔為你收集整理的java如何通过grpc连接etcd_grpc通过 etcd 实现服务发现与注册-源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。