日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

grpc简单使用 java_gRPC学习记录(四)-官方Demo - Java 技术驿站-Java 技术驿站

發布時間:2024/7/19 java 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 grpc简单使用 java_gRPC学习记录(四)-官方Demo - Java 技术驿站-Java 技术驿站 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

了解proto3后,接下來看官方Demo作為訓練,這里建議看一遍之后自己動手搭建出來,一方面鞏固之前的知識,一方面是對整個流程更加熟悉.

官方Demo地址: https://github.com/grpc/grpc-java

例子是一個簡單的路由映射的應用,它允許客戶端獲取路由特性的信息,生成路由的總結,以及交互路由信息,如服務器和其他客戶端的流量更新.

1.1定義服務

也就是寫proto文件

//指定proto3格式

syntax = "proto3";

//一些生成代碼的設置

option java_multiple_files = true;//以外部類模式生成

option java_package = "cn.mrdear.route";//所在包名

option java_outer_classname = "RouteProto";//最外層類名稱

//定義服務

service RouteGuide{

//得到指定點的feature

//一個 簡單 RPC , 客戶端使用存根發送請求到服務器并等待響應返回,就像平常的函數調用一樣。

rpc GetFeature(Point) returns (Feature) {}

//獲取一個矩形內的點

//一個 服務器端流式 RPC , 客戶端發送請求到服務器,拿到一個流去讀取返回的消息序列。 客戶端讀取返回的流,

//直到里面沒有任何消息。從例子中可以看出,通過在 響應 類型前插入 stream 關鍵字,可以指定一個服務器端的流方法。

rpc ListFeatures(Rectangle) returns (stream Feature){}

//記錄該點

//一個 客戶端流式 RPC , 客戶端寫入一個消息序列并將其發送到服務器,同樣也是使用流。一旦客戶端完成寫入消息,

//它等待服務器完成讀取返回它的響應。通過在 請求 類型前指定 stream 關鍵字來指定一個客戶端的流方法。

rpc RecordRoute(stream Point) returns (RouteSummary){}

//路由交流

//一個 雙向流式 RPC 是雙方使用讀寫流去發送一個消息序列。兩個流獨立操作,因此客戶端和服務器

//可以以任意喜歡的順序讀寫:比如, 服務器可以在寫入響應前等待接收所有的客戶端消息,或者可以交替 的讀取和寫入消息,

//或者其他讀寫的組合。每個流中的消息順序被預留。你可以通過在請求和響應前加 stream 關鍵字去制定方法的類型。

rpc RouteChat(stream RouteNote) returns (stream RouteNote){}

}

//代表經緯度

message Point {

int32 latitude = 1;

int32 longitude = 2;

}

//由兩個點確定的一個方塊

message Rectangle{

Point lo = 1;

Point hi = 2;

}

//某一位置的名稱

message Feature {

string name = 1;

Point location = 2;

}

// Not used in the RPC. Instead, this is here for the form serialized to disk.

message FeatureDatabase {

repeated Feature feature = 1;

}

//給某一點發送消息

message RouteNote{

Point location = 1;

string message = 2;

}

//記錄收到的信息

message RouteSummary{

int32 point_count = 1;

int32 feture_count = 2;

int32 distance = 3;

int32 elapsed_time = 4;

}

執行mvn compile生成如下代碼:

1.2編寫RouteGuideService

該類就是這個項目所提供給外部的功能.該類需要繼承RouteGuideGrpc.RouteGuideImplBase,這個類提供了我們所定義分服務接口,繼承后覆蓋需要實現的自定義方法.

簡單 RPC

簡單RPC和普通方法調用形式差不多,客戶端傳來一個實體,服務端返回一個實體.

@Override

public void getFeature(Point request, StreamObserver responseObserver) {

System.out.println("getFeature得到的請求參數: " + request.toString());

// responseObserver.onError(); 代表請求出錯

responseObserver.onNext(checkFeature(request));//包裝返回信息

responseObserver.onCompleted();//結束一次請求

}

//找到復核的feature

private Feature checkFeature(Point location) {

for (Feature feature : features) {

if (feature.getLocation().getLatitude() == location.getLatitude()

&& feature.getLocation().getLongitude() == location.getLongitude()) {

return feature;

}

}

// No feature was found, return an unnamed feature.

return Feature.newBuilder().setName("").setLocation(location).build();

}

其中StreamObserver是一個應答觀察者,用于封裝返回的信息,服務器把該信息傳給客戶端.請求結束要調用onCompleted()方法.

服務器端流式 RPC

在proto文件中聲明了stream,但是從接口上看不出來和簡單RPC的區別,代碼中最主要的區別是多次調用responseObserver.onNext()的方法,最后完成時寫回數據.

