gRPC服務發現

基于 etcd 實現的服務發現,按照非規范化的 etcd key 實現,詳細見代碼注釋。

package discoveryimport ("context""encoding/json""fmt""go.etcd.io/etcd/api/v3/mvccpb"clientv3 "go.etcd.io/etcd/client/v3""google.golang.org/grpc/resolver""strings""time"
)// gRPC 的服務一般會使用 protobuf 作為數據傳輸的介質
// gRPC 服務定義在 proto 的文件中,例如:service RoutingService {}
// protoc 將 proto 后綴文件轉為 go 文件,文件內自動生成了 gRPC 服務的描述信息、服務注冊的函數、客戶端聲明的函數等內容
// 如下,它們的格式是固定的,注意函數的參數
// 服務描述信息:RoutingService_ServiceDesc,格式:服務名_ServiceDesc
// 服務注冊函數:RegisterRoutingServiceServer,格式:Register你服務名Server
// 客戶端聲明函數:NewRoutingServiceClient,格式:New服務名Client
// 其中客戶端聲明函數的參數是 gRPC 連接,返回值是 gRPC 服務的客戶端接口,這樣就可以調用客戶端接口定義的 rpc 方法了
// gRPC 連接不會與某個 gRPC 服務綁定,它只是一個連接。
// 獲取 gRPC 連接的方式如下兩種,第一個參數就是 gRPC 服務的地址,可以寫死 ip + port,也可以使用服務發現來獲取 gRPC 服務的地址。
// grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName))
// grpc.Dial(fmt.Sprintf("%s:///%s", scheme, serviceName))(廢棄)
// 服務發現是實現 Builder 和 Resolver 接口,Builder 用于創建 Resolver 實例,Resolver 用于解析服務地址。
// Builder 的 Scheme 方法返回值是 與 grpc.NewClient 中的 scheme 對應
// Builder 的 Build 第一個參數 target.Endpoint() 得到的結果是 grpc.NewClient 中的 serviceName,Build 方法的觸發分情況:
// grpc.NewClient 聲明不會觸發 Build 方法,首次調用 rpc 方法時觸發 Build
// grpc.Dial 聲明會觸發 Build 方法,但已經廢棄了
// Resolver 的 ResolveNow 方法是 gRPC 主動調用的,我們可以使用它動態去 etcd 中獲取服務地址,也可以不實現它,自定義服務發現的邏輯// 服務發現的實現方式:
// 假如我們有三個應用,user-center、device-center、網關,user-center 和 device-center 暴露了很多 gRPC 服務,網關需要調用它們的服務
// 假如我們使用 etcd 作為注冊中心,同時規范化 etcd 的 key ,例如:grpc/services/{serviceName}/實例ID
// grpc/services/user-center/實例1
// grpc/services/user-center/實例2
// grpc/services/device-center/實例1
// grpc/services/device-center/實例2
// 網關中分別實現 Builder 和 Resolver,并將 Builder 的實例注冊在 grpc 的地址解析中,resolver.Register(Builder實現的實例)
// 獲取 user-center 和 device-center 的 grpc 連接
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "user-center"))
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "device-center"))
// 當 gRPC 連接建立時,gRPC 會調用 Builder 的 Build 方法,我們獲取 target.Endpoint() 就是 serviceName
// 這樣 fmt.Sprintf("grpc/services/%s", serviceName) 獲取 serviceName 的 etcd 的 key 前綴
// 如:grpc/services/user-center/
// Build 方法中按前綴匹配查詢 etcd 的數據,這樣就獲取到了 user-center 的所有實例的地址,再同步到 Resolver 中
// 如上就實現了規范化 etcd 的 key 前綴的服務發現,不管有多少個應用,代碼中只需要一個服務發現的實例// 如果沒有規范化 etcd 的 key 前綴,那么我們需要為各個服務聲明不同的 scheme,每個 scheme 對應一個服務發現的實例
// Builder 的實現必須包含 etcd 的 key 前綴 ,不能利用 target.Endpoint() 去實現服務發現
// 如:ServiceDiscovery 實現了 Builder
// type ServiceDiscovery struct {
//		serverKey string
// }
// grpc/services/user-center/ 固定寫死賦值給 serverKey
// 聲明 ServiceDiscovery { serverKey },注冊 resolver.Register(ServiceDiscovery實例)
// grpc.NewClient(fmt.Sprintf("%s:///%s", "user", "user-center"))// 普通 rpc 調用時,服務端掛掉:
// 服務發現找不到數據時:rpc error: code = Unavailable desc = no children to pick from
// 服務掛掉但etcd/服務發現還有數據:rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.202.160.190:6888: connect: connection refused"
// 服務重啟后客戶端連接可以恢復
// 當某個服務節點不可用時,可以自動連接到可用的服務節點// 流式 rpc 調用,服務端掛掉:
// 客戶端發送方:EOF
// 客戶端接收方:rpc error: code = Unavailable desc = error reading from server: EOF
// 服務重啟后客戶端連接不可恢復,必須重新建立連接// ServiceDiscovery is a gRPC resolver that uses etcd for service discovery.
// 配合 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 來使用
// Build 方法的 target.Endpoint() 就是 serviceName
type ServiceDiscovery struct {scheme     string // 自定義的 grpc 服務的 scheme,例如:grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)serviceKey string // etcd 中服務的 key 前綴,例如:grpc/maxwell-ai/GatewayInfoService/1.0/etcdClient *clientv3.Client
}// ServiceResolver is a gRPC resolver that resolves service addresses from etcd.
// 一個 scheme 對應一個 ServiceResolver,當 grpc 建立連接時觸發 ServiceDiscovery 的 Build 方法
// 注意:
// grpc.NewClient 不會觸發 Build 方法
// grpc.Dial 會觸發 Build 方法,但已經廢棄了
type ServiceResolver struct {scheme     stringserviceKey stringtarget  resolver.Targetclient  *clientv3.Clientcc      resolver.ClientConnaddrMap map[string]resolver.Addressclosed  chan struct{}
}func NewServiceDiscovery(scheme string, serviceKey string, etcdClient *clientv3.Client) *ServiceDiscovery {return &ServiceDiscovery{scheme:     scheme,serviceKey: serviceKey,etcdClient: etcdClient,}
}// Build creates a new ServiceDiscovery resolver.
// grpc.NewClient 不會觸發 Build 方法
// grpc.Dial 會觸發 Build 方法,但已經廢棄了
// target: grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 中的 serviceName 就是 target
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {// 創建服務解析器serviceResolver := &ServiceResolver{target:     target,cc:         cc,scheme:     s.scheme,serviceKey: s.serviceKey,client:     s.etcdClient,closed:     make(chan struct{}),addrMap:    make(map[string]resolver.Address),}// 首次拉取所有數據if err := serviceResolver.rePull(); err != nil {return nil, err}// 開啟 watcher 監聽 etcd 中的服務地址變化go serviceResolver.watcher()return serviceResolver, nil
}// Scheme returns the scheme of the resolver.
// scheme 是 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)
func (s *ServiceDiscovery) Scheme() string {return s.scheme
}// ResolveNow is called by gRPC to resolve the service address immediately.
// grpc 主動調用去解析服務地址,這里可以實現從 etcd 獲取服務地址的邏輯
// 但是不在這里實現,因為這里實現有同步和異步從 etcd 中查詢數據
// 同步會阻塞
// 異步會開啟很多 goroutine,可能會導致 goroutine 泄漏
func (s *ServiceResolver) ResolveNow(options resolver.ResolveNowOptions) {}func (s *ServiceResolver) Close() {close(s.closed)
}func (s *ServiceResolver) rePull() error {ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)defer cancelFunc()resp, err := s.client.Get(ctx, s.serviceKey, clientv3.WithPrefix())if err != nil {return err}s.addrMap = make(map[string]resolver.Address)for _, ev := range resp.Kvs {key := strings.TrimPrefix(string(ev.Key), s.serviceKey)s.addServer(key, ev.Value)}s.syncToGrpc()return nil
}func (s *ServiceResolver) addServer(key string, value []byte) {var si ServiceInfoif err := json.Unmarshal(value, &si); err != nil {return}s.addrMap[key] = resolver.Address{Addr: fmt.Sprintf("%s:%d", si.Ip, si.Port),}
}func (s *ServiceResolver) delServer(key string) {if _, ok := s.addrMap[key]; ok {delete(s.addrMap, key)}
}func (s *ServiceResolver) syncToGrpc() {addrSlice := make([]resolver.Address, 0, 10)for _, v := range s.addrMap {addrSlice = append(addrSlice, v)}err := s.cc.UpdateState(resolver.State{Addresses: addrSlice})if err != nil {return}
}func (s *ServiceResolver) watcher() {rePull := falsefor {select {case <-s.closed:returndefault:}if rePull {if err := s.rePull(); err != nil {time.Sleep(5 * time.Second)continue}}rch := s.client.Watch(context.Background(), s.serviceKey, clientv3.WithPrefix())loop:for {select {case <-s.closed:returncase resp, ok := <-rch:if !ok {rePull = truebreak loop}for _, ev := range resp.Events {key := strings.TrimPrefix(string(ev.Kv.Key), s.serviceKey)switch ev.Type {case mvccpb.PUT:s.addServer(key, ev.Kv.Value)case mvccpb.DELETE:s.delServer(key)}}s.syncToGrpc()}}}
}

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/87429.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/87429.shtml
英文地址,請注明出處:http://en.pswp.cn/web/87429.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

