java如何通过grpc连接etcd_grpc通过 etcd 实现服务发现与注册-源码分析
介紹
下面介紹 jupiter-0.2.7 版本中 grpc 通過 etcd 實現服務發現與注冊。
服務發現與注冊的實現解析
服務注冊
服務注冊的流程圖:
etcd的服務注冊代碼模塊在 jupiter/pkg/registry/etcdv3 中。
下面讓我們來看看實際的代碼
// Registry register/unregister service
// registry impl should control rpc timeout
type Registry interface {
RegisterService(context.Context, *server.ServiceInfo) error
UnregisterService(context.Context, *server.ServiceInfo) error
ListServices(context.Context, string, string) ([]*server.ServiceInfo, error)
WatchServices(context.Context, string, string) (chan Endpoints, error)
io.Closer
}
復制代碼
在 pkg/registry/registry.go 中定義了注冊服務對象的接口。不同的服務只要實現了這些接口,jupiter 就能使用。
首先我們來看看注冊方法
// RegisterService register service to registry
func (reg *etcdv3Registry) RegisterService(ctx context.Context, info *server.ServiceInfo) error {
err := reg.registerBiz(ctx, info)
...
}
// 業務信息注冊
func (reg *etcdv3Registry) registerBiz(ctx context.Context, info *server.ServiceInfo) error {
...
// 提交信息到 etcd
_, err := reg.client.Put(readCtx, key, val, opOptions...)
...
}
復制代碼
這里主要的部分是 reg.client.Put()? 將服務信息提交到 etcd 中。其中的租約機制我會在之后單獨寫一篇文章介紹。這里主要還是關注如何注冊。
源碼中還有個 registerMetric()?方法,這個方法的目的是將服務信息在提交到etcd的 prometheus 前綴目錄下,用于服務監控,用的也是 client.Put() 方法。這里具體就不展示代碼了,感興趣的同學可以去源碼庫中查看。
服務退出
// 刪除服務
func (reg *etcdv3Registry) unregister(ctx context.Context, key string) error {
...
// 刪除服務信息
_, err := reg.client.Delete(ctx, key)
...
}
復制代碼
這里通過 client.Delete()? 方法將服務信息從 etcd 中刪除掉。
獲取服務列表
// ListServices list service registered in registry with name `name`
func (reg *etcdv3Registry) ListServices(ctx context.Context, name string, scheme string) (services []*server.ServiceInfo, err error) {
// 服務信息key的前綴
target := fmt.Sprintf("/%s/%s/providers/%s://", reg.Prefix, name, scheme)
// 獲取相關前綴的所有信息
getResp, getErr := reg.client.Get(ctx, target, clientv3.WithPrefix())
...
}
復制代碼
通過 client.Get()? 方法獲取到相同前綴的服務信息。
服務信息變動監控
// WatchServices watch service change event, then return address list
func (reg *etcdv3Registry) WatchServices(ctx context.Context, name string, scheme string) (chan registry.Endpoints, error) {
prefix := fmt.Sprintf("/%s/%s/", reg.Prefix, name)
// 通過etcd客戶端創建一個監控通道
watch, err := reg.client.WatchPrefix(context.Background(), prefix)
if err != nil {
return nil, err
}
...
xgo.Go(func() {
// 不斷接收etcd發送過來的變動事件
for event := range watch.C() {
switch event.Type {
case mvccpb.PUT:
updateAddrList(al, prefix, scheme, event.Kv)
case mvccpb.DELETE:
deleteAddrList(al, prefix, scheme, event.Kv)
}
out := al.DeepCopy()
fmt.Printf("al => %p\n", al.Nodes)
fmt.Printf("snapshot => %p\n", out.Nodes)
select {
// 將更新后的服務信息發送出去,接收方是 resolver
case addresses
default:
xlog.Warnf("invalid")
}
}
})
// 返回一個地址通道,用于傳遞
return addresses, nil
}
復制代碼
WatchServices()? 方法主要是監控信息的變動事件,并將變動后的服務信息重新返回給 resolver。具體思路是通過 etcdClient.Watch()? 方法創建一個監控通道,然后放入一個 goroutine來不斷接收 etcd 推送過來的事件,維護本地的服務信息,并通過 resolver 最終返回到 grpclb 負載均衡器進行服務地址信息的更新。
服務發現
服務發現流程圖:
grpc 的 resolver 模塊定義了兩個接口
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
ResolveNow(ResolveNowOptions)
Close()
}
復制代碼
首先我們來看看 Builder 接口的具體實現
type baseBuilder struct {
name string
reg? registry.Registry
}
// Build ...
func (b *baseBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
endpoints, err := b.reg.WatchServices(context.Background(), target.Endpoint, "grpc")
if err != nil {
return nil, err
}
var stop = make(chan struct{})
xgo.Go(func() {
for {
select {
case endpoint :=
var state = resolver.State{
Addresses: make([]resolver.Address, 0),
...
}
for _, node := range endpoint.Nodes {
...
state.Addresses = append(state.Addresses, address)
}
cc.UpdateState(state)
case
return
}
}
})
return &baseResolver{
stop: stop,
}, nil
}
復制代碼
這里Build 方法主要是通過 Registry 模塊獲得監控服務通道,然后將更新的服務信息再更新到 grpcClient 中去,保證 grpcClient 的負載均衡器的服務地址永遠都是最新的。
如何將Builder的具體實現注冊到 grpc 中
import "google.golang.org/grpc/resolver"
// Register ...
func Register(name string, reg registry.Registry) {
resolver.Register(&baseBuilder{
name: name,
reg:? reg,
})
}
復制代碼
將 Registry模塊注入到 Builder 對象中,然后注入到 grpc 的 resolver 模塊中去。這樣 grpcClient 在實際運行中就會調用 etcd 的服務發現功能了。
grpc 如何使用服務與發現的源碼解析
這里在介紹一下jupiter框架在實際項目中如何使用服務發現與注冊。
服務注冊
func (app *Application) startServers() error {
var eg errgroup.Group
// start multi servers
for _, s := range app.servers {
s := s
eg.Go(func() (err error) {
_ = app.registerer.RegisterService(context.TODO(), s.Info())
defer app.registerer.UnregisterService(context.TODO(), s.Info())
...
})
}
return eg.Wait()
}
eng := engine.NewEngine()
eng.SetRegistry(compound_registry.New(
etcdv3_registry.StdConfig("default").Build(),
))
復制代碼
在框架的 Application 模塊中已經實現了服務的自動注冊與刪除。一般使用框架時不需要再調用。項目使用中只需要在創建 Application 對象時,將注冊中心信息注入即可。
服務發現
// 服務發現需要初始化,拿到etcd中服務的信息
func (eng *Engine) initResolver() error {
resolver.Register("etcd", etcdv3.StdConfig("default").Build())
return nil
}
復制代碼
服務發現也是類型的將注冊中心信息注入即可。
文章系列
總結
以上是生活随笔為你收集整理的java如何通过grpc连接etcd_grpc通过 etcd 实现服务发现与注册-源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 小米商城怎么学生认证
- 下一篇: bloomfilter的java实现,B