@Override

public void listFeatures(Rectangle request, StreamObserver responseObserver) {

int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());

int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());

int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());

int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());

for (Feature feature : features) {

//如果不存在則繼續

if (!RouteGuideUtil.exists(feature)) {

continue;

}

int lat = feature.getLocation().getLatitude();

int lon = feature.getLocation().getLongitude();

if (lon >= left && lon <= right && lat >= bottom && lat <= top) {

//找到符合的就寫入

responseObserver.onNext(feature);

}

}

//最后標識完成

responseObserver.onCompleted();

}

客戶端流式 RPC

服務端就需要一直監控客戶端寫入情況,因此需要一個StreamObserver接口,其中onNext方法會在客戶端每次寫入時調用,當寫入完畢時調用onCompleted()方法.具體還要到后面客戶端調用分析.

@Override

public StreamObserver recordRoute(StreamObserver responseObserver) {

return new StreamObserver() {

int pointCount;

int featureCount;

int distance;

Point previous;

long startTime = System.nanoTime();

//客戶端每寫入一個Point,服務端就會調用該方法

@Override

public void onNext(Point point) {

System.out.println("recordRoute得到的請求參數: " + point.toString());

pointCount++;

if (RouteGuideUtil.exists(checkFeature(point))) {

featureCount++;

}

if (previous != null) {

distance += calcDistance(previous, point);

}

previous = point;

}

@Override

public void onError(Throwable throwable) {

throwable.printStackTrace();

System.err.println("Encountered error in recordRoute");

}

//客戶端寫入結束時調用

@Override

public void onCompleted() {

long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);

responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)

.setFetureCount(featureCount)

.setDistance(distance)

.setElapsedTime((int) seconds).build());

responseObserver.onCompleted();

}

};

}

雙向流式 RPC

和客戶端流式RPC差不多.

@Override

public StreamObserver routeChat(StreamObserver responseObserver) {

return new StreamObserver() {

@Override

public void onNext(RouteNote note) {

List notes = getOrCreateNotes(note.getLocation());

for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {

responseObserver.onNext(prevNote);

}

notes.add(note);

}

@Override

public void onError(Throwable t) {

t.printStackTrace();

System.err.println("Encountered error in routeChat");

}

@Override

public void onCompleted() {

responseObserver.onCompleted();

}

};

}

1.3創建服務端

和Helloworld一樣的形式,最主要的是addService(new RouteGuideService(features)),這里把需要注冊的服務給注冊上.

public class RouteGuideServer {

private final int port;//服務端端口

private final Server server;//服務器

public RouteGuideServer(int port) throws IOException {

this.port = port;

//獲取初始化數據

List features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());

//初始化Server參數

server = ServerBuilder.forPort(port)

//添加指定服務

.addService(new RouteGuideService(features))

.build();

}

/**

* 啟動服務

*/

public void start() throws IOException {

server.start();

System.out.println("Server started, listening on " + port);

//程序退出時關閉資源

Runtime.getRuntime().addShutdownHook(new Thread(() -> {

System.err.println("*** shutting down gRPC server since JVM is shutting down");

RouteGuideServer.this.stop();

System.err.println("*** server shut down");

}));

}

/**

* 關閉服務

*/

public void stop() {

if (server != null) {

server.shutdown();

}

}

/**

* 使得server一直處于運行狀態

*/

private void blockUntilShutdown() throws InterruptedException {

if (server != null) {

server.awaitTermination();

}

}

public static void main(String[] args) throws IOException, InterruptedException {

RouteGuideServer server = new RouteGuideServer(50051);

server.start();

server.blockUntilShutdown();

}

}

1.4編寫客戶端

客戶端需要一個channel和一個存根blockingStub或者asyncStub根據業務需要選擇同步或者異步.

private final ManagedChannel channel;//grpc信道,需要指定端口和地址

private final RouteGuideGrpc.RouteGuideBlockingStub blockingStub;//阻塞/同步存根

private final RouteGuideGrpc.RouteGuideStub asyncStub;//非阻塞,異步存根

public RouteGuideClient(String host,int port) {

//創建信道

channel = ManagedChannelBuilder.forAddress(host, port)

.usePlaintext(true)

.build();

//創建存根

blockingStub = RouteGuideGrpc.newBlockingStub(channel);

asyncStub = RouteGuideGrpc.newStub(channel);

}

/**

* 關閉方法

*/

public void shutdown() throws InterruptedException {

channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);

}

簡單grpc

和調用普通方法形式差不多.