基于Linux的Spark本地模式環境搭建實驗指南

一、實驗目的 掌握Spark本地模式的安裝與配置方法驗證Spark本地環境是否搭建成功了解Spark基本操作和運行原理 二、實驗環境準備 操作系統&#xff1a;Linux&#xff08;推薦ubuntu&#xff09;Java環境&#xff1a;JDK 1.8或以上版本內存&#xff1a;至少4GB&#xff08;推…

數學建模_時間序列

什么是時間序列時間序列預測方法/模型條件&#xff1a;非白噪音平穩平穩性評估不平穩變成平穩然后用ARIMA模型確定p,qAR模型(ARMA特例)MA模型(ARMA特例)ARMA模型(普適)灰色模型神經網絡/LSTM組合預測模型向量數據預測結果和為1的情況什么是時間序列 省略具體圖形例子 時間序列…

linux用rpm包升級sudo包為sudo-1.9.17-2版本

rpm下載地址&#xff1a; https://www.sudo.ws/dist/packages/1.9.17p1/ 備注&#xff1a;其他壓縮包下載地址&#xff1a;https://www.sudo.ws/download.html sudo-1.9.17-2.el7.x86_64.rpm 檢查一下&#xff0c;本地sudo版本&#xff0c;執行&#xff1a;sudo -V 或者sudo -…

