日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

gRPC学习记录(四)--官方Demo

發布時間:2025/3/15 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 gRPC学习记录(四)--官方Demo 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

了解proto3后,接下來看官方Demo作為訓練,這里建議看一遍之后自己動手搭建出來,一方面鞏固之前的知識,一方面是對整個流程更加熟悉.

官方Demo地址: https://github.com/grpc/grpc-java
例子是一個簡單的路由映射的應用,它允許客戶端獲取路由特性的信息,生成路由的總結,以及交互路由信息,如服務器和其他客戶端的流量更新.

1.1定義服務

也就是寫proto文件

//指定proto3格式 syntax = "proto3"; //一些生成代碼的設置 option java_multiple_files = true;//以外部類模式生成 option java_package = "cn.mrdear.route";//所在包名 option java_outer_classname = "RouteProto";//最外層類名稱//定義服務 service RouteGuide{//得到指定點的feature//一個 簡單 RPC , 客戶端使用存根發送請求到服務器并等待響應返回,就像平常的函數調用一樣。rpc GetFeature(Point) returns (Feature) {}//獲取一個矩形內的點//一個 服務器端流式 RPC , 客戶端發送請求到服務器,拿到一個流去讀取返回的消息序列。 客戶端讀取返回的流,//直到里面沒有任何消息。從例子中可以看出,通過在 響應 類型前插入 stream 關鍵字,可以指定一個服務器端的流方法。rpc ListFeatures(Rectangle) returns (stream Feature){}//記錄該點//一個 客戶端流式 RPC , 客戶端寫入一個消息序列并將其發送到服務器,同樣也是使用流。一旦客戶端完成寫入消息,//它等待服務器完成讀取返回它的響應。通過在 請求 類型前指定 stream 關鍵字來指定一個客戶端的流方法。rpc RecordRoute(stream Point) returns (RouteSummary){}//路由交流//一個 雙向流式 RPC 是雙方使用讀寫流去發送一個消息序列。兩個流獨立操作,因此客戶端和服務器//可以以任意喜歡的順序讀寫:比如, 服務器可以在寫入響應前等待接收所有的客戶端消息,或者可以交替 的讀取和寫入消息,//或者其他讀寫的組合。每個流中的消息順序被預留。你可以通過在請求和響應前加 stream 關鍵字去制定方法的類型。rpc RouteChat(stream RouteNote) returns (stream RouteNote){} }//代表經緯度 message Point {int32 latitude = 1;int32 longitude = 2; } //由兩個點確定的一個方塊 message Rectangle{Point lo = 1;Point hi = 2; } //某一位置的名稱 message Feature {string name = 1;Point location = 2; }// Not used in the RPC. Instead, this is here for the form serialized to disk. message FeatureDatabase {repeated Feature feature = 1; } //給某一點發送消息 message RouteNote{Point location = 1;string message = 2; }//記錄收到的信息 message RouteSummary{int32 point_count = 1;int32 feture_count = 2;int32 distance = 3;int32 elapsed_time = 4; }

執行mvn compile生成如下代碼:

Paste_Image.png

1.2編寫RouteGuideService

該類就是這個項目所提供給外部的功能.該類需要繼承RouteGuideGrpc.RouteGuideImplBase,這個類提供了我們所定義分服務接口,繼承后覆蓋需要實現的自定義方法.

簡單 RPC
簡單RPC和普通方法調用形式差不多,客戶端傳來一個實體,服務端返回一個實體.

