在前幾篇文章中,我們已經掌握了 Protobuf 的基礎語法、高級特性和序列化反序列化操作。本篇文章將深入講解 gRPC 與 Protobuf 的集成,重點介紹如何通過 .proto
文件定義服務接口,并在 Go 和 Java 中實現 gRPC 服務與客戶端的完整交互流程。我們將通過詳細代碼示例和分步解析,幫助你徹底掌握微服務架構中的通信設計。
一、gRPC 簡介與核心概念
1. 什么是 gRPC?
gRPC 是一個高性能、開源的遠程過程調用(RPC)框架,基于 HTTP/2 協議 和 Protobuf 數據格式 構建。它支持多種語言,并提供了同步/異步調用、流式通信等特性。
2. gRPC 的核心優勢
特性 | 描述 |
---|---|
高效通信 | 基于二進制協議(Protobuf),比 JSON 更快、更小 |
多語言支持 | 支持 Go、Java、Python、C++、Rust 等 |
雙向流式通信 | 支持客戶端/服務端流式數據傳輸 |
自動代碼生成 | 通過?.proto ?文件自動生成客戶端和服務端代碼 |
強大的工具鏈 | 提供調試工具(如?grpcurl )、插件系統 |
二、通過?.proto
?定義 gRPC 服務
1. 示例?.proto
?文件
syntax = "proto3";package user;//新版本有了下面的option go_package 這里的pacage就可以去掉了(當然留著也不影響)
option go_package = "/user;user"; // 指定生成的 Go 包路徑(生成源碼的路徑和包名,前面是路徑后面是包名,可以自己定義)
//option go_package = ".;user"; //這個可以生成在當前目錄下// 定義服務接口
service UserService {// 1. 單向調用(Unary RPC)rpc GetUser (GetUserRequest) returns (UserResponse);// 2. 服務端流式調用(Server Streaming)rpc ListUsers (ListUsersRequest) returns (stream UserResponse);// 3. 客戶端流式調用(Client Streaming)rpc CreateUsers (stream CreateUserRequest) returns (CreateUsersResponse);// 4. 雙向流式調用(Bidirectional Streaming)rpc UpdateUsers (stream UpdateUserRequest) returns (stream UserResponse);
}// 消息定義
message GetUserRequest {int32 id = 1;
}message UserResponse {int32 id = 1;string name = 2;string email = 3;
}message ListUsersRequest {string filter = 1;
}message CreateUserRequest {string name = 1;string email = 2;
}message CreateUsersResponse {int32 count = 1;
}message UpdateUserRequest {int32 id = 1;string name = 2;
}
要注意下面這里有了變化(以后會講解為什么要用option go_package):?
package user;//新版本有了下面的option go_package 這里的pacage就可以去掉了(當然留著也不影響)
option go_package = "/user;user"; // 指定生成的 Go 包路徑(生成源碼的路徑和包名,前面是路徑后面是包名,可以自己定義)
//option go_package = ".;user"; //這個可以生成在當前目錄下
?
三、生成 gRPC 代碼
1. 安裝 gRPC 工具
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
Java
protoc --java_out=. \--grpc-java_out=. \--plugin=protoc-gen-grpc-java=protoc-gen-grpc-java \user.proto
2. 生成代碼命令
Go
protoc --go_out=. --go-grpc_out=. user.proto//protoc --go_out=. --go-grpc_out=. user.proto
//這個命令使用了兩個輸出插件:--go_out=. 和 --go-grpc_out=.。它分別調用了 Go 相關的 Protobuf 插件和 gRPC Go 插件來生成對應的目標文件。其中:
//--go_out=. 表示使用 Go 的 Protobuf 編譯插件生成對應的 Go 文件。
//--go-grpc_out=. 表示使用 Go 的 gRPC 編譯插件生成 gRPC 服務相關的 Go 文件。
Java
protoc --java_out=. --grpc-java_out=. user.proto
四、Go 實現 gRPC 服務端與客戶端
1. 服務端代碼詳解
package mainimport ("context""fmt""log""net"pb "./user_go_proto""google.golang.org/grpc"
)type userService struct {pb.UnimplementedUserServiceServer
}// 單向調用
func (s *userService) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.UserResponse, error) {return &pb.UserResponse{Id: req.Id,Name: "Alice",Email: "alice@example.com",}, nil
}// 服務端流式調用
func (s *userService) ListUsers(req *pb.ListUsersRequest, stream pb.UserService_ListUsersServer) error {users := []*pb.UserResponse{{Id: 1, Name: "Alice", Email: "alice@example.com"},{Id: 2, Name: "Bob", Email: "bob@example.com"},}for _, user := range users {if err := stream.Send(user); err != nil {return err}}return nil
}// 客戶端流式調用
func (s *userService) CreateUsers(stream pb.UserService_CreateUsersServer) error {count := 0for {req, err := stream.Recv()if err == io.EOF {break}if err != nil {return err}count++}return stream.SendAndClose(&pb.CreateUsersResponse{Count: int32(count)})
}// 雙向流式調用
func (s *userService) UpdateUsers(stream pb.UserService_UpdateUsersServer) error {for {req, err := stream.Recv()if err == io.EOF {break}if err != nil {return err}resp := &pb.UserResponse{Id: req.Id,Name: req.Name,}if err := stream.Send(resp); err != nil {return err}}return nil
}func main() {lis, err := net.Listen("tcp", ":50051")if err != nil {log.Fatalf("failed to listen: %v", err)}s := grpc.NewServer()pb.RegisterUserServiceServer(s, &userService{})log.Printf("Server listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}
代碼解析
- 服務端實現:通過?
pb.RegisterUserServiceServer
?注冊服務。 - 流式處理:通過?
stream
?接口處理雙向通信。 - 錯誤處理:捕獲?
io.EOF
?結束流式調用。
2. 客戶端代碼詳解
package mainimport ("context""fmt""log"pb "./user_go_proto""google.golang.org/grpc"
)func main() {conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()client := pb.NewUserServiceClient(conn)// 單向調用resp, err := client.GetUser(context.Background(), &pb.GetUserRequest{Id: 1})if err != nil {log.Fatalf("could not get user: %v", err)}fmt.Printf("User: %v\n", resp)// 服務端流式調用stream, err := client.ListUsers(context.Background(), &pb.ListUsersRequest{Filter: "IT"})if err != nil {log.Fatalf("could not list users: %v", err)}for {user, err := stream.Recv()if err == io.EOF {break}if err != nil {log.Fatalf("error receiving user: %v", err)}fmt.Printf("Received: %v\n", user)}// 客戶端流式調用stream2, err := client.CreateUsers(context.Background())if err != nil {log.Fatalf("could not create users: %v", err)}for i := 0; i < 3; i++ {if err := stream2.Send(&pb.CreateUserRequest{Name: fmt.Sprintf("User %d", i),Email: fmt.Sprintf("user%d@example.com", i),}); err != nil {log.Fatalf("error sending user: %v", err)}}resp2, err := stream2.CloseAndRecv()if err != nil {log.Fatalf("error closing stream: %v", err)}fmt.Printf("Created %d users\n", resp2.Count)// 雙向流式調用stream3, err := client.UpdateUsers(context.Background())if err != nil {log.Fatalf("could not update users: %v", err)}for i := 0; i < 3; i++ {if err := stream3.Send(&pb.UpdateUserRequest{Id: int32(i),Name: fmt.Sprintf("Updated User %d", i),}); err != nil {log.Fatalf("error sending update: %v", err)}resp3, err := stream3.Recv()if err != nil {log.Fatalf("error receiving update: %v", err)}fmt.Printf("Updated: %v\n", resp3)}
}
代碼解析
- 連接建立:通過?
grpc.Dial
?連接服務端。 - 流式調用:通過?
stream.Send()
?和?stream.Recv()
?實現雙向通信。 - 錯誤處理:捕獲?
io.EOF
?結束流式調用。
五、Java 實現 gRPC 服務端與客戶端
1. 服務端代碼詳解
import user.UserServiceGrpc;
import user.GetUserRequest;
import user.UserResponse;
import user.ListUsersRequest;
import user.CreateUserRequest;
import user.CreateUsersResponse;
import user.UpdateUserRequest;import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;public class UserServiceServer {public static void main(String[] args) throws IOException, InterruptedException {final Server server = ServerBuilder.forPort(50051).addService(new UserServiceImpl()).build();server.start();System.out.println("Server started at port 50051");final CountDownLatch latch = new CountDownLatch(1);server.awaitTermination();}static class UserServiceImpl extends UserServiceGrpc.UserServiceImplBase {// 單向調用@Overridepublic void getUser(GetUserRequest request, StreamObserver<UserResponse> responseObserver) {UserResponse response = UserResponse.newBuilder().setId(request.getId()).setName("Alice").setEmail("alice@example.com").build();responseObserver.onNext(response);responseObserver.onCompleted();}// 服務端流式調用@Overridepublic void listUsers(ListUsersRequest request, StreamObserver<UserResponse> responseObserver) {UserResponse user1 = UserResponse.newBuilder().setId(1).setName("Alice").setEmail("alice@example.com").build();UserResponse user2 = UserResponse.newBuilder().setId(2).setName("Bob").setEmail("bob@example.com").build();responseObserver.onNext(user1);responseObserver.onNext(user2);responseObserver.onCompleted();}// 客戶端流式調用@Overridepublic StreamObserver<CreateUserRequest> createUsers(StreamObserver<CreateUsersResponse> responseObserver) {return new StreamObserver<>() {int count = 0;@Overridepublic void onNext(CreateUserRequest request) {count++;}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onCompleted() {CreateUsersResponse response = CreateUsersResponse.newBuilder().setCount(count).build();responseObserver.onNext(response);responseObserver.onCompleted();}};}// 雙向流式調用@Overridepublic StreamObserver<UpdateUserRequest> updateUsers(StreamObserver<UserResponse> responseObserver) {return new StreamObserver<>() {@Overridepublic void onNext(UpdateUserRequest request) {UserResponse response = UserResponse.newBuilder().setId(request.getId()).setName(request.getName()).build();responseObserver.onNext(response);}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onCompleted() {responseObserver.onCompleted();}};}}
}
代碼解析
- 服務端實現:通過繼承?
UserServiceGrpc.UserServiceImplBase
?實現接口。 - 流式處理:通過?
StreamObserver
?處理雙向通信。 - 錯誤處理:通過?
onError
?捕獲異常。
2. 客戶端代碼詳解
import user.UserServiceGrpc;
import user.GetUserRequest;
import user.UserResponse;
import user.ListUsersRequest;
import user.CreateUserRequest;
import user.CreateUsersResponse;
import user.UpdateUserRequest;import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.StatusRuntimeException;import java.util.concurrent.TimeUnit;public class UserServiceClient {public static void main(String[] args) {ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();UserServiceGrpc.UserServiceBlockingStub blockingStub = UserServiceGrpc.newBlockingStub(channel);// 單向調用GetUserRequest request = GetUserRequest.newBuilder().setId(1).build();try {UserResponse response = blockingStub.getUser(request);System.out.println("User: " + response.getName());} catch (StatusRuntimeException e) {e.printStackTrace();}// 服務端流式調用ListUsersRequest listRequest = ListUsersRequest.newBuilder().setFilter("IT").build();UserServiceGrpc.UserServiceStub asyncStub = UserServiceGrpc.newStub(channel);asyncStub.listUsers(listRequest, new StreamObserver<>() {@Overridepublic void onNext(UserResponse user) {System.out.println("Received: " + user.getName());}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onCompleted() {System.out.println("Stream completed.");}});// 客戶端流式調用UserServiceGrpc.UserServiceStub createStub = UserServiceGrpc.newStub(channel);createStub.createUsers(new StreamObserver<>() {@Overridepublic void onNext(CreateUsersResponse response) {System.out.println("Created " + response.getCount() + " users");}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onCompleted() {System.out.println("Create stream completed.");}}).forEachRemaining(user -> {if (user != null) {System.out.println("Sending: " + user.getName());}});// 雙向流式調用UserServiceGrpc.UserServiceStub updateStub = UserServiceGrpc.newStub(channel);StreamObserver<UpdateUserRequest> requestStream = updateStub.updateUsers(new StreamObserver<>() {@Overridepublic void onNext(UserResponse response) {System.out.println("Updated: " + response.getName());}@Overridepublic void onError(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void onCompleted() {System.out.println("Update stream completed.");}});for (int i = 0; i < 3; i++) {UpdateUserRequest updateRequest = UpdateUserRequest.newBuilder().setId(i).setName("Updated User " + i).build();requestStream.onNext(updateRequest);}requestStream.onCompleted();try {channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}}
}
代碼解析
- 連接建立:通過?
ManagedChannelBuilder
?連接服務端。 - 流式調用:通過?
StreamObserver
?實現雙向通信。 - 錯誤處理:通過?
onError
?捕獲異常。
六、多語言交互的最佳實踐
1. 保持?.proto
?文件統一
- 所有語言共享同一個?
.proto
?文件,確保接口定義一致。 - 使用?
protoc
?生成對應語言的代碼。
2. 版本控制
- 在?
.proto
?文件中添加版本注釋:// Version 1.0.0 message User {string name = 1; }
3. 依賴管理
- 使用?
go mod
?或?Maven
?管理依賴,確保不同語言的代碼版本一致。
注意:
這篇文章中使用的Go和Java 實現 gRPC 服務端與客戶端的例子是二者分開用的,而不是混合語言,其實在這里我更想做的是Go和Java放在一起使用,比如Go做服務端,Java做客戶端。原因是我覺得Go更適合grpc,所以大家著重看Go的講解即可。如果要混合的話也是以Go為主,Java為輔。
這次沒有使用多語言的原因是,突然混合在一起的話怕大家不好理解,我在其他文章中也有講解跨語言使用的例子,大家有興趣的可以去看看。
七、總結
在本文中,我們詳細講解了 gRPC 與 Protobuf 的深度集成,包括:
- 通過?
.proto
?文件定義服務接口 - 在 Go 和 Java 中實現服務端與客戶端
- 單向、流式通信的完整代碼示例
- 多語言交互的最佳實踐
通過這些內容,你已經能夠構建高性能、可擴展的微服務系統,并在不同語言之間實現無縫通信。gRPC 與 Protobuf 的結合是現代分布式系統的基石,希望這篇文章能幫助你更自信地在項目中應用這些技術。