【開源項目】一款真正可修改視頻MD5工具視頻質量不損失

文章目錄 視頻MD5修改工具 ???? 目錄? 功能特點?? 系統要求??? 設計架構?? 技術原理?? 核心代碼1. 視頻MD5修改核心邏輯2. 前端異步處理代碼3. 錯誤處理與日志記錄?? 安裝方法方法一:直接下載方法二:使用本地服務器?? 使用教程基本使用步驟高級使用技巧??…

Day05: Python 中的并發和并行(1)

理解 Python 中的線程和進程 理解線程和進程是實現在 Python 中并發和并行的基礎。這種知識使你能夠編寫能夠看似同時執行多個任務的程序&#xff0c;從而提高性能和響應能力。本課程將深入探討線程和進程的核心概念、它們的區別&#xff0c;以及它們如何為更高級的并發技術奠…

Spring Boot 集成 MinIO 實現分布式文件存儲與管理

Spring Boot 集成 MinIO 實現分布式文件存儲與管理 一、MinIO 簡介 MinIO 是一個高性能的分布式對象存儲服務器&#xff0c;兼容 Amazon S3 API。它具有以下特點&#xff1a; 輕量級且易于部署高性能&#xff08;讀寫速度可達每秒數GB&#xff09;支持數據加密和訪問控制提供…

從小白入門,基于Cursor開發一個前端小程序之Cursor 編程實踐與案例分析

Cursor 編程實踐與案例分析 Cursor 編程實踐與案例分析 1. 什么是 Cursor&#xff1f; Cursor 是一款面向開發者的 AI 編程助手&#xff0c;集成于本地 IDE&#xff0c;支持自然語言與代碼的無縫協作。它不僅能自動補全、重構、查找代碼&#xff0c;還能理解業務上下文&#…

一、如何用MATLAB畫一個三角形 代碼

一、如何用MATLAB畫一個三角形 代碼在MATLAB中繪制三角形可以通過指定三個頂點的坐標并使用 fill 或 patch 函數實現。以下是詳細代碼示例&#xff1a;方法1&#xff1a;使用 fill 函數&#xff08;簡單填充&#xff09;% 定義三角形的三個頂點坐標 (x, y) x [0, 1, 0.5]; % …

Postman自動化測試提取相應body體中的參數

文章目錄Postman自動化測試提取相應body體中的參數1. 示例響應 Body 參數2. 提取響應 Body 參數Postman自動化測試提取相應body體中的參數 上一篇的文中介紹了使用postman自動化測試時從響應的header中提取token參數&#xff0c;很多同學私信問如何從響應體body中提取參數。 有…

vue-39(為復雜 Vue 組件編寫單元測試)

