gRPC source code notes 02: ClientConn
The previous note walked through the resolver and the balancer. This one follows the same flow from its entry point, the ClientConn object.
ClientConn
// ClientConn represents a virtual connection to a conceptual endpoint, to
// perform RPCs.
//
// A ClientConn is free to have zero or more actual connections to the endpoint
// based on configuration, load, etc. It is also free to determine which actual
// endpoints to use and may change it every RPC, permitting client-side load
// balancing.
//
// A ClientConn encapsulates a range of functionality including name
// resolution, TCP connection establishment (with retries and backoff) and TLS
// handshakes. It also handles errors on established connections by
// re-resolving the name and reconnecting.
type ClientConn struct {
	ctx    context.Context
	cancel context.CancelFunc

	target       string
	parsedTarget resolver.Target
	authority    string
	dopts        dialOptions
	csMgr        *connectivityStateManager

	balancerBuildOpts balancer.BuildOptions
	blockingpicker    *pickerWrapper

	mu              sync.RWMutex
	resolverWrapper *ccResolverWrapper
	sc              *ServiceConfig
	conns           map[*addrConn]struct{}
	// Keepalive parameter can be updated if a GoAway is received.
	mkp             keepalive.ClientParameters
	curBalancerName string
	balancerWrapper *ccBalancerWrapper
	retryThrottler  atomic.Value

	firstResolveEvent *grpcsync.Event

	channelzID int64 // channelz unique identification number
	czData     *channelzData
}

First come the ctx and cancel fields. I remember reading a best-practice recommendation not to store a context in a struct but to pass it as a function argument. Here, though, it is a reasonable use: managing the lifetime of the connection. Both fields are created in DialContext when the connection is set up. Note that in the trimmed DialContext shown later they are derived from context.Background() rather than from the caller's ctx, so the caller's ctx only governs the dial itself, and the connection is released by calling Close. Some concrete types in the standard library keep a context in a similar way (net.Conn itself is an interface and has no fields). Goroutines spawned by ClientConn can likewise be shut down safely through cancel.
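To make the lifetime argument concrete, here is a minimal sketch of a struct that owns its own ctx and cancel. Everything in it (conn, dial, watch, demoLifetime) is a hypothetical stand-in and not grpc-go code. It only assumes what the trimmed DialContext in this note shows: the long-lived context is derived from context.Background(), and the caller's ctx is consulted during setup only.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// conn is a hypothetical stand-in for ClientConn: it owns a context whose
// only job is to bound the lifetime of the goroutines it spawns.
type conn struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// dial mirrors the shape of DialContext in this note: the caller's ctx is
// only checked during setup, while the long-lived ctx is derived from
// context.Background().
func dial(ctx context.Context) (*conn, error) {
	if err := ctx.Err(); err != nil {
		return nil, err // setup was cancelled before it started
	}
	c := &conn{}
	c.ctx, c.cancel = context.WithCancel(context.Background())
	c.wg.Add(1)
	go c.watch()
	return c, nil
}

// watch is a background worker that exits once the conn is closed.
func (c *conn) watch() {
	defer c.wg.Done()
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-c.ctx.Done():
			return
		case <-ticker.C:
			// periodic work would go here
		}
	}
}

// Close cancels the internal context and waits for the workers to exit.
func (c *conn) Close() {
	c.cancel()
	c.wg.Wait()
}

// demoLifetime dials with a caller context, cancels that context, and
// reports whether the conn's own context stayed alive until Close.
func demoLifetime() (aliveAfterCallerCancel, doneAfterClose bool) {
	callerCtx, cancel := context.WithCancel(context.Background())
	c, err := dial(callerCtx)
	if err != nil {
		cancel()
		return false, false
	}
	cancel() // the caller's ctx ending does not tear down the conn
	aliveAfterCallerCancel = c.ctx.Err() == nil
	c.Close()
	doneAfterClose = c.ctx.Err() != nil
	return
}
```

Calling demoLifetime from a main function should show that the worker goroutine is tied to Close and not to the context that was passed to dial.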
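target, parsedTarget, authority and dopts all look like fairly raw input parameters.
csMgr manages the overall connectivity state of the ClientConn. Set it aside for now; it is covered in detail below.
resolverWrapper, conns, curBalancerName, balancerWrapper and firstResolveEvent relate to name resolution and load balancing, which the previous note touched on briefly. retryThrottler, judging by its name, limits how often retries happen (as opposed to the backoff strategy configured through dopts.bs); I have not looked into it yet.
sc *ServiceConfig holds the service parameters published by the service side, roughly control settings such as maxRequestMessageBytes and timeout, which can be specified down to the individual method. mkp keepalive.ClientParameters is also parameter data, related to keepalive.
channelzID and czData belong to channelz, gRPC's internal instrumentation and monitoring facility. It mostly amounts to asynchronous AddTraceEvent calls plus aggregated counters, and can probably be ignored when reading the code.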
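Interaction between ClientConn and resolverWrapper / balancerWrapper
The interaction between clientConn and resolver / balancer was sketched in the previous note. Because it goes through well-defined interfaces, it is easy to follow. The interaction between clientConn and resolverWrapper / balancerWrapper goes through concrete methods instead, so it is listed by hand here.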
resolverWrapper calls clientConn through updateResolverState.
clientConn calls resolverWrapper through resolveNow.
clientConn calls balancerWrapper through:
- resolveError: called from clientConn's updateResolverState method, which is itself called by resolverWrapper.
- handleSubConnStateChange: called from clientConn's handleSubConnStateChange method, which is in turn called by addrConn's updateConnectivityState.
- updateClientConnState: called from clientConn's updateResolverState to pass along name resolution updates.
balancerWrapper calls clientConn through:
- newAddrConn, removeAddrConn: these roughly map to NewSubConn and RemoveSubConn; addrConn is the concrete implementation of SubConn.
- blockingpicker.updatePicker, csMgr.updateState: both are called in UpdateBalancerState and set the picker and the aggregate connectivity state from balancer.State on clientConn.
- resolveNow: comes from ResolveNow and asks clientConn to trigger a new resolution by the resolver.
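Drawn as a diagram (the figure is not reproduced here):
The interaction feels a bit like the Kubernetes style of watching a struct's fields for changes and reconciling toward them. For example, the resolver emits a State struct holding the backend addresses, the ServiceConfig and additional metadata. ClientConn and the balancer each pick out the fields they care about and run their own logic, and the whole flow is asynchronous.
The only thing the diagram leaves unlabeled is the origin of handleSubConnStateChange. It is a callback from addrConn, which will be covered later.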
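The idea that one State value fans out to several consumers can be sketched as follows. All types here (resolverState, clientConnStub, balancerStub) are simplified, hypothetical stand-ins for the grpc-go ones, the ServiceConfig is reduced to a policy name, and the calls are synchronous, whereas the note describes the real flow as asynchronous.

```go
package main

import "fmt"

// resolverState is a simplified, hypothetical stand-in for resolver.State.
type resolverState struct {
	Addresses     []string
	ServiceConfig string // reduced to a balancing policy name for this sketch
}

// balancerStub only cares about the address list.
type balancerStub struct {
	addrs []string
}

func (b *balancerStub) updateClientConnState(s resolverState) {
	b.addrs = append([]string(nil), s.Addresses...)
}

// clientConnStub only cares about the service config, then forwards the
// same state to the balancer.
type clientConnStub struct {
	policy   string
	balancer *balancerStub
}

// updateResolverState plays the role of the method the resolver wrapper
// calls: each consumer reads the fields it is interested in.
func (cc *clientConnStub) updateResolverState(s resolverState) {
	if s.ServiceConfig != "" {
		cc.policy = s.ServiceConfig
	}
	cc.balancer.updateClientConnState(s)
}

// demoFanOut pushes one update through and summarizes what each side kept.
func demoFanOut() string {
	cc := &clientConnStub{policy: "pick_first", balancer: &balancerStub{}}
	cc.updateResolverState(resolverState{
		Addresses:     []string{"10.0.0.1:443", "10.0.0.2:443"},
		ServiceConfig: "round_robin",
	})
	return fmt.Sprintf("%s %d", cc.policy, len(cc.balancer.addrs))
}
```

The point of the shape is that the resolver does not need to know who consumes which field; it only publishes the whole struct.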
Initializing ClientConn
Name resolution and load balancing are both continuously refreshed, so how does the whole process get started? Here is a trimmed version of DialContext:
// DialContext creates a client connection to the given target. By default, it's
// a non-blocking dial (the function won't wait for connections to be
// established, and connecting happens in the background). To make it a blocking
// dial, use WithBlock() dial option.
//
// In the non-blocking case, the ctx does not act against the connection. It
// only controls the setup steps.
//
// In the blocking case, ctx can be used to cancel or expire the pending
// connection. Once this function returns, the cancellation and expiration of
// ctx will be noop. Users should call ClientConn.Close to terminate all the
// pending operations after this function returns.
//
// The target name syntax is defined in
// <https://github.com/grpc/grpc/blob/master/doc/naming.md>.
// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target:            target,
		csMgr:             &connectivityStateManager{},
		conns:             make(map[*addrConn]struct{}),
		dopts:             defaultDialOptions(),
		blockingpicker:    newPickerWrapper(),
		czData:            new(channelzData),
		firstResolveEvent: grpcsync.NewEvent(),
	}
	cc.retryThrottler.Store((*retryThrottler)(nil))
	cc.ctx, cc.cancel = context.WithCancel(context.Background())

	for _, opt := range opts {
		opt.apply(&cc.dopts)
	}

	// Apparently sets up some hooks (the client interceptor chains).
	chainUnaryClientInterceptors(cc)
	chainStreamClientInterceptors(cc)

	defer func() {
		if err != nil {
			cc.Close()
		}
	}()

	if channelz.IsOn() {
		// ... initialize channelz
	}
	if !cc.dopts.insecure {
		// ... TLS-related parameter checks
	}
	if cc.dopts.defaultServiceConfigRawJSON != nil {
		// ... parse the default ServiceConfig JSON given in the options
	}
	cc.mkp = cc.dopts.copts.KeepaliveParams

	if cc.dopts.copts.Dialer == nil {
		// ... default Dialer function
	}

	if cc.dopts.copts.UserAgent != "" {
		cc.dopts.copts.UserAgent += " " + grpcUA
	} else {
		cc.dopts.copts.UserAgent = grpcUA
	}

	// Configure the Dial timeout.
	if cc.dopts.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
		defer cancel()
	}
	// On return, if DialContext's ctx was cancelled or timed out along the
	// way, return ctx.Err().
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
	}()

	// Listen on scChan for serviceConfig information.
	scSet := false
	if cc.dopts.scChan != nil {
		// Try to get an initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
				scSet = true
			}
		default:
		}
	}
	// Default to exponential backoff.
	if cc.dopts.bs == nil {
		cc.dopts.bs = backoff.DefaultExponential
	}

	// Choose the resolverBuilder based on the Scheme of the target name.
	// Determine the resolver to use.
	cc.parsedTarget = parseTarget(cc.target)
	grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
	if resolverBuilder == nil {
		// ... fall back to the default resolverBuilder if none is found
	}

	creds := cc.dopts.copts.TransportCredentials
	// ... initialize cc.authority

	// Block waiting for scChan.
	if cc.dopts.scChan != nil && !scSet {
		// Blocking wait for the initial service config.
		select {
		case sc, ok := <-cc.dopts.scChan:
			if ok {
				cc.sc = &sc
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	if cc.dopts.scChan != nil {
		go cc.scWatcher()
	}

	// Initialize the balancer.
	var credsClone credentials.TransportCredentials
	if creds := cc.dopts.copts.TransportCredentials; creds != nil {
		credsClone = creds.Clone()
	}
	cc.balancerBuildOpts = balancer.BuildOptions{
		DialCreds:        credsClone,
		CredsBundle:      cc.dopts.copts.CredsBundle,
		Dialer:           cc.dopts.copts.Dialer,
		ChannelzParentID: cc.channelzID,
		Target:           cc.parsedTarget,
	}

	// Build the resolver.
	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
	if err != nil {
		return nil, fmt.Errorf("failed to build resolver: %v", err)
	}
	cc.mu.Lock()
	cc.resolverWrapper = rWrapper
	cc.mu.Unlock()

	// A blocking dial blocks until the clientConn is ready.
	if cc.dopts.block {
		for {
			s := cc.GetState()
			if s == connectivity.Ready {
				break
			} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
				if err = cc.blockingpicker.connectionError(); err != nil {
					terr, ok := err.(interface {
						Temporary() bool
					})
					if ok && !terr.Temporary() {
						return nil, err
					}
				}
			}
			if !cc.WaitForStateChange(ctx, s) {
				// ctx got timeout or canceled.
				return nil, ctx.Err()
			}
		}
	}

	return cc, nil
}

There is some logic here around cc.dopts.scChan, and when dopts.block is set there is also logic that actively waits for the connection.
Following cc.dopts.scChan leads to this passage in the dial options definitions:
// WithServiceConfig returns a DialOption which has a channel to read the
// service configuration.
//
// Deprecated: service config should be received through name resolver or via
// WithDefaultServiceConfig, as specified at
// <https://github.com/grpc/grpc/blob/master/doc/service_config.md>. Will be
// removed in a future 1.x release.
func WithServiceConfig(c <-chan ServiceConfig) DialOption {
	return newFuncDialOption(func(o *dialOptions) {
		o.scChan = c
	})
}

So the scChan field is slated for removal. The replacement is either WithDefaultServiceConfig, which takes a default JSON document, or fetching the config dynamically through the ServiceConfig field of the State struct passed to the resolver's UpdateState.
ServiceConfig turns out to be more powerful than expected: ClientConn has an applyServiceConfigAndBalancer method that will even call switchBalancer to change the balancer policy on the fly, based on a dynamically delivered ServiceConfig.
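For a feel of what such a JSON document carries, the sketch below parses a small service config with encoding/json. The JSON field names follow the service_config.md document linked in the comment above. The Go structs, the parseServiceConfig helper and the service name echo.Echo are made up for illustration and are not grpc-go's internal types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sampleServiceConfig follows the JSON shape described in service_config.md.
// The service name "echo.Echo" is invented for this example.
const sampleServiceConfig = `{
  "loadBalancingPolicy": "round_robin",
  "methodConfig": [{
    "name": [{"service": "echo.Echo"}],
    "timeout": "1s",
    "maxRequestMessageBytes": 1024
  }]
}`

// The structs below are hypothetical and cover only the fields used here.
type methodName struct {
	Service string `json:"service"`
	Method  string `json:"method"`
}

type methodConfig struct {
	Name                   []methodName `json:"name"`
	Timeout                string       `json:"timeout"`
	MaxRequestMessageBytes int          `json:"maxRequestMessageBytes"`
}

type serviceConfig struct {
	LoadBalancingPolicy string         `json:"loadBalancingPolicy"`
	MethodConfig        []methodConfig `json:"methodConfig"`
}

// parseServiceConfig decodes the raw JSON into the sketch types above.
func parseServiceConfig(raw string) (serviceConfig, error) {
	var sc serviceConfig
	if err := json.Unmarshal([]byte(raw), &sc); err != nil {
		return serviceConfig{}, fmt.Errorf("parse service config: %w", err)
	}
	return sc, nil
}
```

A string of this shape is what one would hand to grpc.WithDefaultServiceConfig. The loadBalancingPolicy entry is the part that lets a config switch the balancer, and the methodConfig entries carry the per-method limits mentioned earlier.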
csMgr and WaitForStateChange
Going back to look at the cc.dopts.block logic on its own:
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
	for {
		s := cc.GetState()
		if s == connectivity.Ready {
			break
		} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
			if err = cc.blockingpicker.connectionError(); err != nil {
				terr, ok := err.(interface {
					Temporary() bool
				})
				if ok && !terr.Temporary() {
					return nil, err
				}
			}
		}
		if !cc.WaitForStateChange(ctx, s) {
			// ctx got timeout or canceled.
			return nil, ctx.Err()
		}
	}
}

This is roughly an endless loop that checks the connection state until it becomes Ready. ClientConn's connection state is managed by cc.csMgr, and the state in csMgr comes from the balancer's UpdateState callback to ClientConn. The balancer's connection state is an aggregation over the states of multiple connections: roughly, as long as one connection is Ready, the balancer's state is considered Ready. When I looked at the balancer's state aggregation earlier, I was not clear what it was for. It now seems to mainly serve WaitForStateChange, which is a public method and part of ClientConn's external API.
In practice, if cc.dopts.block is enabled, it seems wise to pair it with cc.dopts.timeout (or a deadline on the ctx passed to DialContext), so that the dial can give up on timeout.
What csMgr mainly does is help ClientConn implement the connectivity.Reporter interface, in particular the WaitForStateChange method:
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu         sync.Mutex
	state      connectivity.State
	notifyChan chan struct{}
	channelzID int64
}

// ...

// updateState updates the connectivity.State of ClientConn.
// If there's a change it notifies goroutines waiting on state change to
// happen.
func (csm *connectivityStateManager) updateState(state connectivity.State) {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	if csm.state == connectivity.Shutdown {
		return
	}
	if csm.state == state {
		return
	}
	csm.state = state
	if channelz.IsOn() {
		// ...
	}
	if csm.notifyChan != nil {
		// There are other goroutines waiting on this channel.
		close(csm.notifyChan)
		csm.notifyChan = nil
	}
}

func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
	csm.mu.Lock()
	defer csm.mu.Unlock()
	if csm.notifyChan == nil {
		csm.notifyChan = make(chan struct{})
	}
	return csm.notifyChan
}

// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
// ctx expires. A true value is returned in former case and false in latter.
// This is an EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
	ch := cc.csMgr.getNotifyChan()
	if cc.csMgr.getState() != sourceState {
		return true
	}
	select {
	case <-ctx.Done():
		return false
	case <-ch:
		return true
	}
}

notifyChan is used purely as a broadcast notification, delivered by closing the channel. When the state changes, updateState closes the current notifyChan and sets it to nil; the next call to getNotifyChan lazily creates a fresh one. A closed notifyChan therefore means the state has changed, which makes it behave much like a condition variable.
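The close-to-broadcast pattern, together with a blocking wait bounded by a timeout, can be tried out in isolation. The sketch below is a reduced, hypothetical re-creation using only the standard library: stateManager, waitReady and demoWait are names invented here, and the three states are a simplification of connectivity.State.

```go
package main

import (
	"context"
	"sync"
	"time"
)

type state int

const (
	idle state = iota
	connecting
	ready
)

// stateManager is a simplified, hypothetical version of connectivityStateManager.
type stateManager struct {
	mu         sync.Mutex
	state      state
	notifyChan chan struct{}
}

// update stores the new state and wakes all waiters by closing the channel.
func (m *stateManager) update(s state) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.state == s {
		return
	}
	m.state = s
	if m.notifyChan != nil {
		close(m.notifyChan) // every goroutine blocked on it wakes up
		m.notifyChan = nil  // the next waiter lazily creates a new one
	}
}

func (m *stateManager) get() state {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.state
}

func (m *stateManager) notify() <-chan struct{} {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.notifyChan == nil {
		m.notifyChan = make(chan struct{})
	}
	return m.notifyChan
}

// waitForChange returns true once the state differs from `from`, or false
// if ctx ends first. The channel is fetched before the state is compared,
// so an update that lands in between is not missed.
func (m *stateManager) waitForChange(ctx context.Context, from state) bool {
	ch := m.notify()
	if m.get() != from {
		return true
	}
	select {
	case <-ctx.Done():
		return false
	case <-ch:
		return true
	}
}

// waitReady loops like the blocking dial: re-read the state, stop at ready,
// otherwise wait for the next change or for ctx to end.
func (m *stateManager) waitReady(ctx context.Context) error {
	for {
		s := m.get()
		if s == ready {
			return nil
		}
		if !m.waitForChange(ctx, s) {
			return ctx.Err()
		}
	}
}

// demoWait runs one manager that becomes ready and one that never does.
func demoWait() (readyErr, timeoutErr error) {
	m := &stateManager{}
	go func() {
		m.update(connecting)
		m.update(ready)
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	readyErr = m.waitReady(ctx)

	stuck := &stateManager{} // never leaves idle
	shortCtx, cancelShort := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancelShort()
	timeoutErr = stuck.waitReady(shortCtx)
	return
}
```

The second half of demoWait is the reason a blocking dial is best paired with a timeout: without a deadline, a manager that never reaches ready would keep the caller blocked indefinitely.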
blockingpicker
Besides balancerWrapper and resolverWrapper, ClientConn also has a blockingpicker field of type pickerWrapper, which is likewise mostly about concurrency synchronization.
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {
	mu         sync.Mutex
	done       bool
	blockingCh chan struct{}
	picker     balancer.V2Picker

	// The latest connection error. TODO: remove when V1 picker is deprecated;
	// balancer should be responsible for providing the error.
	*connErr
}

type connErr struct {
	mu  sync.Mutex
	err error
}

Roughly: a blockingCh is created at initialization, and from then on, each time updatePickerV2 changes the picker, the old blockingCh is closed and a new one is created.
The main entry point pickerWrapper exposes is the pick method. Start with its comment:
// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
// - the current picker returns ErrNoSubConnAvailable
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, func(balancer.DoneInfo), error) {
	// ...
}

These blocked calls can only be released when the balancer produces a new picker object and hands it to ClientConn. The implementation style is similar to WaitForStateChange: on every state change the old chan is closed and a new one takes its place, a lock makes the state change and the chan swap atomic, and the other side blocks waiting for the chan to be closed.
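The blocking behaviour described in the pick comment can be sketched with a much-reduced wrapper. This is an illustration under stated assumptions, not the grpc-go implementation: the picker interface, fixedPicker, errNoSubConn and demoPick are hypothetical, a pick returns a plain address string, and the failfast and READY checks are left out.

```go
package main

import (
	"context"
	"errors"
	"sync"
	"time"
)

// errNoSubConn is a hypothetical stand-in for balancer.ErrNoSubConnAvailable.
var errNoSubConn = errors.New("no SubConn available")

// picker is a hypothetical, much-reduced stand-in for balancer.V2Picker.
type picker interface {
	Pick() (string, error)
}

// fixedPicker always returns the same address or error.
type fixedPicker struct {
	addr string
	err  error
}

func (p fixedPicker) Pick() (string, error) { return p.addr, p.err }

type pickerWrapper struct {
	mu         sync.Mutex
	picker     picker
	blockingCh chan struct{}
}

func newPickerWrapper() *pickerWrapper {
	return &pickerWrapper{blockingCh: make(chan struct{})}
}

// updatePicker swaps the picker and the channel under one lock, so a waiter
// never observes the new picker paired with the old channel.
func (pw *pickerWrapper) updatePicker(p picker) {
	pw.mu.Lock()
	defer pw.mu.Unlock()
	pw.picker = p
	close(pw.blockingCh) // release everyone blocked in pick
	pw.blockingCh = make(chan struct{})
}

// pick blocks while there is no picker, or while the current picker has
// already reported errNoSubConn, until the picker is updated or ctx ends.
func (pw *pickerWrapper) pick(ctx context.Context) (string, error) {
	var seen chan struct{} // channel that was current when a picker was last tried
	for {
		pw.mu.Lock()
		p, ch := pw.picker, pw.blockingCh
		pw.mu.Unlock()

		if p == nil || ch == seen {
			// Nothing new to try: wait for the next picker update.
			select {
			case <-ctx.Done():
				return "", ctx.Err()
			case <-ch:
			}
			continue
		}
		seen = ch
		addr, err := p.Pick()
		if errors.Is(err, errNoSubConn) {
			continue // blocks at the top of the loop until the picker changes
		}
		return addr, err
	}
}

// demoPick starts with no picker, then installs a failing picker, then a
// working one, while pick waits in the foreground.
func demoPick() (string, error) {
	pw := newPickerWrapper()
	go func() {
		time.Sleep(10 * time.Millisecond)
		pw.updatePicker(fixedPicker{err: errNoSubConn})
		time.Sleep(10 * time.Millisecond)
		pw.updatePicker(fixedPicker{addr: "10.0.0.1:443"})
	}()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	return pw.pick(ctx)
}
```

Remembering which channel was current at the last attempt is what stops the loop from spinning on a picker that has already failed: a retry happens only after updatePicker has replaced the channel.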
picker.Pick() itself is thread-safe. I am not sure whether a single SubConn can be used by multiple goroutines at once; I will confirm this later.
That is all for now. Next up is addrConn, the implementation of SubConn.