日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

从源码透析gRPC调用原理

發(fā)布時(shí)間:2025/3/15 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 从源码透析gRPC调用原理 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

導(dǎo)語(yǔ)

gRPC是什么,不用多說了。

gRPC如何用,也不用多說了 。

但是,gRPC是如何work的,清楚的理解其調(diào)用邏輯,對(duì)于我們更好、更深入的使用gRPC很有必要。因此我們必須深度解析下gRPC的實(shí)現(xiàn)邏輯,在本文中,將分別從客戶端和服務(wù)端來說明gRPC的實(shí)現(xiàn)原理。

準(zhǔn)備條件

本文將以gRPC Github上helloword代碼作為一個(gè)完整的項(xiàng)目示例作為介紹的基礎(chǔ),在展開分析之前,簡(jiǎn)單介紹下作為gRPC的文件結(jié)構(gòu):

greeter_client greeter_server helloworld mock_helloworld

在這里,我們只需要關(guān)注前三個(gè)文件夾的內(nèi)容。

其中,greet_client和greet_server文件中分別是grpc客戶端和服務(wù)端的業(yè)務(wù)調(diào)用代碼,包含了一個(gè)標(biāo)準(zhǔn)的gRPC調(diào)用過程。helloworld中包含了是protobuf的協(xié)議文件和生成的helloworld.pb.go文件(至于pb的協(xié)議和*.pb.go文件的生成等內(nèi)容,不作為本文的介紹范圍,不再贅述)。

客戶端

