gRPC的那些事 - streaming
gRPC是一個(gè)高性能、通用的開(kāi)源RPC框架,其由Google主要面向移動(dòng)應(yīng)用開(kāi)發(fā)并基于HTTP/2協(xié)議標(biāo)準(zhǔn)而設(shè)計(jì),基于ProtoBuf(Protocol Buffers)序列化協(xié)議開(kāi)發(fā),且支持眾多開(kāi)發(fā)語(yǔ)言。 gRPC提供了一種簡(jiǎn)單的方法來(lái)精確地定義服務(wù)和為iOS、Android和后臺(tái)支持服務(wù)自動(dòng)生成可靠性很強(qiáng)的客戶(hù)端功能庫(kù)。 客戶(hù)端充分利用高級(jí)流和鏈接功能,從而有助于節(jié)省帶寬、降低的TCP鏈接次數(shù)、節(jié)省CPU使用、和電池壽命。
gRPC具有以下重要特征:
強(qiáng)大的IDL特性 RPC使用ProtoBuf來(lái)定義服務(wù),ProtoBuf是由Google開(kāi)發(fā)的一種數(shù)據(jù)序列化協(xié)議,性能出眾,得到了廣泛的應(yīng)用。
支持多種語(yǔ)言 支持C++、Java、Go、Python、Ruby、C#、Node.js、Android Java、Objective-C、PHP等編程語(yǔ)言。 3.基于HTTP/2標(biāo)準(zhǔn)設(shè)計(jì)
gRPC已經(jīng)應(yīng)用在Google的云服務(wù)和對(duì)外提供的API中。
gRPC開(kāi)發(fā)起來(lái)非常的簡(jiǎn)單,你可以閱讀 一個(gè)?helloworld 的例子來(lái)了解它的基本開(kāi)發(fā)流程 (本系列文章以Go語(yǔ)言的開(kāi)發(fā)為例)。
最基本的開(kāi)發(fā)步驟是定義?proto?文件, 定義請(qǐng)求 Request 和 響應(yīng) Response 的格式,然后定義一個(gè)服務(wù) Service, Service可以包含多個(gè)方法。
基本的gRPC開(kāi)發(fā)很多文章都介紹過(guò)了,官方也有相關(guān)的文檔,這個(gè)系列的文章也就不介紹這些基礎(chǔ)的開(kāi)發(fā),而是想通過(guò)代碼演示gRPC更深入的開(kāi)發(fā)。 作為這個(gè)系列的第一篇文章,想和大家分享一下gRPC流式開(kāi)發(fā)的知識(shí)。
gRPC的流可以分為三類(lèi), 客戶(hù)端流式發(fā)送、服務(wù)器流式返回以及客戶(hù)端/服務(wù)器同時(shí)流式處理, 也就是單向流和雙向流。 下面針對(duì)這三種情況分別通過(guò)例子介紹。
服務(wù)器流式響應(yīng)
通過(guò)使用流(streaming),你可以向服務(wù)器或者客戶(hù)端發(fā)送批量的數(shù)據(jù), 服務(wù)器和客戶(hù)端在接收這些數(shù)據(jù)的時(shí)候,可以不必等所有的消息全收到后才開(kāi)始響應(yīng),而是接收到第一條消息的時(shí)候就可以及時(shí)的響應(yīng), 這顯然比以前的類(lèi)HTTP 1.1的方式更快的提供響應(yīng),從而提高性能。
比如有一批記錄個(gè)人收入數(shù)據(jù),客戶(hù)端流式發(fā)送給服務(wù)器,服務(wù)器計(jì)算出每個(gè)人的個(gè)人所得稅,將結(jié)果流式發(fā)給客戶(hù)端。這樣客戶(hù)端的發(fā)送可以和服務(wù)器端的計(jì)算并行之行,從而減少服務(wù)的延遲。這只是一個(gè)簡(jiǎn)單的例子,你可以利用流來(lái)實(shí)現(xiàn)RPC調(diào)用的異步執(zhí)行,將客戶(hù)端的調(diào)用和服務(wù)器端的執(zhí)行并行的處理,
當(dāng)前gRPC通過(guò) HTTP2 協(xié)議傳輸,可以方便的實(shí)現(xiàn) streaming 功能。 如果你對(duì)gRPC如何通過(guò) HTTP2 傳輸?shù)母信d趣, 你可以閱讀這篇文章?gRPC over HTTP2, 它描述了 gRPC 通過(guò) HTTP2 傳輸?shù)牡蛯痈袷健equest 和 Response 的格式如下:
- Request → Request-Headers *Length-Prefixed-Message EOS
- Response → (Response-Headers *Length-Prefixed-Message Trailers) / Trailers-Only
要實(shí)現(xiàn)服務(wù)器的流式響應(yīng),只需在proto中的方法定義中將響應(yīng)前面加上stream標(biāo)記, 如下圖中SayHello1方法,HelloReply前面加上stream標(biāo)識(shí)。
| 123456789101112131415161718192021 | syntax = "proto3";package pb;import "github.com/gogo/protobuf/gogoproto/gogo.proto";// The greeting service definition.service Greeter {// Sends a greetingrpc SayHello1 (HelloRequest) returns (stream HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;} |
這個(gè)例子中我使用gogo來(lái)生成更有效的protobuf代碼,當(dāng)然你也可以使用原生的工具生成。
| 12 | GOGO_ROOT=${GOPATH}/src/github.com/gogo/protobufprotoc -I.:${GOPATH}/src --gogofaster_out=plugins=grpc:. helloworld.proto |
生成的代碼就已經(jīng)包含了流的處理,所以和普通的gRPC代碼差別不是很大, 只需要注意的服務(wù)器端代碼的實(shí)現(xiàn)要通過(guò)流的方式發(fā)送響應(yīng)。
| 12345678 | func (s *server) SayHello1(in *pb.HelloRequest, gs pb.Greeter_SayHello1Server) error {name := in.Name for i := 0; i < 100; i++ {gs.Send(&pb.HelloReply{Message: "Hello " + name + strconv.Itoa(i)})} return nil} |
和普通的gRPC有什么區(qū)別?
普通的gRPC是直接返回一個(gè)HelloReply對(duì)象,而流式響應(yīng)你可以通過(guò)Send方法返回多個(gè)HelloReply對(duì)象,對(duì)象流序列化后流式返回。
查看它低層的實(shí)現(xiàn)其實(shí)是使用ServerStream.SendMsg實(shí)現(xiàn)的。
| 123456789 | type Greeter_SayHello1Server interface {Send(*HelloReply) errorgrpc.ServerStream}func (x *greeterSayHello1Server) Send(m *HelloReply) error { return x.ServerStream.SendMsg(m)} |
對(duì)于客戶(hù)端,我們需要關(guān)注兩個(gè)方面有沒(méi)有變化, 一是發(fā)送請(qǐng)求,一是讀取響應(yīng)。下面是客戶(hù)端的代碼:
| 123456789101112131415161718192021222324 | conn, err := grpc.Dial(*address, grpc.WithInsecure()) if err != nil {log.Fatalf("faild to connect: %v", err)} defer conn.Close()c := pb.NewGreeterClient(conn)stream, err := c.SayHello1(context.Background(), &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}for {reply, err := stream.Recv() if err == io.EOF { break} if err != nil {log.Printf("failed to recv: %v", err)}log.Printf("Greeting: %s", reply.Message)} |
發(fā)送請(qǐng)求看起來(lái)沒(méi)有太大的區(qū)別,只是返回結(jié)果不再是一個(gè)單一的HelloReply對(duì)象,而是一個(gè)Stream。這和服務(wù)器端代碼正好對(duì)應(yīng),通過(guò)調(diào)用stream.Recv()返回每一個(gè)HelloReply對(duì)象, 直到出錯(cuò)或者流結(jié)束(io.EOF)。
可以看出,生成的代碼提供了往/從流中方便的發(fā)送/讀取對(duì)象的能力,而這一切, gRPC都幫你生成好了。
客戶(hù)端流式發(fā)送
客戶(hù)端也可以流式的發(fā)送對(duì)象,當(dāng)然這些對(duì)象也和上面的一樣,都是同一類(lèi)型的對(duì)象。
首先還是要在proto文件中定義,與上面的定義類(lèi)似,在請(qǐng)求的前面加上stream標(biāo)識(shí)。
| 12345678910111213141516171819202122 | syntax = "proto3";package pb;import "github.com/gogo/protobuf/gogoproto/gogo.proto";option (gogoproto.unmarshaler_all) = true;// The greeting service definition.service Greeter {rpc SayHello2 (stream HelloRequest) returns (HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;} |
注意這里我們只標(biāo)記了請(qǐng)求是流式的, 響應(yīng)還是以前的樣子。
生成相關(guān)的代碼后, 客戶(hù)端的代碼為:
| 12345678910111213141516171819 | func sayHello2(c pb.GreeterClient) { var err errorstream, err := c.SayHello2(context.Background()) for i := 0; i < 100; i++ { if err != nil {log.Printf("failed to call: %v", err) break}stream.Send(&pb.HelloRequest{Name: *name + strconv.Itoa(i)})}reply, err := stream.CloseAndRecv() if err != nil {fmt.Printf("failed to recv: %v", err)}log.Printf("Greeting: %s", reply.Message)} |
這里的調(diào)用c.SayHello2并沒(méi)有直接穿入請(qǐng)求參數(shù),而是返回一個(gè)stream,通過(guò)這個(gè)stream的Send發(fā)送,我們可以將對(duì)象流式發(fā)送。這個(gè)例子中我們發(fā)送了100個(gè)請(qǐng)求。
客戶(hù)端讀取的方法是stream.CloseAndRecv(),讀取完畢會(huì)關(guān)閉這個(gè)流的發(fā)送,這個(gè)方法返回最終結(jié)果。注意客戶(hù)端只負(fù)責(zé)關(guān)閉流的發(fā)送。
服務(wù)器端的代碼如下:
| 123456789101112131415161718 | func (s *server) SayHello2(gs pb.Greeter_SayHello2Server) error { var names []string for {in, err := gs.Recv() if err == io.EOF {gs.SendAndClose(&pb.HelloReply{Message: "Hello " + strings.Join(names, ",")}) return nil} if err != nil {log.Printf("failed to recv: %v", err) return err}names = append(names, in.Name)} return nil} |
服務(wù)器端收到每條消息都進(jìn)行了處理,這里的處理簡(jiǎn)化為增加到一個(gè)slice中。一旦它檢測(cè)的客戶(hù)端關(guān)閉了流的發(fā)送,它則把最終結(jié)果發(fā)送給客戶(hù)端,通過(guò)關(guān)閉這個(gè)流。流的關(guān)閉通過(guò)io.EOF這個(gè)error來(lái)區(qū)分。
雙向流
將上面兩個(gè)例子整合,就是雙向流的例子。 客戶(hù)端流式發(fā)送,服務(wù)器端流式響應(yīng),所有的發(fā)送和讀取都是流式處理的。
proto中的定義如下, 請(qǐng)求和響應(yīng)的前面都加上了stream標(biāo)識(shí):
| 123456789101112131415161718192021 | syntax = "proto3";package pb;import "github.com/gogo/protobuf/gogoproto/gogo.proto";// The greeting service definition.service Greeter {rpc SayHello3 (stream HelloRequest) returns (stream HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;} |
客戶(hù)端的代碼:
| 12345678910111213141516171819202122232425 | func sayHello3(c pb.GreeterClient) { var err errorstream, err := c.SayHello3(context.Background()) if err != nil {log.Printf("failed to call: %v", err) return} var i int64 for {stream.Send(&pb.HelloRequest{Name: *name + strconv.FormatInt(i, 10)}) if err != nil {log.Printf("failed to send: %v", err) break}reply, err := stream.Recv() if err != nil {log.Printf("failed to recv: %v", err) break}log.Printf("Greeting: %s", reply.Message)i++}} |
通過(guò)stream.Send發(fā)送請(qǐng)求,通過(guò)stream.Recv讀取響應(yīng)。客戶(hù)端可以通過(guò)CloseSend方法關(guān)閉發(fā)送流。
服務(wù)器端代碼也是通過(guò)Send發(fā)送響應(yīng),通過(guò)Recv響應(yīng):
| 12345678910111213141516 | func (s *server) SayHello3(gs pb.Greeter_SayHello3Server) error { for {in, err := gs.Recv() if err == io.EOF { return nil} if err != nil {log.Printf("failed to recv: %v", err) return err}gs.Send(&pb.HelloReply{Message: "Hello " + in.Name})} return nil} |
這基本上"退化"成一個(gè)TCP的client和server的架構(gòu)。
在實(shí)際的應(yīng)用中,你可以根據(jù)你的場(chǎng)景來(lái)使用單向流還是雙向流。
http://colobu.com/2017/04/06/dive-into-gRPC-streaming/
總結(jié)
以上是生活随笔為你收集整理的gRPC的那些事 - streaming的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: TENSORFLOW GUIDE: EX
- 下一篇: gRPC初体验