@Overridepublic void getFeature(Point request, StreamObserver<Feature> responseObserver) {System.out.println("getFeature得到的請求參數: " + request.toString()); // responseObserver.onError(); 代表請求出錯responseObserver.onNext(checkFeature(request));//包裝返回信息responseObserver.onCompleted();//結束一次請求}//找到復核的featureprivate Feature checkFeature(Point location) {for (Feature feature : features) {if (feature.getLocation().getLatitude() == location.getLatitude()&& feature.getLocation().getLongitude() == location.getLongitude()) {return feature;}}// No feature was found, return an unnamed feature.return Feature.newBuilder().setName("").setLocation(location).build();}

其中StreamObserver<Feature>是一個應答觀察者,用于封裝返回的信息,服務器把該信息傳給客戶端.請求結束要調用onCompleted()方法.

服務器端流式 RPC
在proto文件中聲明了stream,但是從接口上看不出來和簡單RPC的區別,代碼中最主要的區別是多次調用responseObserver.onNext()的方法,最后完成時寫回數據.

@Overridepublic void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());for (Feature feature : features) {//如果不存在則繼續if (!RouteGuideUtil.exists(feature)) {continue;}int lat = feature.getLocation().getLatitude();int lon = feature.getLocation().getLongitude();if (lon >= left && lon <= right && lat >= bottom && lat <= top) {//找到符合的就寫入responseObserver.onNext(feature);}}//最后標識完成responseObserver.onCompleted();}

客戶端流式 RPC
服務端就需要一直監控客戶端寫入情況,因此需要一個StreamObserver接口,其中onNext方法會在客戶端每次寫入時調用,當寫入完畢時調用onCompleted()方法.具體還要到后面客戶端調用分析.

@Overridepublic StreamObserver<Point> recordRoute(StreamObserver<RouteSummary> responseObserver) {return new StreamObserver<Point>() {int pointCount;int featureCount;int distance;Point previous;long startTime = System.nanoTime();//客戶端每寫入一個Point,服務端就會調用該方法@Overridepublic void onNext(Point point) {System.out.println("recordRoute得到的請求參數: " + point.toString());pointCount++;if (RouteGuideUtil.exists(checkFeature(point))) {featureCount++;}if (previous != null) {distance += calcDistance(previous, point);}previous = point;}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();System.err.println("Encountered error in recordRoute");}//客戶端寫入結束時調用@Overridepublic void onCompleted() {long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount).setFetureCount(featureCount).setDistance(distance).setElapsedTime((int) seconds).build());responseObserver.onCompleted();}};}

雙向流式 RPC
和客戶端流式RPC差不多.

@Overridepublic StreamObserver<RouteNote> routeChat(StreamObserver<RouteNote> responseObserver) {return new StreamObserver<RouteNote>() {@Overridepublic void onNext(RouteNote note) {List<RouteNote> notes = getOrCreateNotes(note.getLocation());for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {responseObserver.onNext(prevNote);}notes.add(note);}@Overridepublic void onError(Throwable t) {t.printStackTrace();System.err.println("Encountered error in routeChat");}@Overridepublic void onCompleted() {responseObserver.onCompleted();}};}

1.3創建服務端

和Helloworld一樣的形式,最主要的是addService(new RouteGuideService(features)),這里把需要注冊的服務給注冊上.

public class RouteGuideServer {private final int port;//服務端端口private final Server server;//服務器public RouteGuideServer(int port) throws IOException {this.port = port;//獲取初始化數據List<Feature> features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());//初始化Server參數server = ServerBuilder.forPort(port)//添加指定服務.addService(new RouteGuideService(features)).build();}/*** 啟動服務*/public void start() throws IOException {server.start();System.out.println("Server started, listening on " + port);//程序退出時關閉資源Runtime.getRuntime().addShutdownHook(new Thread(() -> {System.err.println("*** shutting down gRPC server since JVM is shutting down");RouteGuideServer.this.stop();System.err.println("*** server shut down");}));}/*** 關閉服務*/public void stop() {if (server != null) {server.shutdown();}}/*** 使得server一直處于運行狀態*/private void blockUntilShutdown() throws InterruptedException {if (server != null) {server.awaitTermination();}}public static void main(String[] args) throws IOException, InterruptedException {RouteGuideServer server = new RouteGuideServer(50051);server.start();server.blockUntilShutdown();}}

1.4編寫客戶端

客戶端需要一個channel和一個存根blockingStub或者asyncStub根據業務需要選擇同步或者異步.

private final ManagedChannel channel;//grpc信道,需要指定端口和地址private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;//阻塞/同步存根private final RouteGuideGrpc.RouteGuideStub asyncStub;//非阻塞,異步存根public RouteGuideClient(String host,int port) {//創建信道channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();//創建存根blockingStub = RouteGuideGrpc.newBlockingStub(channel);asyncStub = RouteGuideGrpc.newStub(channel);}/*** 關閉方法*/public void shutdown() throws InterruptedException {channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);}

簡單grpc
和調用普通方法形式差不多.

public void getFeature(int lat,int lon){System.out.println("start getFeature");Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();Feature feature;try {//同步阻塞調用feature = blockingStub.getFeature(request);System.out.println("getFeature服務端返回 :" + feature);} catch (StatusRuntimeException e) {System.out.println("RPC failed " +e.getStatus());}}

調用代碼:

public static void main(String[] args) throws InterruptedException {RouteGuideClient client = new RouteGuideClient("localhost", 50051);try {client.getFeature(409146138, -746188906);//成功案例client.getFeature(0, 0);//失敗案例} finally {client.shutdown();}}

客戶端日志

Paste_Image.png

服務端日志(參數都為0的時候,這邊并沒拿到參數)

Paste_Image.png

服務器端流式 RPC
和簡單RPC差不多,只不過返回的是一個集合類.

//2.服務端流式RPCpublic void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon){System.out.println("start listFeatures");Rectangle request =Rectangle.newBuilder().setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build()).setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();Iterator<Feature> features;try {features = blockingStub.listFeatures(request);for (int i = 1; features.hasNext(); i++) {Feature feature = features.next();System.out.println("getFeature服務端返回 :" + feature);}} catch (Exception e) {System.out.println("RPC failed " +e.getMessage());}}

客戶端日志:

Paste_Image.png

服務端日志:

Paste_Image.png

客戶端流式 RPC
該種方式兩遍都是異步操作,所以需要互相監聽,也因此需要使用阻塞存根.服務端監聽Point的寫入,客戶端監聽RouteSummary的寫回.

public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {System.out.println("start recordRoute");final CountDownLatch finishLatch = new CountDownLatch(1);//建一個應答者接受返回數據StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {@Overridepublic void onNext(RouteSummary summary) {System.out.println("recordRoute服務端返回 :" + summary);}@Overridepublic void onError(Throwable t) {System.out.println("RecordRoute Failed");finishLatch.countDown();}@Overridepublic void onCompleted() {System.out.println("RecordRoute finish");finishLatch.countDown();}};//客戶端寫入操作StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);Random random = new Random();try {for (int i = 0; i < numPoints; ++i) {int index = random.nextInt(features.size());Point point = features.get(index).getLocation();System.out.println("客戶端寫入point:" + point);requestObserver.onNext(point);Thread.sleep(random.nextInt(1000) + 500);if (finishLatch.getCount() == 0) {return;}}} catch (RuntimeException e) {requestObserver.onError(e);throw e;}//標識已經寫完requestObserver.onCompleted();// Receiving happens asynchronouslyif (!finishLatch.await(1, TimeUnit.MINUTES)) {System.out.println("recordRoute can not finish within 1 minutes");}}

客戶端日志:

Paste_Image.png

服務端日志:

Paste_Image.png

雙向流式 RPC
和客戶端流式RPC比較接近,同樣都需要雙方監控.

public CountDownLatch routeChat() {System.out.println("start routeChat");final CountDownLatch finishLatch = new CountDownLatch(1);//寫入監聽StreamObserver<RouteNote> requestObserver =//寫回監聽asyncStub.routeChat(new StreamObserver<RouteNote>() {//服務端每寫回一個操作就調用@Overridepublic void onNext(RouteNote note) {System.out.println("服務端寫回: " + note);}@Overridepublic void onError(Throwable t) {t.printStackTrace();System.out.println("RouteChat Failed:");finishLatch.countDown();}@Overridepublic void onCompleted() {System.out.println("Finished RouteChat");finishLatch.countDown();}});try {RouteNote[] requests ={newNote("First message", 0, 0), newNote("Second message", 0, 1),newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};for (RouteNote request : requests) {System.out.println("客戶端寫入:" + request);requestObserver.onNext(request);}} catch (RuntimeException e) {requestObserver.onError(e);throw e;}//標識寫完requestObserver.onCompleted();return finishLatch;}

這里調用需要特殊處理下;

CountDownLatch finishLatch = client.routeChat();if (!finishLatch.await(1, TimeUnit.MINUTES)) {System.out.println("routeChat can not finish within 1 minutes");}

客戶端日志:

Paste_Image.png

服務端日志:

Paste_Image.png

官方Demo之后,入門算結束,接下來就要看詳細的官方文檔,然后在項目中使用,這個過程會遇到不少問題,解決這些問題就是對這個技術的熟練.

附錄:

相關代碼: https://github.com/nl101531/JavaWEB



作者:此博廢棄_更新在個人博客
鏈接:https://www.jianshu.com/p/39c9eedba2c2
來源:簡書
著作權歸作者所有。商業轉載請聯系作者獲得授權,非商業轉載請注明出處。

總結

以上是生活随笔為你收集整理的gRPC学习记录(四)--官方Demo的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。