go語言 grpc 攔截器

文章目錄

    • 攔截器
      • 服務端攔截器
        • 一元攔截器
        • 流攔截器
      • 客戶端攔截器
        • 一元攔截器
        • 流攔截`
      • 多個攔截器
  • 代碼倉庫

攔截器

gRPC攔截器(interceptor)是一種函數,它可以在gRPC調用之前和之后執行一些邏輯,例如認證、授權、日志記錄、監控和統計等。攔截器函數是gRPC中非常重要的概念,它允許我們在服務端和客戶端添加自定義邏輯,以滿足業務需求和運維需求。

在gRPC中,攔截器函數通常通過實現grpc.UnaryServerInterceptor和grpc.StreamServerInterceptor接口來定義。UnaryServerInterceptor用于攔截一元RPC請求,而StreamServerInterceptor用于攔截流式RPC請求。在客戶端中,我們可以使用grpc.UnaryClientInterceptor和grpc.StreamClientInterceptor來攔截gRPC調用。

在gRPC中,攔截器函數可以被鏈接起來,形成一個攔截器鏈。在這個攔截器鏈中,每個攔截器函數都可以處理請求并將其轉發給下一個攔截器函數,或者直接返回響應。因此,我們可以在攔截器函數中編寫不同的邏輯,例如實現認證、授權、監控和統計等。
以下是一些常見的gRPC攔截器:

  • 認證和授權攔截器:用于對gRPC調用進行身份驗證和權限控制,例如檢查token、驗證用戶名和密碼、檢查訪問控制列表等;
  • 日志記錄攔截器:用于記錄gRPC調用的日志,例如記錄請求的方法、參數、響應狀態等;
  • 監控和統計攔截器:用于監控gRPC調用的性能和吞吐量,例如記錄調用次數、響應時間、錯誤率等;
  • 緩存攔截器:用于在服務端或客戶端緩存一些數據,例如緩存計算結果、緩存數據庫查詢結果等。

服務端攔截器

在這里插入圖片描述

一元攔截器
package mainimport ("context""flag""log""time""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure"pb "mygrpc/proto/hello" // 引入編譯生成的包
)const (defaultName = "world"
)var (addr = flag.String("addr", "localhost:50051", "the address to connect to")name = flag.String("name", defaultName, "Name to greet")
)func main() {flag.Parse()// 與服務建立連接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 創建指定服務的客戶端c := pb.NewGreeterClient(conn)// 連接服務器并打印出其響應。ctx, cancel := context.WithTimeout(context.Background(), time.Second)defer cancel()// 調用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

結果

2023/12/07 14:52:55 ======= [Server Interceptor]  /hello.Greeter/SayHello
2023/12/07 14:52:55  Pre Proc Message : name:"world"
2023/12/07 14:52:55 Received: world
2023/12/07 14:52:55  Post Proc Message : message:"Hello world"
流攔截器

流式攔截器需要對grpc.ServerStream進行包裝,重新實現RecvMsg和SendMsg方法。

func (s *server) SearchOrders(req *pb.HelloRequest, stream pb.Greeter_SearchOrdersServer) error {log.Printf("Recved %v", req.GetName())// 具體返回多少個response根據業務邏輯調整for i := 0; i < 10; i++ {// 通過 send 方法不斷推送數據err := stream.Send(&pb.HelloReply{})if err != nil {log.Fatalf("Send error:%v", err)return err}}return nil
}type wrappedStream struct {// 包裝器流grpc.ServerStream
}
// 接受信息攔截器
func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Receive a message (Type: %T) at %s", m, time.Now().Format(time.RFC3339))return w.ServerStream.RecvMsg(m)
}
// 發送消息攔截器
func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Server Stream Interceptor Wrapper] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ServerStream.SendMsg(m)
}func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {return &wrappedStream{s}
}func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {// 前置處理log.Println("====== [Server Stream Interceptor] ", info.FullMethod)// 包裝器流調用 流RPCerr := handler(srv, newWrappedStream(ss))if err != nil {log.Printf("RPC failed with error %v", err)}return err
}
func main() {flag.Parse()lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))if err != nil {log.Fatalf("failed to listen: %v", err)}// 開啟rpcs := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))// 注冊服務pb.RegisterGreeterServer(s, &server{})log.Printf("service listening at %v", lis.Addr())if err := s.Serve(lis); err != nil {log.Fatalf("failed to serve: %v", err)}
}

結果

GOROOT=D:\software\Go #gosetup
GOPATH=D:\software\golibrary #gosetup
D:\software\Go\bin\go.exe build -o C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe mygrpc/service/steamInterceptorservice #gosetup
C:\Users\29071\AppData\Local\JetBrains\GoLand2023.3\tmp\GoLand\___go_build_mygrpc_service_steamInterceptorservice.exe
2023/12/07 15:07:48 service listening at [::]:50051
2023/12/07 15:08:07 ====== [Server Stream Interceptor]  /hello.Greeter/searchOrders
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Receive a message (Type: *hello.HelloRequest) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 Recved 開始服務端rpc流測試
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
2023/12/07 15:08:07 ====== [Server Stream Interceptor Wrapper] Send a message (Type: *hello.HelloReply) at 2023-12-07T15:08:07+08:00
Process finished with the exit code -1073741510 (0xC000013A: interrupted by Ctrl+C)

客戶端攔截器

在這里插入圖片描述

一元攔截器
func orderUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {// 前置處理邏輯log.Println("Method : " + method)// 調用invoker 執行遠程方法err := invoker(ctx, method, req, reply, cc, opts...)// 后置處理邏輯log.Println(reply)return err
}func main() {flag.Parse()// 與服務建立連接.conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()),grpc.WithUnaryInterceptor(orderUnaryClientInterceptor)) //添加攔截器if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()// 創建指定服務的客戶端c := pb.NewGreeterClient(conn)// 連接服務器并打印出其響應。ctx, cancel := context.WithTimeout(context.Background(), time.Second) // 設置超時時間為一秒defer cancel()// 調用指定方法r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})if err != nil {log.Fatalf("could not greet: %v", err)}log.Printf("Greeting: %s", r.GetMessage())
}

結果

2023/12/07 16:37:28 Method : /hello.Greeter/SayHello
2023/12/07 16:37:28 message:"Hello world"
2023/12/07 16:37:28 Greeting: Hello worl
流攔截`
type wrappedStream struct {grpc.ClientStream
}func (w *wrappedStream) RecvMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Receive a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.RecvMsg(m)
}func (w *wrappedStream) SendMsg(m interface{}) error {log.Printf("====== [Client Stream Interceptor] Send a message (Type: %T) at %v", m, time.Now().Format(time.RFC3339))return w.ClientStream.SendMsg(m)
}func newWrappedStream(s grpc.ClientStream) grpc.ClientStream {return &wrappedStream{s}
}func clientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {// 前置處理邏輯log.Println("======= [Client Interceptor] ", method)// 調用streamer 來獲取客戶端流s, err := streamer(ctx, desc, cc, method, opts...)if err != nil {return nil, err}return newWrappedStream(s), nil
}func main(){// 注冊攔截器到客戶端流conn,err:=grpc.Dial(address,grpc.WithInsecure(),grpc.WithStreamInterceptor(clientStreamInterceptor))if err != nil {log.Fatalf("did not connect: %v", err)}defer conn.Close()c := pb.NewOrderManagementClient(conn)ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)defer cancel()// 調用客戶端流RPC方法searchStream, _ := c.SearchOrders(ctx, &wrapper.StringValue{Value: "Google"})for {searchOrder, err := searchStream.Recv()if err == io.EOF {log.Print("EOF")break}if err == nil {log.Print("Search Result : ", searchOrder)}}
}

