一、簡介
我們之前已經完成了對于api模塊的開發,也就是已經生成了基礎的類和對應的接口,現在我們需要完成的是client和server端的開發。其實如同thrift一樣,現在要做的就是實現我們之前定義的service里面的hello方法,里面寫我們的業務邏輯,然后通過grpc的server發布暴露出去給客戶端使用。
// 定義服務
service HelloService{/* 簡單rpc,參數為HelloRequest類型,返回類型為HelloResponse */rpc hello(HelloRequest) returns (HelloResponse){}
}
ok,我們就先來實現serve模塊,然后再實現client模塊。
二、server模塊
我們創建一個新的模塊叫做rpc-grpc-service,并且在其pom依賴中引入api這個公共模塊。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.levi</groupId><artifactId>rpc</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>rpc-grpc-service</artifactId><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-api</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies></project>
1、業務實現
ok,我們前面說過了,我們其實要實現的業務接口或者說重寫的接口實現方法是HelloServiceGrpc.HelloServiceImplBase這個Base的內部類,在這個里面我們覆蓋我們的業務方法。
我們重新來回顧一下我們的這個接口和方法。
// 定義請求接口參數
message HelloRequest{string name = 1;
}// 定義接口響應參數
message HelloResponse{string result = 1;
}// 定義服務
service HelloService{/* 簡單rpc,參數為HelloRequest類型,返回類型為HelloResponse */rpc hello(HelloRequest) returns (HelloResponse){}
}
我們看到我們的請求類里面是一個參數name,響應類里面是一個參數result,并且接口的方法叫做hello。ok,我們就來實現覆蓋這個方法。
/ 服務端實現類
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {/*1. 接受client提交的參數 request.getParameter()2. 業務處理 service+dao 調用對應的業務功能。3. 提供返回值*/@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {}
}
因為HelloServiceGrpc.HelloServiceImplBase不是一個接口,所以我們要繼承然后覆蓋方法。并且我們看到這個方法和我們當初定義的略有不同
第一個參數沒毛病就是HelloProto.HelloRequest,但是這個方法沒有返回值,他是個void。這就是grpc的規范,他的返回是通過第二個參數
StreamObserver<HelloProto.HelloResponse> responseObserver來給客戶端返回的,因為grpc有流式的返回,所以它是通過這個返回的,
如果弄成返回值就不方便以流的形式不斷的推給客戶端了。而且responseObserver的泛型就是我們定義的返回類型HelloProto.HelloResponse。
于是我們就來實現這個方法。
/ 服務端實現類
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {/*1. 接受client提交的參數 request.getParameter()2. 業務處理 service+dao 調用對應的業務功能。3. 提供返回值*/@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {//1.接受client的請求參數,獲取我們定義的那個屬性nameString name = request.getName();//2.業務處理System.out.println("name parameter "+name);//3.封裝響應//3.1 創建相應對象的構造者HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();//3.2 填充數據,填充返回值內容builder.setResult("hello method invoke ok");//3.3 封裝響應HelloProto.HelloResponse helloResponse = builder.build();// 4. 響應clientresponseObserver.onNext(helloResponse);// 5. 響應完成responseObserver.onCompleted();}
}
2、服務發布
現在我們實現了我們的業務,我們就要把這個服務接口發布出去給客戶端做rpc調用。
我們定義一個服務類,然后實現server,并且暴露端口。
package com.levi;import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer1 {public static void main(String[] args) throws IOException, InterruptedException {//1. 綁定端口ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);//2. 發布服務,這里可能會發布很多業務,我們這里就是一個HelloServiceImpl,實際可能還會有別的業務serverBuilder.addService(new HelloServiceImpl());//serverBuilder.addService(new UserServiceImpl());//3. 創建服務對象Server server = serverBuilder.build();// 啟動服務server.start();// 阻塞等待客戶端的連接訪問,底層其實就是nettyserver.awaitTermination();;}
}
此時我們就暴露出去我們的一個helo的業務實現了。
三、client模塊
我們創建一個名為rpc-grpc-client的模塊,并且引入api公共模塊。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.levi</groupId><artifactId>rpc</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>rpc-grpc-client</artifactId><properties><maven.compiler.source>11</maven.compiler.source><maven.compiler.target>11</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>com.levi</groupId><artifactId>rpc-grpc-api</artifactId><version>1.0-SNAPSHOT</version></dependency></dependencies></project>
我們現在已經把服務端的東西暴露在了9000這個端口,現在就可以在客戶端通過grpc的stub代理來訪問了。
package com.levi;import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;// client通過代理對象完成遠端對象的調用
public class GrpcClient1 {public static void main(String[] args) {//1.創建通信的管道,usePlaintext以普通文本進行訪問ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();//2.獲得代理對象 stub進行調用try {// 我們這里以阻塞的形式調用,也就是一直等返回值回來才往下走,其實這里就是獲取的rpc調用的代理類,grpc給我們提供的就是stub這個,本質是一個東西HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);//3. 完成RPC調用//3.1 準備參數HelloProto.HelloRequest.Builder builder = HelloProto.HelloRequest.newBuilder();builder.setName("hello");HelloProto.HelloRequest helloRequest = builder.build();//3.1 進行功能rpc調用,獲取相應的內容,像本地調用那樣調用遠程服務HelloProto.HelloResponse helloResponse = helloService.hello(helloRequest);String result = helloResponse.getResult();System.out.println("result = " + result);} catch (Exception e) {throw new RuntimeException(e);}finally {// 4. 關閉通道managedChannel.shutdown();}}
}
成功返回。
四、多值傳遞
我們之前在學習proto語法的時候提到過一個關鍵字repeated關鍵字,我們當時說被這個關鍵字修飾的屬性是一個集合類型的字段,grpc會為它生成集合類型的get set方法,我們來做一個測試。
1、proto編寫
我們重新定義一個proto文件的message和方法。
// 定義proto文件版本號
syntax = "proto3";// 生成一個java類即可
option java_multiple_files = false;
// 生成的java類的包名
option java_package = "com.levi";
// 外部類,這里就是HelloProto,實際開發你可以有多個proto管理不同業務類,然后各自的外部類都可以。比如OrderService就是Order.proto 外部類就是OrderProto
option java_outer_classname = "HelloProto";// 定義請求接口參數
message HelloRequest{string name = 1;
}// 定義接口響應參數
message HelloResponse{string result = 1;
}message ManyHelloRequest{repeated string names = 1;
}message ManyHelloResponse{repeated string result = 1;
}// 定義服務
service HelloService{/* 簡單rpc,參數為HelloRequest類型,返回類型為HelloResponse */rpc hello(HelloRequest) returns (HelloResponse){}/* 服務端流式rpc,參數為ManyHelloRequest類型,返回類型為ManyHelloResponse */rpc manyHello(ManyHelloRequest) returns (ManyHelloResponse){}
}
我們在原來的基礎上添加
message ManyHelloRequest{repeated string names = 1;
}message ManyHelloResponse{repeated string result = 1;
}
和一個
rpc manyHello(ManyHelloRequest) returns (ManyHelloResponse){}
旨在請求多個名字,返回也是多個。我們來編譯結果。
我們看到message里面沒問題,生成了我們要的。當然之前的那個也保留了。
相應的service里面的也沒問題。我們就來修改一下我們的實現。當然為了保險最好刷新一下server和client的pom,重新引入一下api模塊,idea有時候會抽風。
2、server端改寫
package com.levi.service;import com.google.protobuf.ProtocolStringList;
import com.levi.HelloProto;
import com.levi.HelloServiceGrpc;
import io.grpc.stub.StreamObserver;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.List;// 服務端實現類
public class HelloServiceImpl extends HelloServiceGrpc.HelloServiceImplBase {private static final String RES_PREFIX = "server#";@Overridepublic void manyHello(HelloProto.ManyHelloRequest request, StreamObserver<HelloProto.ManyHelloResponse> responseObserver) {//1.接受client的請求參數,我們看到此時就是一個nameList的集合了,因為它被repeated修飾了,當然他的類型是ProtocolStringList,是grpc自己的類型ProtocolStringList requestNamesList = request.getNamesList();//2.業務處理System.out.println("請求參數為:" + requestNamesList);// 給返回值的name都加一個前綴List<String> responseNamesList = new ArrayList<>();for (String requestName : requestNamesList) {responseNamesList.add(RES_PREFIX + requestName);}//3.封裝響應//3.1 創建相應對象的構造者HelloProto.ManyHelloResponse.Builder builder = HelloProto.ManyHelloResponse.newBuilder();//3.2 填充數據,多個值要通過addAllResult,或者是下標的方式添加builder.addAllResult(responseNamesList);
// for (int i = 0; i < requestNamesList.size(); i++) {
// builder.setResult(i, requestNamesList.get(i));
// }//3.3 封裝響應HelloProto.ManyHelloResponse helloResponse = builder.build();// 4. 響應clientresponseObserver.onNext(helloResponse);// 5. 響應完成responseObserver.onCompleted();}/*1. 接受client提交的參數 request.getParameter()2. 業務處理 service+dao 調用對應的業務功能。3. 提供返回值*/@Overridepublic void hello(HelloProto.HelloRequest request, StreamObserver<HelloProto.HelloResponse> responseObserver) {//1.接受client的請求參數String name = request.getName();//2.業務處理System.out.println("name parameter "+name);//3.封裝響應//3.1 創建相應對象的構造者HelloProto.HelloResponse.Builder builder = HelloProto.HelloResponse.newBuilder();//3.2 填充數據builder.setResult("hello method invoke ok");//3.3 封裝響應HelloProto.HelloResponse helloResponse = builder.build();// 4. 響應clientresponseObserver.onNext(helloResponse);// 5. 響應完成responseObserver.onCompleted();}
}
然后服務端不用改,還是暴露注冊出去HelloServiceImpl。
package com.levi;import com.levi.service.HelloServiceImpl;
import io.grpc.Server;
import io.grpc.ServerBuilder;import java.io.IOException;public class GrpcServer1 {public static void main(String[] args) throws IOException, InterruptedException {//1. 綁定端口ServerBuilder<?> serverBuilder = ServerBuilder.forPort(9000);//2. 發布服務serverBuilder.addService(new HelloServiceImpl());//serverBuilder.addService(new UserServiceImpl());//3. 創建服務對象Server server = serverBuilder.build();// 啟動服務server.start();// 阻塞等待server.awaitTermination();;}
}
此時我們需要來修改客戶端代碼。
3、cilent端改寫
package com.levi;import com.google.protobuf.ProtocolStringList;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;import java.util.List;// client通過代理對象完成遠端對象的調用
public class GrpcClient2 {public static void main(String[] args) {//1.創建通信的管道ManagedChannel managedChannel = ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext().build();//2.獲得代理對象 stub進行調用try {// 我們這里以阻塞的形式調用,也就是一直等返回值回來才往下走HelloServiceGrpc.HelloServiceBlockingStub helloService = HelloServiceGrpc.newBlockingStub(managedChannel);//3. 完成RPC調用//3.1 準備參數HelloProto.ManyHelloRequest.Builder builder = HelloProto.ManyHelloRequest.newBuilder();// 多值參數要這樣添加或者以下標形式builder.addAllNames(List.of("levi","tom","jerry"));HelloProto.ManyHelloRequest helloRequest = builder.build();//3.1 進行功能rpc調用,獲取相應的內容,像本地調用那樣調用遠程服務HelloProto.ManyHelloResponse helloResponse = helloService.manyHello(helloRequest);ProtocolStringList resultList = helloResponse.getResultList();System.out.println("resultList = " + resultList);} catch (Exception e) {throw new RuntimeException(e);}finally {// 4. 關閉通道managedChannel.shutdown();}}
}
我們啟動服務端,然后客戶端去請求。
沒問題,這就是多值repeated關鍵字的使用方式。
五、關于服務端響應
我們在服務端響應客戶端的時候用的是一個StreamObserver<HelloProto.ManyHelloResponse> responseObserver這個類給客戶端響應的。我們簡單解釋一下這個操作。
public void manyHello(HelloProto.ManyHelloRequest request, StreamObserver<HelloProto.ManyHelloResponse> responseObserver) {...... 省略無關代碼// 4. 響應client,這里其實就是把數據返回給了客戶端responseObserver.onNext(helloResponse);// 5. 響應完成,這個操作其實是給這個通道設置一個標識,告訴客戶端服務端這邊傳完了,客戶端就會拿到數據開始繼續往下走// 如果沒有這個通知,客戶端會一直緩存服務端的數據不會做解析返回。客戶端也一直阻塞著。客戶端會監聽這個通知事件。responseObserver.onCompleted();}
相應的其實客戶端給服務端也會有類似的操作,因為grpc是雙向流,勢必涉及客戶端給服務端的操作。這個等我們后面再說。