文章目錄
- 攔截器
- 服務端攔截器
- 一元攔截器
- 流攔截器
- 客戶端攔截器
- 一元攔截器
- 流攔截`
- 多個攔截器
- 代碼倉庫
攔截器
gRPC攔截器(interceptor)是一種函數,它可以在gRPC調用之前和之后執行一些邏輯,例如認證、授權、日志記錄、監控和統計等。攔截器函數是gRPC中非常重要的概念,它允許我們在服務端和客戶端添加自定義邏輯,以滿足業務需求和運維需求。
在gRPC中,攔截器函數通常通過實現grpc.UnaryServerInterceptor和grpc.StreamServerInterceptor接口來定義。UnaryServerInterceptor用于攔截一元RPC請求,而StreamServerInterceptor用于攔截流式RPC請求。在客戶端中,我們可以使用grpc.UnaryClientInterceptor和grpc.StreamClientInterceptor來攔截gRPC調用。
在gRPC中,攔截器函數可以被鏈接起來,形成一個攔截器鏈。在這個攔截器鏈中,每個攔截器函數都可以處理請求并將其轉發給下一個攔截器函數,或者直接返回響應。因此,我們可以在攔截器函數中編寫不同的邏輯,例如實現認證、授權、監控和統計等。
以下是一些常見的gRPC攔截器:
- 認證和授權攔截器:用于對gRPC調用進行身份驗證和權限控制,例如檢查token、驗證用戶名和密碼、檢查訪問控制列表等;
- 日志記錄攔截器:用于記錄gRPC調用的日志,例如記錄請求的方法、參數、響應狀態等;
- 監控和統計攔截器:用于監控gRPC調用的性能和吞吐量,例如記錄調用次數、響應時間、錯誤率等;
- 緩存攔截器:用于在服務端或客戶端緩存一些數據,例如緩存計算結果、緩存數據庫查詢結果等。
服務端攔截器
一元攔截器
package mainimport ("context""flag""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入編譯生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 與服務建立連接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 創建指定服務的客戶端c := pb.NewGreeterClient(conn)// 連接服務器并打印出其響應。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 調用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}
結果
2023/12/07 14:52:55 ======= [Server Interceptor] /hello.Greeter/SayHello
2023/12/07 14:52:55 Pre Proc Message : name:"world"
2023/12/07 14:52:55 Received: world
2023/12/07 14:52:55 Post Proc Message : message:"Hello world"
流攔截器
流式攔截器需要對grpc.ServerStream進行包裝,重新實現RecvMsg和SendMsg方法。
func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {log.Printf("Recved %v", req.GetName())// 具體返回多少個response根據業務邏輯調整for i := 0; i < 10; i++ {// 通過 send 方法不斷推送數據err := stream.Send(&pb.HelloReply{})if err != nil {log.Fatalf("Send error:%v", err)return err}}return nil
}type wrappedStream struct {// 包裝器流grpc.ServerStream
}
// 接受信息攔截器
func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))return w.ServerStream.RecvMsg(m)
}
// 發送消息攔截器
func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ServerStream.SendMsg(m)
}func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {return &wrappedStream{s}
}func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {// 前置處理log.Println("====== [Server Stream Interceptor] ", info.FullMethod)// 包裝器流調用 流RPCerr := handler(srv, newWrappedStream(ss))if err != nil {log.Printf("RPC failed with error %v", err)}return err
}
func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 開啟rpcs := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))// 注冊服務pb.RegisterGreeterServer(s, &server{})log.Printf("service listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}
結果
GOROOT=D:\software\Go #gosetup
GOPATH=D:\software\golibrary #gosetup
D:\software\Go\bin\go.exe build -o C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe mygrpc/service/steamInterceptorservice #gosetup
C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe
2023/12/07 15:07:48 service listening at [::]:50051
2023/12/07 15:08:07 ====== [Server Stream Interceptor] /hello.Greeter/searchOrders
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Receive a message (Type: *hello.HelloRequest) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 Recved 開始服務端rpc流測試
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)
客戶端攔截器
一元攔截器
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {// 前置處理邏輯log.Println("Method : " + method)// 調用invoker 執行遠程方法err := invoker(ctx, method, req, reply, cc, opts...)// 后置處理邏輯log.Println(reply)return err
}func main() {flag.Parse()// 與服務建立連接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()),grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) //添加攔截器if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 創建指定服務的客戶端c := pb.NewGreeterClient(conn)// 連接服務器并打印出其響應。ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 設置超時時間為一秒defer cancel()// 調用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}
結果
2023/12/07 16:37:28 Method : /hello.Greeter/SayHello
2023/12/07 16:37:28 message:"Hello world"
2023/12/07 16:37:28 Greeting: Hello worl
流攔截`
type wrappedStream struct {grpc.ClientStream
}func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.RecvMsg(m)
}func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.SendMsg(m)
}func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {return &wrappedStream{s}
}func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {// 前置處理邏輯log.Println("======= [Client Interceptor] ", method)// 調用streamer 來獲取客戶端流s, err := streamer(ctx, desc, cc, method, opts...)if err != nil {return nil, err}return newWrappedStream(s), nil
}func main(){// 注冊攔截器到客戶端流conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithStreamInterceptor(clientStreamInterceptor))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()c := pb.NewOrderManagementClient(conn)ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)defer cancel()// 調用客戶端流RPC方法searchStream, _ := c.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})for {searchOrder, err := searchStream.Recv()if err == io.EOF {log.Print("EOF")break}if err == nil {log.Print("Search Result : ", searchOrder)}}
}
結果
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 客戶端流傳輸結束
多個攔截器
在grpc中默認的攔截器不可以傳多個,因為在源碼中,存在一些問題
func chainUnaryClientInterceptors(cc *ClientConn) {interceptors := cc.dopts.chainUnaryIntsif cc.dopts.unaryInt != nil {interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)}var chainedInt UnaryClientInterceptorif len(interceptors) == 0 {chainedInt = nil} else if len(interceptors) == 1 {chainedInt = interceptors[0]} else {chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)}}cc.dopts.unaryInt = chainedInt
}
當存在多個攔截器時,取的就是第一個攔截器。因此結論是允許傳多個,但并沒有用。
如果真的需要多個攔截器,可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor 和 grpc.StreamInterceptor 鏈式方法。核心方法如下
func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {n := len(interceptors)if n > 1 {lastI := n - 1return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {var (chainHandler grpc.UnaryInvokercurI int)chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {if curI == lastI {return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)}curI++err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)curI--return err}return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)}}...
}
代碼倉庫
https://github.com/onenewcode/mygrpc.git
也可以直接下載綁定的資源。