日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

gRPC的那些事 - streaming

發布時間:2025/3/15 编程问答 45 豆豆
生活随笔 收集整理的這篇文章主要介紹了 gRPC的那些事 - streaming 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

gRPC是一個高性能、通用的開源RPC框架,其由Google主要面向移動應用開發并基于HTTP/2協議標準而設計,基于ProtoBuf(Protocol Buffers)序列化協議開發,且支持眾多開發語言。 gRPC提供了一種簡單的方法來精確地定義服務和為iOS、Android和后臺支持服務自動生成可靠性很強的客戶端功能庫。 客戶端充分利用高級流和鏈接功能,從而有助于節省帶寬、降低的TCP鏈接次數、節省CPU使用、和電池壽命。

gRPC具有以下重要特征:
強大的IDL特性 RPC使用ProtoBuf來定義服務,ProtoBuf是由Google開發的一種數據序列化協議,性能出眾,得到了廣泛的應用。
支持多種語言 支持C++、Java、Go、Python、Ruby、C#、Node.js、Android Java、Objective-C、PHP等編程語言。 3.基于HTTP/2標準設計

gRPC已經應用在Google的云服務和對外提供的API中。

gRPC開發起來非常的簡單,你可以閱讀 一個?helloworld 的例子來了解它的基本開發流程 (本系列文章以Go語言的開發為例)。

最基本的開發步驟是定義?proto?文件, 定義請求 Request 和 響應 Response 的格式,然后定義一個服務 Service, Service可以包含多個方法。

基本的gRPC開發很多文章都介紹過了,官方也有相關的文檔,這個系列的文章也就不介紹這些基礎的開發,而是想通過代碼演示gRPC更深入的開發。 作為這個系列的第一篇文章,想和大家分享一下gRPC流式開發的知識。

gRPC的流可以分為三類, 客戶端流式發送、服務器流式返回以及客戶端/服務器同時流式處理, 也就是單向流和雙向流。 下面針對這三種情況分別通過例子介紹。

服務器流式響應

通過使用流(streaming),你可以向服務器或者客戶端發送批量的數據, 服務器和客戶端在接收這些數據的時候,可以不必等所有的消息全收到后才開始響應,而是接收到第一條消息的時候就可以及時的響應, 這顯然比以前的類HTTP 1.1的方式更快的提供響應,從而提高性能。

比如有一批記錄個人收入數據,客戶端流式發送給服務器,服務器計算出每個人的個人所得稅,將結果流式發給客戶端。這樣客戶端的發送可以和服務器端的計算并行之行,從而減少服務的延遲。這只是一個簡單的例子,你可以利用流來實現RPC調用的異步執行,將客戶端的調用和服務器端的執行并行的處理,

當前gRPC通過 HTTP2 協議傳輸,可以方便的實現 streaming 功能。 如果你對gRPC如何通過 HTTP2 傳輸的感興趣, 你可以閱讀這篇文章?gRPC over HTTP2, 它描述了 gRPC 通過 HTTP2 傳輸的低層格式。Request 和 Response 的格式如下:

  • Request → Request-Headers *Length-Prefixed-Message EOS
  • Response → (Response-Headers *Length-Prefixed-Message Trailers) / Trailers-Only

要實現服務器的流式響應,只需在proto中的方法定義中將響應前面加上stream標記, 如下圖中SayHello1方法,HelloReply前面加上stream標識。

123456789101112131415161718192021syntax = "proto3";package pb;import "github.com/gogo/protobuf/gogoproto/gogo.proto";// The greeting service definition.service Greeter {// Sends a greetingrpc SayHello1 (HelloRequest) returns (stream HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;}

這個例子中我使用gogo來生成更有效的protobuf代碼,當然你也可以使用原生的工具生成。

12GOGO_ROOT=${GOPATH}/src/github.com/gogo/protobufprotoc -I.:${GOPATH}/src --gogofaster_out=plugins=grpc:. helloworld.proto

生成的代碼就已經包含了流的處理,所以和普通的gRPC代碼差別不是很大, 只需要注意的服務器端代碼的實現要通過流的方式發送響應。

12345678func (s *server) SayHello1(in *pb.HelloRequest, gs pb.Greeter_SayHello1Server) error {name := in.Name for i := 0; i < 100; i++ {gs.Send(&pb.HelloReply{Message: "Hello " + name + strconv.Itoa(i)})} return nil}

和普通的gRPC有什么區別?

普通的gRPC是直接返回一個HelloReply對象,而流式響應你可以通過Send方法返回多個HelloReply對象,對象流序列化后流式返回。

查看它低層的實現其實是使用ServerStream.SendMsg實現的。

123456789type Greeter_SayHello1Server interface {Send(*HelloReply) errorgrpc.ServerStream}func (x *greeterSayHello1Server) Send(m *HelloReply) error { return x.ServerStream.SendMsg(m)}

