日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

带入gRPC:gRPC Streaming, Client and Server

發布時間:2025/3/18 编程问答 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 带入gRPC:gRPC Streaming, Client and Server 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

帶入gRPC:gRPC Streaming, Client and Server

原文地址:帶入gRPC:gRPC Streaming, Client and Server

項目地址:go-grpc-example

前言

本章節將介紹 gRPC 的流式,分為三種類型:

  • Server-side streaming RPC:服務器端流式 RPC
  • Client-side streaming RPC:客戶端流式 RPC
  • Bidirectional streaming RPC:雙向流式 RPC

任何技術,因為有痛點,所以才有了存在的必要性。如果您想要了解 gRPC 的流式調用,請繼續

gRPC Streaming 是基于 HTTP/2 的,后續章節再進行詳細講解

為什么不用 Simple RPC

流式為什么要存在呢,是 Simple RPC 有什么問題嗎?通過模擬業務場景,可得知在使用 Simple RPC 時,有如下問題:

  • 數據包過大造成的瞬時壓力
  • 接收數據包時,需要所有數據包都接受成功且正確后,才能夠回調響應,進行業務處理(無法客戶端邊發送,服務端邊處理)

為什么用 Streaming RPC

  • 大規模數據包
  • 實時場景

模擬場景

每天早上 6 點,都有一批百萬級別的數據集要同從 A 同步到 B,在同步的時候,會做一系列操作(歸檔、數據分析、畫像、日志等)。這一次性涉及的數據量確實大

在同步完成后,也有人馬上會去查閱數據,為了新的一天籌備。也符合實時性。

兩者相較下,這個場景下更適合使用 Streaming RPC

gRPC

在講解具體的 gRPC 流式代碼時,會著重在第一節講解,因為三種模式其實是不同的組合。希望你能夠注重理解,舉一反三,其實都是一樣的知識點 ?

目錄結構

$ tree go-grpc-example go-grpc-example ├── client │?? ├── simple_client │?? │?? └── client.go │?? └── stream_client │?? └── client.go ├── proto │?? ├── search.proto │?? └── stream.proto └── server├── simple_server│?? └── server.go└── stream_server└── server.go

增加 stream_server、stream_client 存放服務端和客戶端文件,proto/stream.proto 用于編寫 IDL

IDL

在 proto 文件夾下的 stream.proto 文件中,寫入如下內容:

syntax = "proto3";package proto;service StreamService {rpc List(StreamRequest) returns (stream StreamResponse) {};rpc Record(stream StreamRequest) returns (StreamResponse) {};rpc Route(stream StreamRequest) returns (stream StreamResponse) {}; }message StreamPoint {string name = 1;int32 value = 2; }message StreamRequest {StreamPoint pt = 1; }message StreamResponse {StreamPoint pt = 1; }

注意關鍵字 stream,聲明其為一個流方法。這里共涉及三個方法,對應關系為

  • List:服務器端流式 RPC
  • Record:客戶端流式 RPC
  • Route:雙向流式 RPC

基礎模板 + 空定義

Server

package mainimport ("log""net""google.golang.org/grpc"pb "github.com/EDDYCJY/go-grpc-example/proto")type StreamService struct{}const (PORT = "9002" )func main() {server := grpc.NewServer()pb.RegisterStreamServiceServer(server, &StreamService{})lis, err := net.Listen("tcp", ":"+PORT)if err != nil {log.Fatalf("net.Listen err: %v", err)}server.Serve(lis) }func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {return nil }func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {return nil }func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {return nil }

寫代碼前,建議先將 gRPC Server 的基礎模板和接口給空定義出來。若有不清楚可參見上一章節的知識點

Client

package mainimport ("log""google.golang.org/grpc"pb "github.com/EDDYCJY/go-grpc-example/proto" )const (PORT = "9002" )func main() {conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())if err != nil {log.Fatalf("grpc.Dial err: %v", err)}defer conn.Close()client := pb.NewStreamServiceClient(conn)err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}})if err != nil {log.Fatalf("printLists.err: %v", err)}err = printRecord(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Record", Value: 2018}})if err != nil {log.Fatalf("printRecord.err: %v", err)}err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}})if err != nil {log.Fatalf("printRoute.err: %v", err)} }func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {return nil }func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {return nil }func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {return nil }

一、Server-side streaming RPC:服務器端流式 RPC

服務器端流式 RPC,顯然是單向流,并代指 Server 為 Stream 而 Client 為普通 RPC 請求

簡單來講就是客戶端發起一次普通的 RPC 請求,服務端通過流式響應多次發送數據集,客戶端 Recv 接收數據集。大致如圖:

Server

func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {for n := 0; n <= 6; n++ {err := stream.Send(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: r.Pt.Name,Value: r.Pt.Value + int32(n),},})if err != nil {return err}}return nil }

在 Server,主要留意 stream.Send 方法。它看上去能發送 N 次?有沒有大小限制?