首先,我們以Github官網(wǎng)上的example為示例來一覽gRPC client端的使用,從而跟蹤其調(diào)用的邏輯個(gè)原理。總的來看,調(diào)用的過程基本就是分為三步:

  • 創(chuàng)建connection
  • 創(chuàng)建業(yè)務(wù)客戶端實(shí)例
  • 調(diào)用RPC接口
{...// 創(chuàng)建connectionconn, err := grpc.Dial(address, grpc.WithInsecure())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 創(chuàng)建clientc := pb.NewGreeterClient(conn)// 調(diào)用RPC接口name := defaultNamer, err := c.SayHello(context.TODO(), &pb.HelloRequest{Name: name})if err != nil {log.Fatalf("could not greet: %v", err)}... }

創(chuàng)建connection

通過grpc.Dial()接口創(chuàng)建了一個(gè)ClientConn類型實(shí)例。

Dial()函數(shù)的第一個(gè)參數(shù)作為endpoint我們就不多說了,同時(shí)Dial()還接受變長(zhǎng)參數(shù)DialOption。DialOption是一個(gè)接口類型,在grpc中存在著多種返回了DialOption類型的函數(shù),這些返回了DialOption類型的函數(shù),例如編解碼、負(fù)載均衡策略等,一些函數(shù)聲明示例如下:

func WithBalancer() DialOption func WithInsecure() DialOption func WithCodec() DialOption

根據(jù)client的需求,調(diào)用方在調(diào)用Dial()的時(shí)候可以將這些函數(shù)作為參數(shù)傳入Dial()中。

在Dial()中,首先是會(huì)根據(jù)參數(shù)進(jìn)行一系列的初始化和賦值操作,就不在這里列出說明,而對(duì)于這些DailOption參數(shù),在Dial()中最終實(shí)現(xiàn)對(duì)grpc.ClientConn的成員變量dopts中的CallOption進(jìn)行了賦值。

通過Dial()的調(diào)用,grpc已經(jīng)建立了到服務(wù)端的鏈接,同時(shí)也會(huì)附帶一些諸如負(fù)載均衡、證書檢查、Backoff等策略的執(zhí)行(如果有進(jìn)行配置的話)。

創(chuàng)建客戶端實(shí)例

創(chuàng)建業(yè)務(wù)client實(shí)例,在使用gRPC的時(shí)候,我們都知道其傳遞協(xié)議是protobuf。

而NewGreeterClient()則是通過對(duì)pb協(xié)議生成的代碼接口,存在于helloworld.pb.go中,該函數(shù)主要是返回了一個(gè)greeterClient類型的實(shí)例。

調(diào)用RPC請(qǐng)求

SayHello()中的RPC接口也是存在于根據(jù)pb協(xié)議生成的helloworld.pb.go文件中。

SayHello()除了接受一個(gè)context存儲(chǔ)上下文信息和一個(gè)request類型參數(shù),同時(shí)也支持一個(gè)CallOption類型的變量。關(guān)于CallOption在上文中有提到,其本身也是一個(gè)接口,其中before()用于在請(qǐng)求發(fā)送之前設(shè)置參數(shù),而after()則是在請(qǐng)求調(diào)用完畢之后提取信息。通過對(duì)這兩個(gè)函數(shù)的調(diào)用,方便的實(shí)現(xiàn)了在請(qǐng)求前后的一些參數(shù)設(shè)置的功能:

type CallOption interface {before(*callInfo) errorafter(*callInfo) }

任何一個(gè)我們我們上文說到了返回值為DialOption的函數(shù),大部分都有一個(gè)對(duì)應(yīng)的結(jié)構(gòu)實(shí)現(xiàn)了CallOption,諸如上面的WithCodec(),其對(duì)應(yīng)的結(jié)構(gòu)為:

type CustomCodecCallOption struct {Codec Codec }func (o CustomCodecCallOption) before(c *callInfo) error {c.codec = o.Codecreturn nil } func (o CustomCodecCallOption) after(c *callInfo) {}

回到SayHello()函數(shù)的邏輯中來,該函數(shù)最終會(huì)調(diào)用grpc中的call.go中的invoke函數(shù)來執(zhí)行具體的操作。

在invoke()函數(shù)中,newClientStream()會(huì)首先獲取傳輸層Trasport結(jié)構(gòu)的實(shí)例并包裝到一個(gè)ClientStream實(shí)例中返回,隨后將RPC請(qǐng)求通過SendMsg()接口發(fā)送出去,注意,由于SendMsg()并不會(huì)等待服務(wù)端收到數(shù)據(jù),因此還需要通過RecvMsg()同步接收收到的回復(fù)消息(關(guān)于SendMsg()和RecvMsg()中的具體發(fā)送和接收數(shù)據(jù)邏輯,不在贅述,可以去源碼再詳細(xì)了解)。

// pb.go文件 func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {out := new(HelloReply)err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)if err != nil {return nil, err}return out, nil }...// grpc/grpc.go/call.go文件 func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)if err != nil {return err}if err := cs.SendMsg(req); err != nil {return err}return cs.RecvMsg(reply) }

服務(wù)端

對(duì)于Server端,我們同樣地根據(jù)Github上的官網(wǎng)示例來展開說明。總的來看,grpc在server端的調(diào)用邏輯如下,基本就是分為四步:

  • 創(chuàng)建端口監(jiān)聽listener
  • 創(chuàng)建server實(shí)例
  • 注冊(cè)服務(wù)(并未真正開始服務(wù))
  • 啟動(dòng)服務(wù)端
{ ...// 創(chuàng)建listenerlis, err := net.Listen("tcp", port)if err != nil {log.Fatalf("failed to listen: %v", err)}// 創(chuàng)建server示例s := grpc.NewServer()// 注冊(cè)服務(wù)pb.RegisterGreeterServer(s, &server{})reflection.Register(s)// 啟動(dòng)服務(wù)端監(jiān)聽if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}... }

創(chuàng)建監(jiān)聽端口

創(chuàng)建listener,不用多介紹了,就是創(chuàng)建了一個(gè)監(jiān)聽tcp端口的Listener實(shí)例。

創(chuàng)建服務(wù)端實(shí)例

NewServer()方法創(chuàng)建了一個(gè)grpc.Server實(shí)例,其函數(shù)內(nèi)部會(huì)對(duì)該實(shí)例進(jìn)行一系列初始化賦值操作。該接口與客戶端中的Dial()接口類似,可以接受多個(gè)ServerOption入?yún)?#xff0c;在helloworld的示例中并未傳入任務(wù)參數(shù),一個(gè)簡(jiǎn)單那的示例如下:

svr := grpc.NewServer(grpc.CustomCodec(proxy.Codec()))

在grpc中,也存在了多種類似于CustomCodec()這樣返回值類型為ServerOption的函數(shù),從而滿足調(diào)用方在需要求進(jìn)行傳參賦值:

func CustomCodec() ServerOption func MaxConcurrentStreams() ServerOption func UnknownServiceHandler() ServerOption

服務(wù)注冊(cè)

RegisterGreeterServer()是由helloworld.pb.go生成的接口,其主要調(diào)用了grpc的RegisterService() 來注冊(cè)當(dāng)前service及其實(shí)現(xiàn)。

grpc.RegisterService()接收一個(gè)參數(shù)類型為ServiceDesc的實(shí)例_Greeter_serviceDesc用以歲service的描述說明,同時(shí)接收一個(gè)service實(shí)例作注冊(cè)進(jìn)來。其中_Greeter_serviceDesc是由pb生成的對(duì)業(yè)務(wù)RPC接口的描述,如下所示:

// helloworld.pb.go func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {s.RegisterService(&_Greeter_serviceDesc, srv) }var _Greeter_serviceDesc = grpc.ServiceDesc{ServiceName: "helloworld.Greeter",HandlerType: (*GreeterServer)(nil),Methods: []grpc.MethodDesc{{MethodName: "SayHello",Handler: _Greeter_SayHello_Handler,},},Streams: []grpc.StreamDesc{},Metadata: "helloworld.proto", }

我們可以看到,在grpc.ServiceDesc中對(duì)Methods變量進(jìn)行了賦值。其中Methods包含了一個(gè)RPC接口名到handler的映射數(shù)組,描述了當(dāng)前service支持的所有的方法,MethodName即為調(diào)用的RPC接口名,而handler的值_Greeter_SayHello_Handler()也是由pb生成的方法,在其內(nèi)部通過注冊(cè)進(jìn)來的service實(shí)例,實(shí)現(xiàn)了對(duì)我們的業(yè)務(wù)函數(shù)SayHello()進(jìn)行了調(diào)用:

func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {in := new(HelloRequest)if err := dec(in); err != nil {return nil, err}if interceptor == nil {return srv.(GreeterServer).SayHello(ctx, in)}info := &grpc.UnaryServerInfo{Server: srv,FullMethod: "/helloworld.Greeter/SayHello",}handler := func(ctx context.Context, req interface{}) (interface{}, error) {return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest))}return interceptor(ctx, in, info, handler) }