public void getFeature(int lat,int lon){

System.out.println("start getFeature");

Point request = Point.newBuilder()

.setLatitude(lat)

.setLongitude(lon)

.build();

Feature feature;

try {

//同步阻塞調用

feature = blockingStub.getFeature(request);

System.out.println("getFeature服務端返回 :" + feature);

} catch (StatusRuntimeException e) {

System.out.println("RPC failed " +e.getStatus());

}

}

調用代碼:

public static void main(String[] args) throws InterruptedException {

RouteGuideClient client = new RouteGuideClient("localhost", 50051);

try {

client.getFeature(409146138, -746188906);//成功案例

client.getFeature(0, 0);//失敗案例

} finally {

client.shutdown();

}

}

客戶端日志

服務端日志(參數都為0的時候,這邊并沒拿到參數)

服務器端流式 RPC

和簡單RPC差不多,只不過返回的是一個集合類.

//2.服務端流式RPC

public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon){

System.out.println("start listFeatures");

Rectangle request =

Rectangle.newBuilder()

.setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())

.setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();

Iterator features;

try {

features = blockingStub.listFeatures(request);

for (int i = 1; features.hasNext(); i++) {

Feature feature = features.next();

System.out.println("getFeature服務端返回 :" + feature);

}

} catch (Exception e) {

System.out.println("RPC failed " +e.getMessage());

}

}

客戶端日志:

服務端日志:

客戶端流式 RPC

該種方式兩遍都是異步操作,所以需要互相監聽,也因此需要使用阻塞存根.服務端監聽Point的寫入,客戶端監聽RouteSummary的寫回.

public void recordRoute(List features, int numPoints) throws InterruptedException {

System.out.println("start recordRoute");

final CountDownLatch finishLatch = new CountDownLatch(1);

//建一個應答者接受返回數據

StreamObserver responseObserver = new StreamObserver() {

@Override

public void onNext(RouteSummary summary) {

System.out.println("recordRoute服務端返回 :" + summary);

}

@Override

public void onError(Throwable t) {

System.out.println("RecordRoute Failed");

finishLatch.countDown();

}

@Override

public void onCompleted() {

System.out.println("RecordRoute finish");

finishLatch.countDown();

}

};

//客戶端寫入操作

StreamObserver requestObserver = asyncStub.recordRoute(responseObserver);

Random random = new Random();

try {

for (int i = 0; i < numPoints; ++i) {

int index = random.nextInt(features.size());

Point point = features.get(index).getLocation();

System.out.println("客戶端寫入point:" + point);

requestObserver.onNext(point);

Thread.sleep(random.nextInt(1000) + 500);

if (finishLatch.getCount() == 0) {

return;

}

}

} catch (RuntimeException e) {

requestObserver.onError(e);

throw e;

}

//標識已經寫完

requestObserver.onCompleted();

// Receiving happens asynchronously

if (!finishLatch.await(1, TimeUnit.MINUTES)) {

System.out.println("recordRoute can not finish within 1 minutes");

}

}

客戶端日志:

服務端日志:

雙向流式 RPC

和客戶端流式RPC比較接近,同樣都需要雙方監控.

public CountDownLatch routeChat() {

System.out.println("start routeChat");

final CountDownLatch finishLatch = new CountDownLatch(1);

//寫入監聽

StreamObserver requestObserver =

//寫回監聽

asyncStub.routeChat(new StreamObserver() {

//服務端每寫回一個操作就調用

@Override

public void onNext(RouteNote note) {

System.out.println("服務端寫回: " + note);

}

@Override

public void onError(Throwable t) {

t.printStackTrace();

System.out.println("RouteChat Failed:");

finishLatch.countDown();

}

@Override

public void onCompleted() {

System.out.println("Finished RouteChat");

finishLatch.countDown();

}

});

try {

RouteNote[] requests =

{newNote("First message", 0, 0), newNote("Second message", 0, 1),

newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};

for (RouteNote request : requests) {

System.out.println("客戶端寫入:" + request);

requestObserver.onNext(request);

}

} catch (RuntimeException e) {

requestObserver.onError(e);

throw e;

}

//標識寫完

requestObserver.onCompleted();

return finishLatch;

}

這里調用需要特殊處理下;

CountDownLatch finishLatch = client.routeChat();

if (!finishLatch.await(1, TimeUnit.MINUTES)) {

System.out.println("routeChat can not finish within 1 minutes");

}

客戶端日志:

服務端日志:

官方Demo之后,入門算結束,接下來就要看詳細的官方文檔,然后在項目中使用,這個過程會遇到不少問題,解決這些問題就是對這個技術的熟練.

附錄:

總結

以上是生活随笔為你收集整理的grpc简单使用 java_gRPC学习记录(四)-官方Demo - Java 技术驿站-Java 技术驿站的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。