gRPC Go Source Code Analysis: The Client (Part 1)
Table of Contents
- 1. Introduction
- 2. Source Tree Overview
- 3. The Client
- 4. Related Links
1. Introduction

gRPC is a general-purpose RPC framework implemented by Google, and it also comes in a Go version. I use this library heavily at work, so I am reading the source to deepen my understanding of the framework. For now the analysis covers only the Go version (I have not looked at the other language implementations). My own skill is limited and some of the design intent described here is my own guesswork, so mistakes are inevitable; if you spot one, please point it out.
2. Source Tree Overview

gRPC uses protobuf (Google's serialization framework) as its message format and HTTP/2 as the underlying transport protocol. The gRPC source implements its own HTTP/2 server and client rather than using the net/http package. HTTP/2 has many features that make data transfer efficient; see the related links for details. The gRPC source tree looks like this, and the directory names largely tell you what each one holds:

- documentation: documents
- benchmark: benchmark and load-testing code
- credentials: authentication
- examples: examples
- grpclb: load balancing
- grpclog: logging
- health: service health checking
- metadata: metadata (lets the client send extra key/value data to the server; see the related links)
- naming: the interfaces a name service must implement (roughly a DNS)
- stats: statistics
- transport: the transport layer, mainly the HTTP/2 client and server implementation (this directory will not be covered in detail)

The remaining directories are fairly unimportant and are not described one by one.
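As a quick illustration of the metadata directory, here is a hedged sketch of attaching key/value metadata to an outgoing RPC. The header name and token value are made up for illustration, and note that in early grpc-go this helper was called metadata.NewContext before being renamed NewOutgoingContext:

```go
import (
	"context"

	"google.golang.org/grpc/metadata"
)

// withAuth attaches illustrative key/value metadata to an outgoing RPC
// context. In early grpc-go versions the helper below was named
// metadata.NewContext; later versions renamed it NewOutgoingContext.
func withAuth(ctx context.Context) context.Context {
	md := metadata.Pairs("authorization", "some-token") // made-up pair
	return metadata.NewOutgoingContext(ctx, md)
}
```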
3. The Client

The examples directory contains two fairly simple examples, so let's start there.
```go
func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	name := defaultName
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	// Invoke the RPC.
	r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	log.Printf("Greeting: %s", r.Message)
}
```

The grpc.WithInsecure option tells the client to skip verifying the server's certificate when connecting over TLS (fine if you trust the server); the sketch below shows the alternative with verification enabled. Dial establishes the connection to the server and delegates to grpc.DialContext, shown right after the sketch.
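Here is a minimal sketch of the verified alternative, assuming a CA certificate file; the "ca.crt" path and the empty server-name override are illustrative values, not part of the original example:

```go
import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
)

// dialTLS dials with certificate verification enabled instead of
// grpc.WithInsecure. "ca.crt" is a made-up CA certificate path; the
// second argument optionally overrides the server name checked
// against the certificate.
func dialTLS(address string) (*grpc.ClientConn, error) {
	creds, err := credentials.NewClientTLSFromFile("ca.crt", "")
	if err != nil {
		return nil, err
	}
	return grpc.Dial(address, grpc.WithTransportCredentials(creds))
}
```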
```go
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
	cc := &ClientConn{
		target: target,
		conns:  make(map[Address]*addrConn),
	}
	cc.ctx, cc.cancel = context.WithCancel(context.Background())
	defer func() {
		select {
		case <-ctx.Done():
			conn, err = nil, ctx.Err()
		default:
		}
		if err != nil {
			cc.Close()
		}
	}()
	// Apply each of the gRPC dial options.
	for _, opt := range opts {
		opt(&cc.dopts)
	}
	// Set defaults.
	if cc.dopts.codec == nil {
		// Default to the protobuf codec.
		cc.dopts.codec = protoCodec{}
	}
	if cc.dopts.bs == nil {
		cc.dopts.bs = DefaultBackoffConfig
	}
	creds := cc.dopts.copts.TransportCredentials // credentials, if any
	if creds != nil && creds.Info().ServerName != "" {
		cc.authority = creds.Info().ServerName
	} else {
		colonPos := strings.LastIndex(target, ":")
		if colonPos == -1 {
			colonPos = len(target)
		}
		cc.authority = target[:colonPos]
	}
	var ok bool
	waitC := make(chan error, 1)
	// Start a goroutine to run the name resolver (similar to DNS).
	go func() {
		var addrs []Address
		if cc.dopts.balancer == nil {
			// Connect to target directly if balancer is nil.
			addrs = append(addrs, Address{Addr: target})
		} else {
			var credsClone credentials.TransportCredentials
			if creds != nil {
				credsClone = creds.Clone()
			}
			config := BalancerConfig{
				DialCreds: credsClone,
			}
			// Start the load balancer.
			if err := cc.dopts.balancer.Start(target, config); err != nil {
				waitC <- err
				return
			}
			ch := cc.dopts.balancer.Notify()
			if ch == nil {
				// There is no name resolver installed.
				addrs = append(addrs, Address{Addr: target})
			} else {
				addrs, ok = <-ch
				if !ok || len(addrs) == 0 {
					waitC <- errNoAddr
					return
				}
			}
		}
		for _, a := range addrs {
			// One addrConn per address: a connection pool.
			if err := cc.resetAddrConn(a, false, nil); err != nil {
				waitC <- err
				return
			}
		}
		close(waitC)
	}()
	var timeoutCh <-chan time.Time
	if cc.dopts.timeout > 0 {
		timeoutCh = time.After(cc.dopts.timeout)
	}
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case err := <-waitC:
		if err != nil {
			return nil, err
		}
	case <-timeoutCh:
		return nil, ErrClientConnTimeout
	}
	// If balancer is nil or balancer.Notify() is nil, ok will be false here.
	// The lbWatcher goroutine will not be created.
	if ok {
		go cc.lbWatcher()
	}
	return cc, nil
}
```

Through Dial, gRPC has now established the connection(s) to the server and started the custom load balancer, if one was configured. The pb.NewGreeterClient line is generated by the protoc tool; it wraps the gRPC connection in a struct so that the generated client call code is convenient to use. Note the opt(&cc.dopts) loop at the top of DialContext: the sketch below isolates the functional-options pattern it relies on. After that sketch, we follow the client's SayHello call, which sends the actual RPC:
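This is a stripped-down copy of the pattern, with simplified type and field names that are not the library's actual internals:

```go
package main

import (
	"fmt"
	"time"
)

// dialOptions is a simplified stand-in for grpc's internal options struct.
type dialOptions struct {
	timeout time.Duration
}

// DialOption is a function that mutates the options struct, exactly the
// shape DialContext expects when it runs opt(&cc.dopts).
type DialOption func(*dialOptions)

// withTimeout mirrors how an option like grpc.WithTimeout is constructed:
// it returns a closure that records the setting on the options struct.
func withTimeout(d time.Duration) DialOption {
	return func(o *dialOptions) { o.timeout = d }
}

func main() {
	var dopts dialOptions
	for _, opt := range []DialOption{withTimeout(5 * time.Second)} {
		opt(&dopts) // the same loop DialContext runs over its opts
	}
	fmt.Println(dopts.timeout) // 5s
}
```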
```go
func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
	out := new(HelloReply)
	// Delegate to the function that actually sends the request.
	err := grpc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, c.cc, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// The real work happens in invoke.
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
	c := defaultCallInfo
	for _, o := range opts {
		// Run the pre-call hooks.
		if err := o.before(&c); err != nil {
			return toRPCErr(err)
		}
	}
	defer func() {
		for _, o := range opts {
			// Run the post-call hooks.
			o.after(&c)
		}
	}()
	// Tracing.
	if EnableTracing {
		c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
		defer c.traceInfo.tr.Finish()
		c.traceInfo.firstLine.client = true
		if deadline, ok := ctx.Deadline(); ok {
			c.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
		}
		c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false)
		// TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set.
		defer func() {
			if e != nil {
				c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true)
				c.traceInfo.tr.SetError()
			}
		}()
	}
	// Stats.
	if stats.On() {
		ctx = stats.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
		begin := &stats.Begin{
			Client:    true,
			BeginTime: time.Now(),
			FailFast:  c.failFast,
		}
		stats.HandleRPC(ctx, begin)
	}
	defer func() {
		// Stats reported after the call completes.
		if stats.On() {
			end := &stats.End{
				Client:  true,
				EndTime: time.Now(),
				Error:   e,
			}
			stats.HandleRPC(ctx, end)
		}
	}()
	topts := &transport.Options{
		Last:  true,
		Delay: false,
	}
	for {
		var (
			err    error
			t      transport.ClientTransport
			stream *transport.Stream
			// Record the put handler from Balancer.Get(...). It is called once the
			// RPC has completed or failed.
			put func()
		)
		// TODO(zhaoq): Need a formal spec of fail-fast.
		// Transport-level call header.
		callHdr := &transport.CallHdr{
			Host:   cc.authority,
			Method: method,
		}
		if cc.dopts.cp != nil {
			callHdr.SendCompress = cc.dopts.cp.Type()
		}
		gopts := BalancerGetOptions{
			BlockingWait: !c.failFast,
		}
		// Get a transport-level connection; in HTTP/2 the unit of transfer is a stream.
		t, put, err = cc.getTransport(ctx, gopts)
		if err != nil {
			// TODO(zhaoq): Probably revisit the error handling.
			if _, ok := err.(*rpcError); ok {
				return err
			}
			if err == errConnClosing || err == errConnUnavailable {
				if c.failFast {
					return Errorf(codes.Unavailable, "%v", err)
				}
				continue
			}
			// All the other errors are treated as Internal errors.
			return Errorf(codes.Internal, "%v", err)
		}
		if c.traceInfo.tr != nil {
			c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
		}
		// Send the request.
		stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
		if err != nil {
			if put != nil {
				put()
				put = nil
			}
			// Retry a non-failfast RPC when
			// i) there is a connection error; or
			// ii) the server started to drain before this RPC was initiated.
			if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
				if c.failFast {
					return toRPCErr(err)
				}
				continue
			}
			return toRPCErr(err)
		}
		// Receive the response.
		err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
		if err != nil {
			if put != nil {
				put()
				put = nil
			}
			if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
				if c.failFast {
					return toRPCErr(err)
				}
				continue
			}
			return toRPCErr(err)
		}
		if c.traceInfo.tr != nil {
			c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
		}
		// Close the HTTP/2 stream.
		t.CloseStream(stream, nil)
		if put != nil {
			put()
			put = nil
		}
		// Errorf returns nil when the status code is codes.OK.
		return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
	}
}
```

The two functions that matter most here are sendRequest and recvResponse. Before diving into them, the sketch below shows how the before/after hooks at the top of invoke surface to users; sendRequest follows right after it:
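A usage sketch against the Greeter client from the example above: grpc.Header and grpc.Trailer are CallOptions whose after hooks copy the stream's metadata into the caller's variables once the RPC finishes.

```go
import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"

	pb "google.golang.org/grpc/examples/helloworld/helloworld"
)

// callWithMD reuses the Greeter client from the example above and captures
// the server's header and trailer metadata via CallOption after-hooks.
func callWithMD(c pb.GreeterClient) error {
	var header, trailer metadata.MD
	_, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "world"},
		grpc.Header(&header),   // populated by the option's after hook
		grpc.Trailer(&trailer), // populated by the option's after hook
	)
	return err
}
```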
```go
func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
	// Create an HTTP/2 stream.
	stream, err := t.NewStream(ctx, callHdr)
	if err != nil {
		return nil, err
	}
	defer func() {
		if err != nil {
			// If err is connection error, t will be closed, no need to close stream here.
			if _, ok := err.(transport.ConnectionError); !ok {
				t.CloseStream(stream, err)
			}
		}
	}()
	var (
		cbuf       *bytes.Buffer
		outPayload *stats.OutPayload
	)
	// A compressor is configured.
	if compressor != nil {
		cbuf = new(bytes.Buffer)
	}
	// Stats.
	if stats.On() {
		outPayload = &stats.OutPayload{
			Client: true,
		}
	}
	// Encode and compress the payload.
	outBuf, err := encode(codec, args, compressor, cbuf, outPayload)
	if err != nil {
		return nil, Errorf(codes.Internal, "grpc: %v", err)
	}
	// Write it to the stream.
	err = t.Write(stream, outBuf, opts)
	if err == nil && outPayload != nil {
		outPayload.SentTime = time.Now()
		stats.HandleRPC(ctx, outPayload)
	}
	// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
	// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
	// recvResponse to get the final status.
	if err != nil && err != io.EOF {
		return nil, err
	}
	// Sent successfully.
	return stream, nil
}
```

As you can see, this function is quite simple and does two things: it encodes (and optionally compresses) the payload, then writes it to the stream. The sketch below illustrates the wire framing that encode produces; after it comes recvResponse:
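encode produces the standard gRPC length-prefixed frame: one byte for the compression flag, four big-endian bytes of payload length, then the (possibly compressed) message body. A minimal sketch of that framing, not the library's actual encode:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frame wraps an encoded message in the gRPC wire format:
// a 1-byte compression flag, a 4-byte big-endian payload length,
// then the (possibly compressed) payload itself.
func frame(payload []byte, compressed bool) []byte {
	buf := make([]byte, 5+len(payload))
	if compressed {
		buf[0] = 1
	}
	binary.BigEndian.PutUint32(buf[1:5], uint32(len(payload)))
	copy(buf[5:], payload)
	return buf
}

func main() {
	fmt.Printf("% x\n", frame([]byte("hello"), false)) // 00 00 00 00 05 68 65 6c 6c 6f
}
```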
```go
func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
	// Try to acquire header metadata from the server if there is any.
	defer func() {
		if err != nil {
			if _, ok := err.(transport.ConnectionError); !ok {
				t.CloseStream(stream, err)
			}
		}
	}()
	c.headerMD, err = stream.Header()
	if err != nil {
		return
	}
	p := &parser{r: stream}
	var inPayload *stats.InPayload
	if stats.On() {
		inPayload = &stats.InPayload{
			Client: true,
		}
	}
	for {
		// Keep reading until the stream is closed.
		if err = recv(p, dopts.codec, stream, dopts.dc, reply, math.MaxInt32, inPayload); err != nil {
			if err == io.EOF {
				break
			}
			return
		}
	}
	if inPayload != nil && err == io.EOF && stream.StatusCode() == codes.OK {
		// TODO in the current implementation, inTrailer may be handled before inPayload in some cases.
		// Fix the order if necessary.
		stats.HandleRPC(ctx, inPayload)
	}
	c.trailerMD = stream.Trailer()
	return nil
}

func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error {
	// Read one message from the wire.
	pf, d, err := p.recvMsg(maxMsgSize)
	if err != nil {
		return err
	}
	if inPayload != nil {
		inPayload.WireLength = len(d)
	}
	if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
		return err
	}
	if pf == compressionMade {
		// Decompress.
		d, err = dc.Do(bytes.NewReader(d))
		if err != nil {
			return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
		}
	}
	if len(d) > maxMsgSize {
		// TODO: Revisit the error code. Currently keep it consistent with java
		// implementation.
		return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize)
	}
	// Unmarshal the payload.
	if err := c.Unmarshal(d, m); err != nil {
		return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
	}
	if inPayload != nil {
		inPayload.RecvTime = time.Now()
		inPayload.Payload = m
		// TODO truncate large payload.
		inPayload.Data = d
		inPayload.Length = len(d)
	}
	return nil
}
```

Note that a single recvResponse call may process several incoming messages, all handled in the same for loop, which is a little surprising. That is roughly the client-side flow, and the code is not too complex as long as you stay out of the HTTP/2 implementation (I started by reading the HTTP/2 code first and was completely lost). The other important pieces, load balancing (which picks the address to connect to according to an algorithm) and the use of credentials, will be covered in the next post. A final sketch of the message read path follows below.
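p.recvMsg is essentially the inverse of the framing sketched earlier: read the 5-byte prefix, then the payload it announces. A simplified sketch over a plain io.Reader; the real parser also enforces maxMsgSize while reading:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// readFrame is a simplified sketch of parser.recvMsg: read the 5-byte
// header, then the payload whose length the header announces.
func readFrame(r io.Reader) (compressed bool, payload []byte, err error) {
	var hdr [5]byte
	if _, err = io.ReadFull(r, hdr[:]); err != nil {
		return false, nil, err
	}
	payload = make([]byte, binary.BigEndian.Uint32(hdr[1:5]))
	if _, err = io.ReadFull(r, payload); err != nil {
		return false, nil, err
	}
	return hdr[0] == 1, payload, nil
}

func main() {
	// Round-trip the frame produced in the previous sketch.
	wire := []byte{0, 0, 0, 0, 5, 'h', 'e', 'l', 'l', 'o'}
	_, msg, _ := readFrame(bytes.NewReader(wire))
	fmt.Println(string(msg)) // hello
}
```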
4. Related Links