實際練習:為復雜 Vue 組件編寫單元測試 單元測試對于確保復雜 Vue 組件的可靠性和可維護性至關重要。通過隔離和測試代碼的各個單元,您可以在開發過程的早期發現并修復錯誤,從而構建更健壯和可預測的應用程序。本課程重點介紹為復雜 Vue 組件編寫單元測試的實用方面,建立在…

c語言中的函數IV

函數的先后關系 直接把函數放在程序上方是可以的 在實際開發中&#xff0c;我們更希望把main函數放在前面 這樣子直接把自己定義的函數放在main函數下方&#xff0c;編譯會出現warning和error正確的解決方案是&#xff1a;把函數的頭放到main函數上方&#xff0c;這樣就能正常…

大模型Decoder-Only深入解析

Decoder-Only整體結構 我們以模型Llama-3.1-8B-Instruct為例&#xff0c;打印其結構如下&#xff08;后面會慢慢解析每一部分&#xff0c;莫慌&#xff09;&#xff1a; LlamaForCausalLM((model): LlamaModel((embed_tokens): VocabParallelEmbedding(num_embeddings128256,…

web網頁,在線%電商,茶葉,商城,網上商城系統%分析系統demo,于vscode,vue,java,jdk,springboot,mysql數據庫

經驗心得 這也是幫之前一客戶加了幾個功能&#xff0c;需要掌握crud&#xff0c;前后端開發&#xff0c;前后端怎么對接&#xff0c;前后端通訊是以那種格式&#xff0c;把這些掌握后咱們就可以進行網站開發了。后端記好一定要分層開發&#xff0c;不要像老早一起所有代碼寫到一…

MybatisPlus-05.核心功能-條件構造器

一.條件構造器 我們前面使用的MP功能主要是根據id進行操作的&#xff0c;并未涉及到復雜查詢。而根據id所進行的增刪改查操作在MP中都有直接的封裝。但是遇到復雜的查詢條件時&#xff0c;如何使用MP進行操作是我們要考慮的問題。因此MP為我們提供了條件構造器。 在BaseMapper…

ES6從入門到精通:常用知識點

變量聲明ES6引入了let和const替代var。let用于聲明可變的塊級作用域變量&#xff0c;const用于聲明不可變的常量。塊級作用域有效避免了變量提升和污染全局的問題。let name Alice; const PI 3.1415;箭頭函數箭頭函數簡化了函數寫法&#xff0c;且自動綁定當前上下文的this值…

51單片機教程(十一)- 單片機定時器

11、單片機定時器 項目目標 通過定時器/計數器實現流水燈控制。知識要點 定時器的結構。TMOD和TCON;定時/計數器工作方式;定時/計數器編程步驟;1、項目分析 前面的流水燈的時間控制通過空循環語句來實現,定時不是很精確。本章通過用定時器來控制流水燈任務可以實現精確的時…

基于opencv的疲勞駕駛監測系統

博主介紹&#xff1a;java高級開發&#xff0c;從事互聯網行業多年&#xff0c;熟悉各種主流語言&#xff0c;精通java、python、php、爬蟲、web開發&#xff0c;已經做了多年的畢業設計程序開發&#xff0c;開發過上千套畢業設計程序&#xff0c;沒有什么華麗的語言&#xff0…

Vue 2 和 Vue 3 區別

1. 響應式系統原理 Vue 2&#xff1a;利用Object.defineProperty()實現屬性攔截。存在局限性&#xff0c;無法自動監測對象屬性增減&#xff0c;需用Vue.set/delete&#xff1b;數組變異方法要重寫&#xff1b;深層對象遞歸轉換性能差。Vue 3&#xff1a;采用 ES6 Proxy代理對…

mv重命名報錯:-bash:syntax error near unexpected token ‘(‘

文章目錄 一、報錯背景二、解決方法2.1、方法一&#xff1a;文件名加引號2.2、方法二&#xff1a;特殊字符前加\進行轉義 一、報錯背景 在linux上對一文件執行重命名時報錯。原因是該文件名包含空格與括號。 文件名如下&#xff1a; aa &#xff08;1).txt執行命令及報錯如下…

AWS 開源 Strands Agents SDK,簡化 AI 代理開發流程

最近&#xff0c;亞馬遜網絡服務&#xff08;AWS&#xff09;宣布推出 Strands Agents(https://github.com/strands-agents/sdk-python)&#xff0c;這一開源軟件開發工具包&#xff08;SDK&#xff09;采用模型驅動的方法&#xff0c;助力開發者僅用數行代碼即可構建并運行人工…