type StreamService_ListServer interface {Send(*StreamResponse) errorgrpc.ServerStream }func (x *streamServiceListServer) Send(m *StreamResponse) error {return x.ServerStream.SendMsg(m) }

通過閱讀源碼,可得知是 protoc 在生成時,根據定義生成了各式各樣符合標準的接口方法。最終再統一調度內部的 SendMsg 方法,該方法涉及以下過程:

  • 消息體(對象)序列化
  • 壓縮序列化后的消息體
  • 對正在傳輸的消息體增加 5 個字節的 header
  • 判斷壓縮+序列化后的消息體總字節長度是否大于預設的 maxSendMessageSize(預設值為 math.MaxInt32),若超出則提示錯誤
  • 寫入給流的數據集

Client

func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {stream, err := client.List(context.Background(), r)if err != nil {return err}for {resp, err := stream.Recv()if err == io.EOF {break}if err != nil {return err}log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)}return nil }

在 Client,主要留意 stream.Recv() 方法。什么情況下 io.EOF ?什么情況下存在錯誤信息呢?

type StreamService_ListClient interface {Recv() (*StreamResponse, error)grpc.ClientStream }func (x *streamServiceListClient) Recv() (*StreamResponse, error) {m := new(StreamResponse)if err := x.ClientStream.RecvMsg(m); err != nil {return nil, err}return m, nil }

RecvMsg 會從流中讀取完整的 gRPC 消息體,另外通過閱讀源碼可得知:

(1)RecvMsg 是阻塞等待的

(2)RecvMsg 當流成功/結束(調用了 Close)時,會返回 io.EOF

(3)RecvMsg 當流出現任何錯誤時,流會被中止,錯誤信息會包含 RPC 錯誤碼。而在 RecvMsg 中可能出現如下錯誤:

  • io.EOF
  • io.ErrUnexpectedEOF
  • transport.ConnectionError
  • google.golang.org/grpc/codes

同時需要注意,默認的 MaxReceiveMessageSize 值為 1024 1024 4,建議不要超出

驗證

運行 stream_server/server.go:

$ go run server.go

運行 stream_client/client.go:

$ go run client.go 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023 2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024

二、Client-side streaming RPC:客戶端流式 RPC

客戶端流式 RPC,單向流,客戶端通過流式發起多次 RPC 請求給服務端,服務端發起一次響應給客戶端,大致如圖:

Server

func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {for {r, err := stream.Recv()if err == io.EOF {return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}})}if err != nil {return err}log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)}return nil }

多了一個從未見過的方法 stream.SendAndClose,它是做什么用的呢?

在這段程序中,我們對每一個 Recv 都進行了處理,當發現 io.EOF (流關閉) 后,需要將最終的響應結果發送給客戶端,同時關閉正在另外一側等待的 Recv

Client

func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {stream, err := client.Record(context.Background())if err != nil {return err}for n := 0; n < 6; n++ {err := stream.Send(r)if err != nil {return err}}resp, err := stream.CloseAndRecv()if err != nil {return err}log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)return nil }

stream.CloseAndRecv 和 stream.SendAndClose 是配套使用的流方法,相信聰明的你已經秒懂它的作用了

驗證

重啟 stream_server/server.go,再次運行 stream_client/client.go:

stream_client:
$ go run client.go 2018/09/24 16:23:03 resp: pj.name: gRPC Stream Server: Record, pt.value: 1
stream_server:
$ go run server.go 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018 2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018

三、Bidirectional streaming RPC:雙向流式 RPC

雙向流式 RPC,顧名思義是雙向流。由客戶端以流式的方式發起請求,服務端同樣以流式的方式響應請求

首個請求一定是 Client 發起,但具體交互方式(誰先誰后、一次發多少、響應多少、什么時候關閉)根據程序編寫的方式來確定(可以結合協程)

假設該雙向流是按順序發送的話,大致如圖:

還是要強調,雙向流變化很大,因程序編寫的不同而不同。雙向流圖示無法適用不同的場景

Server

func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {n := 0for {err := stream.Send(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gPRC Stream Client: Route",Value: int32(n),},})if err != nil {return err}r, err := stream.Recv()if err == io.EOF {return nil}if err != nil {return err}n++log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)}return nil }

Client

func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {stream, err := client.Route(context.Background())if err != nil {return err}for n := 0; n <= 6; n++ {err = stream.Send(r)if err != nil {return err}resp, err := stream.Recv()if err == io.EOF {break}if err != nil {return err}log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)}stream.CloseSend()return nil }

驗證

重啟 stream_server/server.go,再次運行 stream_client/client.go:

stream_server
$ go run server.go 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018 2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
stream_client
$ go run client.go 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5 2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6

總結

在本文共介紹了三類流的交互方式,可以根據實際的業務場景去選擇合適的方式。會事半功倍哦 ?

系列目錄

  • 帶入gRPC:gRPC及相關介紹
  • 帶入gRPC:gRPC Client and Server
  • 帶入gRPC:gRPC Streaming, Client and Server

總結

以上是生活随笔為你收集整理的带入gRPC:gRPC Streaming, Client and Server的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。