結果

2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 ====== [Client Stream Interceptor] Send a message (Type: *hello.HelloRequest) at 2023-12-07T17:10:43+08:00
2023/12/07 17:10:43 客戶端流傳輸結束

多個攔截器

在grpc中默認的攔截器不可以傳多個,因為在源碼中,存在一些問題

func chainUnaryClientInterceptors(cc *ClientConn) {interceptors := cc.dopts.chainUnaryIntsif cc.dopts.unaryInt != nil {interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)}var chainedInt UnaryClientInterceptorif len(interceptors) == 0 {chainedInt = nil} else if len(interceptors) == 1 {chainedInt = interceptors[0]} else {chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)}}cc.dopts.unaryInt = chainedInt
}

當存在多個攔截器時,取的就是第一個攔截器。因此結論是允許傳多個,但并沒有用。

如果真的需要多個攔截器,可以使用 go-grpc-middleware 提供的 grpc.UnaryInterceptor 和 grpc.StreamInterceptor 鏈式方法。核心方法如下

func ChainUnaryClient(interceptors ...grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {n := len(interceptors)if n > 1 {lastI := n - 1return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {var (chainHandler grpc.UnaryInvokercurI         int)chainHandler = func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {if curI == lastI {return invoker(currentCtx, currentMethod, currentReq, currentRepl, currentConn, currentOpts...)}curI++err := interceptors[curI](currentCtx, currentMethod, currentReq, currentRepl, currentConn, chainHandler, currentOpts...)curI--return err}return interceptors[0](ctx, method, req, reply, cc, chainHandler, opts...)}}...
}

代碼倉庫

https://github.com/onenewcode/mygrpc.git

也可以直接下載綁定的資源。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/208706.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/208706.shtml
英文地址,請注明出處:http://en.pswp.cn/news/208706.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

iOS app切換后臺時添加模糊遮罩層

仿 支付寶 退出后臺后,App整個 增加模糊遮罩層 此處只介紹 在iOS13后 SceneDelegate 下的操作 原理就是 在 App 進入后臺后 在 主window上添加一個 UIVisualEffectView 在進入前臺后移除 直接上代碼: 先聲明: //先聲明 /* blurView */ property (strong, nonatomic) UI…

逆波蘭表達式求解計算器

利用逆波蘭表達式求解計算器有以下幾個步驟: 1. 去掉字符串中的空格 s s.replaceAll(" ", "")2. 講字符串轉換為中序表達式數組 def string_to_infixlist(s):ans []keep_num ""for i in range(len(s)):if s[i].isdigit():if i < len(s)…

docker學習(四、修改容器創建新的鏡像推送到云上)

鏡像是只讀的&#xff0c;容器是可編輯的。Docker鏡像是分層的&#xff0c;支持通過擴展鏡像&#xff0c;創建新的鏡像。 學到這里感覺docker跟git很想~~ 通過docker commit將修改的容器做成新的鏡像 # 將容器做成新的鏡像 docker commit -m"提交備注" -a"作…

【1day】泛微e-office OA系統sms_page.php接口SQL 注入漏洞學習

注:該文章來自作者日常學習筆記,請勿利用文章內的相關技術從事非法測試,如因此產生的一切不良后果與作者無關。 目錄 一、漏洞描述 二、影響版本 三、資產測繪 四、漏洞復現

大創項目推薦 交通目標檢測-行人車輛檢測流量計數 - 大創項目推薦

文章目錄 0 前言1\. 目標檢測概況1.1 什么是目標檢測&#xff1f;1.2 發展階段 2\. 行人檢測2.1 行人檢測簡介2.2 行人檢測技術難點2.3 行人檢測實現效果2.4 關鍵代碼-訓練過程 最后 0 前言 &#x1f525; 優質競賽項目系列&#xff0c;今天要分享的是 &#x1f6a9; 畢業設計…

什么是Nginx反向代理?Nginx反向代理配置指南

Nginx反向代理是一種常見的服務器架構模式&#xff0c;它可以將客戶端請求轉發到多個后端服務器上&#xff0c;從而實現負載均衡、高可用性和安全性。本文將介紹Nginx反向代理的基本概念和配置方法。 什么是Nginx反向代理&#xff1f; 在傳統的Web服務器架構中&#xff0c;客戶…

解決selenium使用.get()報錯:unknown error: unsupported protocol

解決方法 將原來的&#xff1a; url "https://www.baidu.com" browser.get(url)替換為&#xff1a; url "https://www.baidu.com" browser.execute_script(f"window.location.replace({url});") # 直接平替 .get()問題解析 之前運行都是正…

【后端學前端學習記錄】學習計劃

1、個人背景 寫了足夠久的后端了&#xff0c;常用的語言基本上都接觸過&#xff0c;沒有在工作中寫過前端 一直想做一些前端的工作&#xff0c;但是前端技能不足加上自己審美不行&#xff0c;寫出的界面總是很丑 所以一直對前端做不好&#xff0c;也沒有真正下手。 2、動機 種…

Navicat 技術指引 | 連接 GaussDB 分布式

Navicat Premium&#xff08;16.3.3 Windows 版或以上&#xff09;正式支持 GaussDB 分布式數據庫。GaussDB 分布式模式更適合對系統可用性和數據處理能力要求較高的場景。Navicat 工具不僅提供可視化數據查看和編輯功能&#xff0c;還提供強大的高階功能&#xff08;如模型、結…

SLAM ORB-SLAM2(11)單目初始化

SLAM ORB-SLAM2(11)單目初始化 1. 初始化工作1.1. 單應矩陣(Homography Matrix)1.2. 基礎矩陣(Fundamental Matrix)1.3. 本質矩陣(Essential Matrix)1.4. 初始化過程2. 業務流程2.1. 創建單目初始化器2.2. 判斷連續幀的特征點數目2.3. 在兩幀中找匹配的特征點對2.4. 估…

軟件兼容性測試:保障多樣化用戶體驗的重要功能

隨著移動設備和操作系統的快速發展&#xff0c;軟件兼容性測試變得越發重要。這項測試確保軟件在不同平臺、設備和環境下都能夠正常運行&#xff0c;提供一致而穩定的用戶體驗。下面是軟件兼容性測試中的一些關鍵功能&#xff1a; 1. 跨平臺兼容性測試 在不同操作系統上運行的軟…

【flink番外篇】1、flink的23種常用算子介紹及詳細示例(3)-window、distinct、join等

Flink 系列文章 一、Flink 專欄 Flink 專欄系統介紹某一知識點&#xff0c;并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分&#xff0c;比如術語、架構、編程模型、編程指南、基本的…

macOS Big Sur/Mac電腦安裝vscode顯示您沒有權限來打開應用程序‘Visual Studio Code‘ 請聯系您的電腦或網絡管理員問題修復

錯誤方法 首先我以為我的權限不足。&#xff0c;需要去用戶群組里設置。結果根本不是這個的問題。 1.在系統偏好設置->用戶與群組檢查了一下我的用戶是不是管理員 結果發現是管理員 2.根據蘋果提示&#xff0c;右鍵我的文件夾->顯示簡介->最下面的共享與權限 解鎖&…

SAP UI5 walkthrough step5 Controllers

在這個章節&#xff0c;我們要做的是&#xff0c;將之前的text文本展示為一個按鈕&#xff0c;并將聲明綁定在點擊按鈕事件。 因為改的是外觀&#xff0c;所以我們修改的是view.XML webapp/view/App.view.xml <mvc:ViewcontrollerName"ui5.walkthrough.controller.A…

element中el-select多選v-model是對象數組

文章目錄 一、問題二、解決三、最后 一、問題 element中的el-select的v-model一般都是字符串或者字符串數組&#xff0c;但是有些時候后端接口要求該字段要傳對象或者對象數組&#xff0c;如果再轉換一次數據&#xff0c;對于保存配置和回顯都是吃力不討好的事情。如下所示&am…

SpringBoot 項目將jar 部署在服務器引用外部 配置文件

SpringBoot 官方給出了四種方式引用外部配置文件的方式 在jar包的同一目錄下建一個config文件夾&#xff0c;然后把配置文件放到這個文件夾下(最常用)直接把配置文件放到jar包的同級目錄在classpath下建一個config文件夾&#xff0c;然后把配置文件放進去在classpath下直接放配…

圖片整理

Lily上課時使用字母數字圖片教小朋友們學習英語單詞&#xff0c;每次都需要把這些圖片按照大小&#xff08;ASCII碼值從小到大&#xff09;排列收好。請大家給Lily幫忙&#xff0c;通過代碼解決。 Lily使用的圖片使用字符"A"到"Z"、“a"到"z”、…

centos7做gitlab數據災備項目地址指向問題

如果你在 CentOS 7 上使用 GitLab 時&#xff0c;它回復的數據指向了另一個服務器的地址&#xff0c;可能是因為配置文件中的一些設置不正確。 要解決這個問題&#xff0c;可以嘗試以下幾個步驟&#xff1a; 檢查 GitLab 配置文件&#xff1a;打開 GitLab 的配置文件&#xf…

python寫數據進es中

1、自定義inde為&#xff1a;xxxx&#xff0c;data_to_insert也可以自定義函數 from elasticsearch import Elasticsearch from datetime import datetime, timedelta es Elasticsearch([http://es地址1:9200, es地址2:9200, es地址3:9200]) current_date datetime.now() for…

NTP時鐘同步服務器(校時服務器)技術參數分享

NTP時鐘同步服務器&#xff08;校時服務器&#xff09;技術參數分享 網絡校時服務器是一款先進的智能化高精度時鐘同步設備。 網絡校時服務器從 GPS、北斗、GLONASS、Galileo等導航定位衛星系統上獲取標準時間信息&#xff0c;并通過 NTP/SNTP 或其他網絡協議&#xff0c;在網絡…