對于客戶端,我們需要關注兩個方面有沒有變化, 一是發送請求,一是讀取響應。下面是客戶端的代碼:

123456789101112131415161718192021222324conn, err := grpc.Dial(*address, grpc.WithInsecure()) if err != nil {log.Fatalf("faild to connect: %v", err)} defer conn.Close()c := pb.NewGreeterClient(conn)stream, err := c.SayHello1(context.Background(), &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}for {reply, err := stream.Recv() if err == io.EOF { break} if err != nil {log.Printf("failed to recv: %v", err)}log.Printf("Greeting: %s", reply.Message)}

發送請求看起來沒有太大的區別,只是返回結果不再是一個單一的HelloReply對象,而是一個Stream。這和服務器端代碼正好對應,通過調用stream.Recv()返回每一個HelloReply對象, 直到出錯或者流結束(io.EOF)。

可以看出,生成的代碼提供了往/從流中方便的發送/讀取對象的能力,而這一切, gRPC都幫你生成好了。

客戶端流式發送

客戶端也可以流式的發送對象,當然這些對象也和上面的一樣,都是同一類型的對象。

首先還是要在proto文件中定義,與上面的定義類似,在請求的前面加上stream標識。

12345678910111213141516171819202122syntax = "proto3";package pb;import "github.com/gogo/protobuf/gogoproto/gogo.proto";option (gogoproto.unmarshaler_all) = true;// The greeting service definition.service Greeter {rpc SayHello2 (stream HelloRequest) returns (HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;}

注意這里我們只標記了請求是流式的, 響應還是以前的樣子。

生成相關的代碼后, 客戶端的代碼為:

12345678910111213141516171819func sayHello2(c pb.GreeterClient) { var err errorstream, err := c.SayHello2(context.Background()) for i := 0; i < 100; i++ { if err != nil {log.Printf("failed to call: %v", err) break}stream.Send(&pb.HelloRequest{Name: *name + strconv.Itoa(i)})}reply, err := stream.CloseAndRecv() if err != nil {fmt.Printf("failed to recv: %v", err)}log.Printf("Greeting: %s", reply.Message)}

這里的調用c.SayHello2并沒有直接穿入請求參數,而是返回一個stream,通過這個stream的Send發送,我們可以將對象流式發送。這個例子中我們發送了100個請求。

客戶端讀取的方法是stream.CloseAndRecv(),讀取完畢會關閉這個流的發送,這個方法返回最終結果。注意客戶端只負責關閉流的發送。

服務器端的代碼如下:

123456789101112131415161718func (s *server) SayHello2(gs pb.Greeter_SayHello2Server) error { var names []string for {in, err := gs.Recv() if err == io.EOF {gs.SendAndClose(&pb.HelloReply{Message: "Hello " + strings.Join(names, ",")}) return nil} if err != nil {log.Printf("failed to recv: %v", err) return err}names = append(names, in.Name)} return nil}

服務器端收到每條消息都進行了處理,這里的處理簡化為增加到一個slice中。一旦它檢測的客戶端關閉了流的發送,它則把最終結果發送給客戶端,通過關閉這個流。流的關閉通過io.EOF這個error來區分。

雙向流

將上面兩個例子整合,就是雙向流的例子。 客戶端流式發送,服務器端流式響應,所有的發送和讀取都是流式處理的。

proto中的定義如下, 請求和響應的前面都加上了stream標識:

123456789101112131415161718192021syntax = "proto3";package pb;import "github.com/gogo/protobuf/gogoproto/gogo.proto";// The greeting service definition.service Greeter {rpc SayHello3 (stream HelloRequest) returns (stream HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;}

客戶端的代碼:

12345678910111213141516171819202122232425func sayHello3(c pb.GreeterClient) { var err errorstream, err := c.SayHello3(context.Background()) if err != nil {log.Printf("failed to call: %v", err) return} var i int64 for {stream.Send(&pb.HelloRequest{Name: *name + strconv.FormatInt(i, 10)}) if err != nil {log.Printf("failed to send: %v", err) break}reply, err := stream.Recv() if err != nil {log.Printf("failed to recv: %v", err) break}log.Printf("Greeting: %s", reply.Message)i++}}

通過stream.Send發送請求,通過stream.Recv讀取響應。客戶端可以通過CloseSend方法關閉發送流。

服務器端代碼也是通過Send發送響應,通過Recv響應:

12345678910111213141516func (s *server) SayHello3(gs pb.Greeter_SayHello3Server) error { for {in, err := gs.Recv() if err == io.EOF { return nil} if err != nil {log.Printf("failed to recv: %v", err) return err}gs.Send(&pb.HelloReply{Message: "Hello " + in.Name})} return nil}

這基本上"退化"成一個TCP的client和server的架構。

在實際的應用中,你可以根據你的場景來使用單向流還是雙向流。

http://colobu.com/2017/04/06/dive-into-gRPC-streaming/

總結

以上是生活随笔為你收集整理的gRPC的那些事 - streaming的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。