基于 etcd 實現的服務發現,按照非規范化的 etcd key 實現,詳細見代碼注釋。
package discoveryimport ("context""encoding/json""fmt""go.etcd.io/etcd/api/v3/mvccpb"clientv3 "go.etcd.io/etcd/client/v3""google.golang.org/grpc/resolver""strings""time"
)// gRPC 的服務一般會使用 protobuf 作為數據傳輸的介質
// gRPC 服務定義在 proto 的文件中,例如:service RoutingService {}
// protoc 將 proto 后綴文件轉為 go 文件,文件內自動生成了 gRPC 服務的描述信息、服務注冊的函數、客戶端聲明的函數等內容
// 如下,它們的格式是固定的,注意函數的參數
// 服務描述信息:RoutingService_ServiceDesc,格式:服務名_ServiceDesc
// 服務注冊函數:RegisterRoutingServiceServer,格式:Register你服務名Server
// 客戶端聲明函數:NewRoutingServiceClient,格式:New服務名Client
// 其中客戶端聲明函數的參數是 gRPC 連接,返回值是 gRPC 服務的客戶端接口,這樣就可以調用客戶端接口定義的 rpc 方法了
// gRPC 連接不會與某個 gRPC 服務綁定,它只是一個連接。
// 獲取 gRPC 連接的方式如下兩種,第一個參數就是 gRPC 服務的地址,可以寫死 ip + port,也可以使用服務發現來獲取 gRPC 服務的地址。
// grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName))
// grpc.Dial(fmt.Sprintf("%s:///%s", scheme, serviceName))(廢棄)
// 服務發現是實現 Builder 和 Resolver 接口,Builder 用于創建 Resolver 實例,Resolver 用于解析服務地址。
// Builder 的 Scheme 方法返回值是 與 grpc.NewClient 中的 scheme 對應
// Builder 的 Build 第一個參數 target.Endpoint() 得到的結果是 grpc.NewClient 中的 serviceName,Build 方法的觸發分情況:
// grpc.NewClient 聲明不會觸發 Build 方法,首次調用 rpc 方法時觸發 Build
// grpc.Dial 聲明會觸發 Build 方法,但已經廢棄了
// Resolver 的 ResolveNow 方法是 gRPC 主動調用的,我們可以使用它動態去 etcd 中獲取服務地址,也可以不實現它,自定義服務發現的邏輯// 服務發現的實現方式:
// 假如我們有三個應用,user-center、device-center、網關,user-center 和 device-center 暴露了很多 gRPC 服務,網關需要調用它們的服務
// 假如我們使用 etcd 作為注冊中心,同時規范化 etcd 的 key ,例如:grpc/services/{serviceName}/實例ID
// grpc/services/user-center/實例1
// grpc/services/user-center/實例2
// grpc/services/device-center/實例1
// grpc/services/device-center/實例2
// 網關中分別實現 Builder 和 Resolver,并將 Builder 的實例注冊在 grpc 的地址解析中,resolver.Register(Builder實現的實例)
// 獲取 user-center 和 device-center 的 grpc 連接
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "user-center"))
// grpc.NewClient(fmt.Sprintf("%s:///%s", "grpc", "device-center"))
// 當 gRPC 連接建立時,gRPC 會調用 Builder 的 Build 方法,我們獲取 target.Endpoint() 就是 serviceName
// 這樣 fmt.Sprintf("grpc/services/%s", serviceName) 獲取 serviceName 的 etcd 的 key 前綴
// 如:grpc/services/user-center/
// Build 方法中按前綴匹配查詢 etcd 的數據,這樣就獲取到了 user-center 的所有實例的地址,再同步到 Resolver 中
// 如上就實現了規范化 etcd 的 key 前綴的服務發現,不管有多少個應用,代碼中只需要一個服務發現的實例// 如果沒有規范化 etcd 的 key 前綴,那么我們需要為各個服務聲明不同的 scheme,每個 scheme 對應一個服務發現的實例
// Builder 的實現必須包含 etcd 的 key 前綴 ,不能利用 target.Endpoint() 去實現服務發現
// 如:ServiceDiscovery 實現了 Builder
// type ServiceDiscovery struct {
// serverKey string
// }
// grpc/services/user-center/ 固定寫死賦值給 serverKey
// 聲明 ServiceDiscovery { serverKey },注冊 resolver.Register(ServiceDiscovery實例)
// grpc.NewClient(fmt.Sprintf("%s:///%s", "user", "user-center"))// 普通 rpc 調用時,服務端掛掉:
// 服務發現找不到數據時:rpc error: code = Unavailable desc = no children to pick from
// 服務掛掉但etcd/服務發現還有數據:rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.202.160.190:6888: connect: connection refused"
// 服務重啟后客戶端連接可以恢復
// 當某個服務節點不可用時,可以自動連接到可用的服務節點// 流式 rpc 調用,服務端掛掉:
// 客戶端發送方:EOF
// 客戶端接收方:rpc error: code = Unavailable desc = error reading from server: EOF
// 服務重啟后客戶端連接不可恢復,必須重新建立連接// ServiceDiscovery is a gRPC resolver that uses etcd for service discovery.
// 配合 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 來使用
// Build 方法的 target.Endpoint() 就是 serviceName
type ServiceDiscovery struct {scheme string // 自定義的 grpc 服務的 scheme,例如:grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)serviceKey string // etcd 中服務的 key 前綴,例如:grpc/maxwell-ai/GatewayInfoService/1.0/etcdClient *clientv3.Client
}// ServiceResolver is a gRPC resolver that resolves service addresses from etcd.
// 一個 scheme 對應一個 ServiceResolver,當 grpc 建立連接時觸發 ServiceDiscovery 的 Build 方法
// 注意:
// grpc.NewClient 不會觸發 Build 方法
// grpc.Dial 會觸發 Build 方法,但已經廢棄了
type ServiceResolver struct {scheme stringserviceKey stringtarget resolver.Targetclient *clientv3.Clientcc resolver.ClientConnaddrMap map[string]resolver.Addressclosed chan struct{}
}func NewServiceDiscovery(scheme string, serviceKey string, etcdClient *clientv3.Client) *ServiceDiscovery {return &ServiceDiscovery{scheme: scheme,serviceKey: serviceKey,etcdClient: etcdClient,}
}// Build creates a new ServiceDiscovery resolver.
// grpc.NewClient 不會觸發 Build 方法
// grpc.Dial 會觸發 Build 方法,但已經廢棄了
// target: grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName) 中的 serviceName 就是 target
func (s *ServiceDiscovery) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {// 創建服務解析器serviceResolver := &ServiceResolver{target: target,cc: cc,scheme: s.scheme,serviceKey: s.serviceKey,client: s.etcdClient,closed: make(chan struct{}),addrMap: make(map[string]resolver.Address),}// 首次拉取所有數據if err := serviceResolver.rePull(); err != nil {return nil, err}// 開啟 watcher 監聽 etcd 中的服務地址變化go serviceResolver.watcher()return serviceResolver, nil
}// Scheme returns the scheme of the resolver.
// scheme 是 grpc.NewClient(fmt.Sprintf("%s:///%s", scheme, serviceName)
func (s *ServiceDiscovery) Scheme() string {return s.scheme
}// ResolveNow is called by gRPC to resolve the service address immediately.
// grpc 主動調用去解析服務地址,這里可以實現從 etcd 獲取服務地址的邏輯
// 但是不在這里實現,因為這里實現有同步和異步從 etcd 中查詢數據
// 同步會阻塞
// 異步會開啟很多 goroutine,可能會導致 goroutine 泄漏
func (s *ServiceResolver) ResolveNow(options resolver.ResolveNowOptions) {}func (s *ServiceResolver) Close() {close(s.closed)
}func (s *ServiceResolver) rePull() error {ctx, cancelFunc := context.WithTimeout(context.Background(), 5*time.Second)defer cancelFunc()resp, err := s.client.Get(ctx, s.serviceKey, clientv3.WithPrefix())if err != nil {return err}s.addrMap = make(map[string]resolver.Address)for _, ev := range resp.Kvs {key := strings.TrimPrefix(string(ev.Key), s.serviceKey)s.addServer(key, ev.Value)}s.syncToGrpc()return nil
}func (s *ServiceResolver) addServer(key string, value []byte) {var si ServiceInfoif err := json.Unmarshal(value, &si); err != nil {return}s.addrMap[key] = resolver.Address{Addr: fmt.Sprintf("%s:%d", si.Ip, si.Port),}
}func (s *ServiceResolver) delServer(key string) {if _, ok := s.addrMap[key]; ok {delete(s.addrMap, key)}
}func (s *ServiceResolver) syncToGrpc() {addrSlice := make([]resolver.Address, 0, 10)for _, v := range s.addrMap {addrSlice = append(addrSlice, v)}err := s.cc.UpdateState(resolver.State{Addresses: addrSlice})if err != nil {return}
}func (s *ServiceResolver) watcher() {rePull := falsefor {select {case <-s.closed:returndefault:}if rePull {if err := s.rePull(); err != nil {time.Sleep(5 * time.Second)continue}}rch := s.client.Watch(context.Background(), s.serviceKey, clientv3.WithPrefix())loop:for {select {case <-s.closed:returncase resp, ok := <-rch:if !ok {rePull = truebreak loop}for _, ev := range resp.Events {key := strings.TrimPrefix(string(ev.Kv.Key), s.serviceKey)switch ev.Type {case mvccpb.PUT:s.addServer(key, ev.Kv.Value)case mvccpb.DELETE:s.delServer(key)}}s.syncToGrpc()}}}
}