啟動(dòng)服務(wù)

Serve函數(shù)中開始接收來到listener的請(qǐng)求(實(shí)際上也就是對(duì)listener進(jìn)行了Accept()),并為每一個(gè)請(qǐng)求創(chuàng)建一個(gè)go程來服務(wù)。

Serve函數(shù)的邏輯判斷比較復(fù)雜,但其實(shí)真正的調(diào)用邏輯過程十分簡(jiǎn)單,在下面列出,從而有助于我們的理解。

func (s *Server) Serve(lis net.Listener) error {...for {// 開始接受服務(wù)rawConn, err := lis.Accept()...// 為每一個(gè)請(qǐng)求啟動(dòng)一個(gè)go程來處理鏈接s.serveWG.Add(1)go func() {s.handleRawConn(rawConn)s.serveWG.Done()}()} }func (s *Server) handleRawConn(rawConn net.Conn) {// 鑒權(quán)操作conn, authInfo, err := s.useTransportAuthenticator(rawConn)...// 基于HTTP2,創(chuàng)建一個(gè)ServerTransportst := s.newHTTP2Transport(conn, authInfo)...go func() {s.serveStreams(st)s.removeConn(st)}() }

其中,newHTTP2Transport()的代碼主要部分有一些關(guān)于HTTP2的賦值和初始化操作,存在于internal/transport/http2_server.go中,這兒就不再進(jìn)入介紹http2的具體實(shí)現(xiàn)方式了。而serveStreams()中則主要是調(diào)用了HandleStreams()接口去真正的接受請(qǐng)求流。

func (s *Server) serveStreams(st transport.ServerTransport) {defer st.Close()var wg sync.WaitGroupst.HandleStreams(func(stream *transport.Stream) {wg.Add(1)go func() {defer wg.Done()s.handleStream(st, stream, s.traceInfo(st, stream))}()}, func(ctx context.Context, method string) context.Context {if !EnableTracing {return ctx}tr := trace.New("grpc.Recv."+methodFamily(method), method)return trace.NewContext(ctx, tr)})wg.Wait() }

HandleStreams()中的實(shí)現(xiàn)在grpc-go/internal/transport/handler_server.go文件中。

在HandleStreams()實(shí)現(xiàn)中前面一大部分是對(duì)數(shù)據(jù)流Stream的初始化,數(shù)據(jù)接收以及賦值,詳細(xì)的處理過程大家可以去文件中詳細(xì)的看代碼,這里我們只做邏輯流程的分析。在數(shù)據(jù)流stream接收完畢后,通過注冊(cè)進(jìn)來的server的startStream()來處理數(shù)據(jù)流。

注冊(cè)進(jìn)來的startStream()最終調(diào)用了Server中的startStream()函數(shù),區(qū)分出是unary請(qǐng)求還是stream請(qǐng)求,并分別通過processUnaryRPC()和processStreamingRPC()進(jìn)行區(qū)分處理。對(duì)于兩個(gè)主要的處理函數(shù)processUnaryRPC()和processStreamingRPC(),基本上是一些具體的數(shù)據(jù)接收、編解碼等操作,就不在浪費(fèi)篇幅貼出代碼了。

func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {...// 數(shù)據(jù)流Stream的接受和賦值startStream(s)ht.runStream()close(requestOver)// 等待數(shù)據(jù)讀取完畢req.Body.Close()<-readerDone }func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {...// 判斷Unary RPC還是Streaming RPCif md, ok := srv.md[method]; ok {s.processUnaryRPC(t, stream, srv, md, trInfo)return}if sd, ok := srv.sd[method]; ok {s.processStreamingRPC(t, stream, srv, sd, trInfo)return}...if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)return}... }

最后,簡(jiǎn)單以一個(gè)圖示來展示grpc服務(wù)端的調(diào)用流程:

gRPC Server簡(jiǎn)化調(diào)用流程

總結(jié)

上面的就是關(guān)于gRPC調(diào)用邏輯的分析,gRPC中的代碼十分復(fù)雜,本文只涉及了其調(diào)用邏輯的分析,在分析展示源碼時(shí),省略的一些錯(cuò)誤處理或者數(shù)據(jù)處理的代碼,而側(cè)重于邏輯調(diào)用的過程,從而在使用gRPC的時(shí)候可以更好的理解其原理。

https://cloud.tencent.com/developer/article/1189548

總結(jié)

以上是生活随笔為你收集整理的从源码透析gRPC调用原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。