前話
PolarisMesh(北極星)是騰訊開源的服務治理平臺,致力于解決分布式和微服務架構中的服務管理、流量管理、配置管理、故障容錯和可觀測性問題,針對不同的技術棧和環境提供服務治理的標準方案和最佳實踐。
PolarisMesh 官網:https://polarismesh.cn/#/
PolarisMesh Github:https://github.com/polarismesh/polaris
Polaris-server 作為 PolarisMesh 的控制面,該進程主要負責服務數據、配置數據、治理規則的管理以及下發至北極星 SDK 以及實現了 xDS 的客戶端。
Polaris-server 是如何處理客戶端的服務注冊請求的呢?服務數據是怎么存儲的呢?帶著這個疑問,我們來探究看下 Polaris-server 的啟動流程,看看北極星是實現的。
前期準備
- golang 環境,需要1.17.x +
- 準備 vscode 或者 goland
- 從 github 中 clone 一份 polaris-server 的源碼,這里推薦從 release-vx.y.z 分支中選擇一個分支進行學習,以下文章將基于 release-v1.12.0 分支進行研究。
- 從 github 中 clone 一份 polaris-java 的源碼,這里推薦從 release-vx.y.z 分支中選擇一個分支進行學習,以下文章將基于release-v1.10.0分支進行研究。
正題
注冊數據模型
polaris-java
InstanceRegisterRequest request = new InstanceRegisterRequest();
// 設置實例所屬服務信息
request.setService(service);
// 設置實例所屬服務的命名空間信息
request.setNamespace(namespace);
// 設置實例的 host 信息
request.setHost(host);
// 設置實例的端口信息
request.setPort(port);
// 可選,資源訪問Token,即用戶/用戶組訪問憑據,僅當服務端開啟客戶端鑒權時才需配置
request.setToken(token);
// 設置實例版本
request.setVersion(version);
// 設置實例的協議
request.setProtocol(protocol);
// 設置實例權重
request.setWeight(weight);
// 設置實例的標簽
request.setMetadata(metadata);
// 設置實例地理位置 zone 信息
request.setZone(zone);
// 設置實例地理位置 region 信息
request.setRegion(region);
// 設置實例地理位置 campus 信息
request.setCampus(campus);
// ttl超時時間,如果節點要調用heartbeat上報,則必須填寫,否則會400141錯誤碼,單位:秒
request.setTtl(ttl);
polaris-go
// InstanceRegisterRequest 注冊服務請求
type InstanceRegisterRequest struct {// 必選,服務名Service string// 必選,命名空間Namespace string// 必選,服務監聽host,支持IPv6地址Host string// 必選,服務實例監聽portPort int// 可選,資源訪問Token,即用戶/用戶組訪問憑據,僅當服務端開啟客戶端鑒權時才需配置ServiceToken string// 以下字段可選,默認nil表示客戶端不配置,使用服務端配置// 服務協議Protocol *string// 服務權重,默認100,范圍0-10000Weight *int// 實例提供服務版本號Version *string// 用戶自定義metadata信息Metadata map[string]string// 該服務實例是否健康,默認健康Healthy *bool// 該服務實例是否隔離,默認不隔離Isolate *bool// ttl超時時間,如果節點要調用heartbeat上報,則必須填寫,否則會400141錯誤碼,單位:秒TTL *int// Location 當前注冊實例的地理位置信息,主要用于就近路由Location *Location// 可選,單次查詢超時時間,默認直接獲取全局的超時配置// 用戶總最大超時時間為(1+RetryCount) * TimeoutTimeout *time.Duration// 可選,重試次數,默認直接獲取全局的超時配置RetryCount *int
}
客戶端發起注冊請求
可以先通過官方的 SDK 使用手冊來看看是如何使用SDK的服務注冊。
- polaris-java 服務注冊功能使用
https://polarismesh.cn/docs/%E4%BD%BF%E7%94%A8%E6%8C%87%E5%8D%97/java%E5%BA%94%E7%94%A8%E5%BC%80%E5%8F%91/sdk/%E6%9C%8D%E5%8A%A1%E6%B3%A8%E5%86%8C%E5%8F%91%E7%8E%B0/
- polaris-go 服務注冊功能使用
https://polarismesh.cn/docs/%E4%BD%BF%E7%94%A8%E6%8C%87%E5%8D%97/go%E5%BA%94%E7%94%A8%E5%BC%80%E5%8F%91/sdk/%E6%9C%8D%E5%8A%A1%E6%B3%A8%E5%86%8C%E5%8F%91%E7%8E%B0/
這里我們已 polaris-java 為例,看看 polaris-java 如何將服務實例注冊請求發送至北極星服務端。
發起注冊請求
ProviderAPI providerAPI = DiscoveryAPIFactory.createProviderAPI();
InstanceRegisterRequest registerRequest = new InstanceRegisterRequest();
// 初始化服務實例注冊信息
...
InstanceRegisterResponse registerResp = providerAPI.registerInstance(registerRequest);
通過這個簡單示例代碼可知,服務實例的注冊動作是通過 polaris-java 中 ProviderAPI(負責服務實例注冊相關方法的調用) 的 registerInstance 方法完成。
通過 IDEA 或者 vscode,查看這個方法的具體實現:
@Override
public InstanceRegisterResponse registerInstance(InstanceRegisterRequest req) throws PolarisException {if (req.getTtl() == null) {req.setTtl(DEFAULT_INSTANCE_TTL);}return registerFlow.registerInstance(req, this::doRegister, this::heartbeat);
}
當調用 providerAPI.registerInstance 后,SDK 內部會自動設置實例的 TTL 周期,然后交由 RegisterFlow 這個負責注冊動作的流程編排者執行。因此接著看看這個 RegisterFlow 的定義。
public class RegisterFlow {// 異步注冊header keyprivate static final String HEADER_KEY_ASYNC_REGIS = "async-regis";// 最大連續心跳失敗閾值private static final int HEARTBEAT_FAIL_COUNT_THRESHOLD = 2;// SDK 上下文,包括SDK配置、SDK插件實例管理、內部任務流程編排等等private final SDKContext sdkContext;// 發送實例心跳private final ScheduledThreadPoolExecutor asyncRegisterExecutor;...
}
其實 RegisterFlow 就干兩件事件:
-
發起實例注冊動作
-
內部維護每個實例的心跳上報
public InstanceRegisterResponse registerInstance(InstanceRegisterRequest request, RegisterFunction registerFunction,HeartbeatFunction heartbeatFunction) {// 將注冊請求發送至北極星服務端InstanceRegisterResponse instanceRegisterResponse = registerFunction.doRegister(request,createRegisterV2Header());// 當前實例的注冊管理狀態進行本地保存RegisterState registerState = RegisterStateManager.putRegisterState(sdkContext, request);if (registerState != null) {// 首次存入實例的注冊狀態時,為該實例注冊創建定期心跳上報動作任務registerState.setTaskFuture(asyncRegisterExecutor.scheduleWithFixedDelay(() -> doRunHeartbeat(registerState, registerFunction, heartbeatFunction), request.getTtl(),request.getTtl(), TimeUnit.SECONDS));}return instanceRegisterResponse;
}
來看看 registerFunction.doRegister 的主要流程以及如何將請求發送到服務端。
// com.tencent.polaris.discovery.client.flow.RegisterFlow#registerInstance
private InstanceRegisterResponse doRegister(InstanceRegisterRequest req, Map<String, String> customHeader) {checkAvailable("ProviderAPI");Validator.validateInstanceRegisterRequest(req);// 填充注冊實例的地理位置信息enrichInstanceLocation(req);...// 調用協議插件,發起網絡調用CommonProviderResponse response = serverConnector.registerInstance(request, customHeader);...
}// com.tencent.polaris.plugins.connector.grpc.GrpcConnector#registerInstance
public CommonProviderResponse registerInstance(CommonProviderRequest req, Map<String, String> customHeader)throws PolarisException {...try {waitDiscoverReady();// 從連接池中獲取一個鏈接connection = connectionManager.getConnection(GrpcUtil.OP_KEY_REGISTER_INSTANCE, ClusterType.SERVICE_DISCOVER_CLUSTER);req.setTargetServer(connectionToTargetNode(connection));// 根據 Connection 創建一個 gRPC StubPolarisGRPCGrpc.PolarisGRPCBlockingStub stub = PolarisGRPCGrpc.newBlockingStub(connection.getChannel());...// 向服務端發起 gRPC 請求,完成服務實例的注冊ResponseProto.Response registerInstanceResponse = stub.registerInstance(buildRegisterInstanceRequest(req));...
}
服務端處理注冊請求
當實例注冊請求從北極星 SDK 發出之后,數據流在服務端主要經歷這幾個流程:
- apiserver 層接受 SDK 的注冊請求,將其轉為對應的服務端數據結構。
- apiserver 層將請求傳遞到 resource auth filter 層,進行資源鑒權。
- resource auth filter 完成資源鑒權后,數據流轉到 service 層。
- service 層完成服務注冊請求的合法性檢查以及資源存在性檢查比對。
- 如果資源存在,則直接返回已存在實例的唯一 ID。
- 如果資源不存在,則根據 namespace、service、host、port 計算出實例的唯一 ID。
- 將注冊請求扔進 BatchController 中。
- batch controller 將一批注冊請求轉為 store 層的數據結構。
- batch controller 將請求傳遞到 store 層,store 層選擇具體的 store plugin 實現完成寫操作。
- mysql 插件:將服務實例信息寫入到對應的數據庫表中。
- bolt 插件:將服務實例信息寫入到本地文件中。
apiserver 層
北極星的 apiserver 層進行接收并處理,由于北極星 SDK 和服務端的數據通信走的是 gRPC 協議,因此這里請求就會在基于 gRPC 實現的 apiserver 插件中進行處理。
// RegisterInstance 注冊服務實例
func (g *DiscoverServer) RegisterInstance(ctx context.Context, in *apiservice.Instance) (*apiservice.Response, error) {// 需要記錄操作來源,提高效率,只針對特殊接口添加operatorrCtx := grpcserver.ConvertContext(ctx)rCtx = context.WithValue(rCtx, utils.StringContext("operator"), ParseGrpcOperator(ctx))// 客戶端請求中帶了 token 的,優先已請求中的為準if in.GetServiceToken().GetValue() != "" {rCtx = context.WithValue(rCtx, utils.ContextAuthTokenKey, in.GetServiceToken().GetValue())}grpcHeader := rCtx.Value(utils.ContextGrpcHeader).(metadata.MD)if _, ok := grpcHeader["async-regis"]; ok {rCtx = context.WithValue(rCtx, utils.ContextOpenAsyncRegis, true)}out := g.namingServer.RegisterInstance(rCtx, in)return out, nil
}
resource auth filter 層
在 apiserver 層處理好請求之后,就會將請求轉發給 resource auth filter 層進行權限檢查。但是從代碼來看,調用的路徑明明是 out := g.namingServer.RegisterInstance(rCtx, in),看這是 service 層的邏輯代碼呀,怎么是 resource auth filter 邏輯呢?
這里就要說明下,由于北極星服務端是多協議設計模式,如果說鑒權邏輯放在協議層,則需要針對每個協議接入都需要做一次鑒權代碼適配,因此這里做了一層調整,將 resource auth filter 的適配邏輯放在了 service 的接口層面,通過代理模式達到在每次調用 service 的接口都是經過 resource auth filter 層的。
// 需要返回包裝代理的 DiscoverServerorder := namingOpt.Interceptorsfor i := range order {factory, exist := serverProxyFactories[order[i]]if !exist {return fmt.Errorf("name(%s) not exist in serverProxyFactories", order[i])}proxySvr, err := factory(namingServer, server)if err != nil {return err}server = proxySvr}return nil
通過代理模式,因此在 apiserver 層調用 service 層的 RegisterInstance 方法時,會先經過 resource auth filter 進行資源權限檢查。
// RegisterInstance create one instance
func (svr *ServerAuthAbility) RegisterInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response {// 收集本次客戶端發起的服務實例注冊請求所涉及的操作資源authCtx := svr.collectClientInstanceAuthContext(ctx, []*apiservice.Instance{req}, model.Create, "RegisterInstance")// 調用對應的鑒權插件,對本次操作進行權限檢查_, err := svr.strategyMgn.GetAuthChecker().CheckClientPermission(authCtx)if err != nil {resp := api.NewResponseWithMsg(convertToErrCode(err), err.Error())return resp}ctx = authCtx.GetRequestContext()ctx = context.WithValue(ctx, utils.ContextAuthContextKey, authCtx)// 權限檢查通過之后,將請求轉發給 service 層return svr.targetServer.RegisterInstance(ctx, req)
}
service 層
在 resource auth filter 層處理好請求之后,接著就調用 g.namingServer.RegisterInstance(rCtx, in) 將服務實例數據寫進北極星集群中。
// CreateInstance create a single service instance
func (s *Server) CreateInstance(ctx context.Context, req *apiservice.Instance) *apiservice.Response {...data, resp := s.createInstance(ctx, req, &ins)...// 發布實例上線事件 & 記錄實例注冊操作記錄s.sendDiscoverEvent(*event)s.RecordHistory(ctx, instanceRecordEntry(ctx, req, svc, data, model.OCreate))...
}// createInstance store operate
func (s *Server) createInstance(ctx context.Context, req *apiservice.Instance, ins *apiservice.Instance) (*model.Instance, *apiservice.Response) {// 自動創建實例所在的服務信息svcId, errResp := s.createWrapServiceIfAbsent(ctx, req)if errResp != nil {log.Errorf("[Instance] create service if absent fail : %+v, req : %+v", errResp.String(), req)return nil, errResp}if len(svcId) == 0 {log.Errorf("[Instance] create service if absent return service id is empty : %+v", req)return nil, api.NewResponseWithMsg(apimodel.Code_BadRequest, "service id is empty")}// 根據 CMDB 插件填充實例的地域信息s.packCmdb(ins)// 如果沒有開啟實例批量注冊,則同步調用存儲層接口將實例數據進行持久化if namingServer.bc == nil || !namingServer.bc.CreateInstanceOpen() {return s.serialCreateInstance(ctx, svcId, req, ins) // 單個同步}// 如果開啟了實例批量注冊,則會將注冊請求丟入一個異步隊列進行處理return s.asyncCreateInstance(ctx, svcId, req, ins) // 批量異步
}
同步注冊實例
func (s *Server) serialCreateInstance(ctx context.Context, svcId string, req *apiservice.Instance, ins *apiservice.Instance) (*model.Instance, *apiservice.Response) {...instance, err := s.storage.GetInstance(ins.GetId().GetValue())...// 如果存在,則替換實例的屬性數據,但是需要保留用戶設置的隔離狀態,以免出現關鍵狀態丟失if instance != nil && ins.Isolate == nil {ins.Isolate = instance.Proto.Isolate}// 直接同步創建服務實例data := instancecommon.CreateInstanceModel(svcId, ins)// 創建服務實例時,需要先鎖住服務,避免在創建實例的時候把服務信息刪除導致出現錯誤的數據_, releaseFunc, errCode := s.lockService(ctx, req.GetNamespace().GetValue(),req.GetService().GetValue())if errCode != apimodel.Code_ExecuteSuccess {return nil, api.NewInstanceResponse(errCode, req)}defer releaseFunc()// 調用存儲層直接寫實例信息if err := s.storage.AddInstance(data); err != nil {log.Error(err.Error(), utils.ZapRequestID(rid), utils.ZapPlatformID(pid))return nil, wrapperInstanceStoreResponse(req, err)}return data, nil
}
異步注冊實例
func (s *Server) asyncCreateInstance(ctx context.Context, svcId string, req *apiservice.Instance, ins *apiservice.Instance) (*model.Instance, *apiservice.Response) {allowAsyncRegis, _ := ctx.Value(utils.ContextOpenAsyncRegis).(bool)// 將實例注冊請求放入異步任務池中future := s.bc.AsyncCreateInstance(svcId, ins, !allowAsyncRegis)// 等待任務完成if err := future.Wait(); err != nil {if future.Code() == apimodel.Code_ExistedResource {req.Id = utils.NewStringValue(ins.GetId().GetValue())}return nil, api.NewInstanceResponse(future.Code(), req)}return instancecommon.CreateInstanceModel(svcId, req), nil
}
存儲層處理注冊數據
北極星的存儲層是插件化設計,單機模式下是采用 boltdb,集群模式則是依賴 MySQL,這里只說集群模式下的存儲層處理。
依賴 MySQL 的存儲層實現中,針對實例信息,北極星將其拆分成了三個表。
實例主要信息
CREATE TABLE `instance`
(# 實例 ID,主鍵,唯一約束`id` varchar(128) NOT NULL,# 實例所屬的服務 ID`service_id` varchar(32) NOT NULL,# 實例對外可訪問的 IP 地址信息`host` varchar(128) NOT NULL,# 實例對外可訪問的端口地址信息`port` int(11) NOT NULL,# 實例的端口協議信息,用戶自定義,比如 dubbo、gRPC、http 等`protocol` varchar(32) DEFAULT NULL,# 實例的版本信息,用戶自定義,默認為空`version` varchar(32) DEFAULT NULL,# 實例的健康狀態,1 為健康,0 為不健康`health_status` tinyint(4) NOT NULL DEFAULT '1',# 實例的隔離狀態,1 為打開,0 為關閉`isolate` tinyint(4) NOT NULL DEFAULT '0',# 實例的權重信息,主要用于負載均衡`weight` smallint(6) NOT NULL DEFAULT '100',# 是否開啟健康檢查,0 為不開啟,1 為開啟`enable_health_check` tinyint(4) NOT NULL DEFAULT '0',# 實例的地域信息,記錄 region, zone, idc 信息`cmdb_region` varchar(128) DEFAULT NULL comment 'The region information of the instance is mainly used to close the route',`cmdb_zone` varchar(128) DEFAULT NULL comment 'The ZONE information of the instance is mainly used to close the route.',`cmdb_idc` varchar(128) DEFAULT NULL comment 'The IDC information of the instance is mainly used to close the route',# 實例的版本信息,僅北極星內部使用`revision` varchar(32) NOT NULL comment 'Instance version information',# 實例的邏輯刪除標記,0 表示表示正常,1 表示已邏輯刪除`flag` tinyint(4) NOT NULL DEFAULT '0',`ctime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP comment 'Create time',`mtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment 'Last updated time',...
) ENGINE = InnoDB;
實例健康檢查的類型
CREATE TABLE `health_check`
(`id` varchar(128) NOT NULL comment 'Instance ID',`type` tinyint(4) NOT NULL DEFAULT '0' comment 'Instance health check type',`ttl` int(11) NOT NULL comment 'TTL time jumping',PRIMARY KEY (`id`),CONSTRAINT `health_check_ibfk_1` FOREIGN KEY (`id`) REFERENCES `instance` (`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE = InnoDB;
實例的元數據
CREATE TABLE `instance_metadata`
(`id` varchar(128) NOT NULL comment 'Instance ID',`mkey` varchar(128) NOT NULL comment 'instance label of Key',`mvalue` varchar(4096) NOT NULL comment 'instance label Value',`ctime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP comment 'Create time',`mtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP comment 'Last updated time',PRIMARY KEY (`id`, `mkey`),KEY `mkey` (`mkey`),CONSTRAINT `instance_metadata_ibfk_1` FOREIGN KEY (`id`) REFERENCES `instance` (`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE = InnoDB;
因此在操作存儲層持久化服務實例信息時,需要做以下幾個操作:
// 添加實例主信息
if err := batchAddMainInstances(tx, instances); err != nil {log.Errorf("[Store][database] batch add main instances err: %s", err.Error())return err
}
// 添加實例的健康檢查信息
if err := batchAddInstanceCheck(tx, instances); err != nil {log.Errorf("[Store][database] batch add instance check err: %s", err.Error())return err
}
// 先清理實例原先的 metadata 信息數據,確保不會遺留臟數據
if err := batchDeleteInstanceMeta(tx, instances); err != nil {log.Errorf("[Store][database] batch delete instance metadata err: %s", err.Error())return err
}
// 添加實例的 metadata 信息
if err := batchAddInstanceMeta(tx, instances); err != nil {log.Errorf("[Store][database] batch add instance metadata err: %s", err.Error())return err
}