三、服務心跳。
3.服務心跳
Nacos的實例分為臨時實例和永久實例兩種,可以通過在yaml 文件配置:
spring:application:name: order-servicecloud:nacos:discovery:ephemeral: false # 設置實例為永久實例。true:臨時; false:永久server-addr: 192.168.150.1:8845臨時實例基于心跳方式做健康檢測,而永久實例則是由Nacos主動探測實例狀態。
其中Nacos提供的心跳的API接口為:
接口描述:發送某個實例的心跳
請求類型:PUT
請求路徑:
/nacos/v1/ns/instance/beat請求參數:
名稱 類型 是否必選 描述 serviceName 字符串 是 服務名 groupName 字符串 否 分組名 ephemeral boolean 否 是否臨時實例 beat JSON格式字符串 是 實例心跳內容 錯誤編碼:
錯誤代碼 描述 語義 400 Bad Request 客戶端請求中的語法錯誤 403 Forbidden 沒有權限 404 Not Found 無法找到資源 500 Internal Server Error 服務器內部錯誤 200 OK 正常 3.1.客戶端
在2.2.4.服務注冊這一節中,我們說過NacosNamingService這個類實現了服務的注冊,同時也實現了服務心跳:
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 判斷是否是臨時實例。if (instance.isEphemeral()) {// 如果是臨時實例,則構建心跳信息BeatInfoBeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);// 添加心跳任務beatReactor.addBeatInfo(groupedServiceName, beatInfo);}serverProxy.registerService(groupedServiceName, groupName, instance); }
3.1.1.BeatInfo
這里的BeanInfo就包含心跳需要的各種信息:
3.1.2.BeatReactor
而
BeatReactor
這個類則維護了一個線程池:
當調用
BeatReactor
的.addBeatInfo(groupedServiceName, beatInfo)
方法時,就會執行心跳:public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//fix #1733if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);// 利用線程池,定期執行心跳任務,周期為 beatInfo.getPeriod()executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); }
心跳周期的默認值在
com.alibaba.nacos.api.common.Constants
類中:
可以看到是5秒,默認5秒一次心跳。
3.1.3.BeatTask
心跳的任務封裝在
BeatTask
這個類中,是一個Runnable,其run方法如下:@Override public void run() {if (beatInfo.isStopped()) {return;}// 獲取心跳周期long nextTime = beatInfo.getPeriod();try {// 發送心跳JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}// 判斷心跳結果int code = NamingResponseCode.OK;if (result.has(CommonParams.CODE)) {code = result.get(CommonParams.CODE).asInt();}if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {// 如果失敗,則需要 重新注冊實例Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ex) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg()); ?} catch (Exception unknownEx) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);} finally {executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);} }
3.1.5.發送心跳
最終心跳的發送還是通過
NamingProxy
的sendBeat
方法來實現:public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { ?if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());}// 組織請求參數Map<String, String> params = new HashMap<String, String>(8);Map<String, String> bodyMap = new HashMap<String, String>(2);if (!lightBeatEnabled) {bodyMap.put("beat", JacksonUtils.toJson(beatInfo));}params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());params.put("ip", beatInfo.getIp());params.put("port", String.valueOf(beatInfo.getPort()));// 發送請求,這個地址就是:/v1/ns/instance/beatString result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);return JacksonUtils.toObj(result); }
3.2.服務端
對于臨時實例,服務端代碼分兩部分:
1)InstanceController提供了一個接口,處理客戶端的心跳請求
2)定時檢測實例心跳是否按期執行
3.2.1.InstanceController
與服務注冊時一樣,在nacos-naming模塊中的InstanceController類中,定義了一個方法用來處理心跳請求:
@CanDistro @PutMapping("/beat") @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public ObjectNode beat(HttpServletRequest request) throws Exception {// 解析心跳的請求參數ObjectNode result = JacksonUtils.createEmptyJsonNode();result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval()); ?String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);RsInfo clientBeat = null;if (StringUtils.isNotBlank(beat)) {clientBeat = JacksonUtils.toObj(beat, RsInfo.class);}String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));if (clientBeat != null) {if (StringUtils.isNotBlank(clientBeat.getCluster())) {clusterName = clientBeat.getCluster();} else {// fix #2533clientBeat.setCluster(clusterName);}ip = clientBeat.getIp();port = clientBeat.getPort();}String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);// 嘗試根據參數中的namespaceId、serviceName、clusterName、ip、port等信息// 從Nacos的注冊表中 獲取實例Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);// 如果獲取失敗,說明心跳失敗,實例尚未注冊if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;} ?Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);// 這里重新注冊一個實例instance = new Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral()); ?serviceManager.registerInstance(namespaceId, serviceName, instance);}// 嘗試基于namespaceId和serviceName從 注冊表中獲取Service服務Service service = serviceManager.getService(namespaceId, serviceName);// 如果不存在,說明服務不存在,返回404if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}// 如果心跳沒問題,開始處理心跳結果service.processClientBeat(clientBeat); ?result.put(CommonParams.CODE, NamingResponseCode.OK);if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());}result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());return result; }
最終,在確認心跳請求對應的服務、實例都在的情況下,開始交給Service類處理這次心跳請求。調用了Service的processClientBeat方法
3.2.2.處理心跳請求
查看
Service
的service.processClientBeat(clientBeat);
方法:public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);HealthCheckReactor.scheduleNow(clientBeatProcessor); }
可以看到心跳信息被封裝到了 ClientBeatProcessor類中,交給了HealthCheckReactor處理,HealthCheckReactor就是對線程池的封裝,不用過多查看。
關鍵的業務邏輯都在ClientBeatProcessor這個類中,它是一個Runnable,其中的run方法如下:
@Override public void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());} ?String ip = rsInfo.getIp();String clusterName = rsInfo.getCluster();int port = rsInfo.getPort();// 獲取集群信息Cluster cluster = service.getClusterMap().get(clusterName);// 獲取集群中的所有實例信息List<Instance> instances = cluster.allIPs(true); ?for (Instance instance : instances) {// 找到心跳的這個實例if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}// 更新實例的最后一次心跳時間 lastBeatinstance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {if (!instance.isHealthy()) {instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(),UtilsAndCommons.LOCALHOST_SITE);getPushService().serviceChanged(service);}}}} }
處理心跳請求的核心就是更新心跳實例的最后一次心跳時間,lastBeat,這個會成為判斷實例心跳是否過期的關鍵指標!
3.3.3.心跳異常檢測
在服務注冊時,一定會創建一個
Service
對象,而Service
中有一個init
方法,會在注冊時被調用:public void init() {// 開啟心跳檢測的任務HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();} }
其中HealthCheckReactor.scheduleCheck就是執行心跳檢測的定時任務:
可以看到,該任務是5000ms執行一次,也就是5秒對實例的心跳狀態做一次檢測。
此處的ClientBeatCheckTask同樣是一個Runnable,其中的run方法為:
@Override public void run() {try {// 找到所有臨時實例的列表List<Instance> instances = service.allIPs(true); ?// first set health status of instances:for (Instance instance : instances) {// 判斷 心跳間隔(當前時間 - 最后一次心跳時間) 是否大于 心跳超時時間,默認15秒if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {// 如果超時,標記實例為不健康 healthy = falseinstance.setHealthy(false);// 發布實例狀態變更的事件getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}} ?if (!getGlobalConfig().isExpireInstance()) {return;} ?// then remove obsolete instances:for (Instance instance : instances) { ?if (instance.isMarked()) {continue;}// 判斷心跳間隔(當前時間 - 最后一次心跳時間)是否大于 實例被刪除的最長超時時間,默認30秒if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// 如果是超過了30秒,則刪除實例Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));deleteIp(instance);}} ?} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);} ? }
其中的超時時間同樣是在
com.alibaba.nacos.api.common.Constants
這個類中:
3.3.4.主動健康檢測
對于非臨時實例(ephemeral=false),Nacos會采用主動的健康檢測,定時向實例發送請求,根據響應來判斷實例健康狀態。
入口在2.3.2小節的
ServiceManager
類中的registerInstance方法:
創建空服務時:
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {// 如果服務不存在,創建新的服務createServiceIfAbsent(namespaceId, serviceName, local, null); }
創建服務流程:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)throws NacosException {// 嘗試獲取服務Service service = getService(namespaceId, serviceName);if (service == null) {// 發現服務不存在,開始創建新服務Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);service = new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if (cluster != null) {cluster.setService(service);service.getClusterMap().put(cluster.getName(), cluster);}service.validate();// ** 寫入注冊表并初始化 **putServiceAndInit(service);if (!local) {addOrReplaceService(service);}} }
關鍵在
putServiceAndInit(service)
方法中:private void putServiceAndInit(Service service) throws NacosException {// 將服務寫入注冊表putService(service);service = getService(service.getNamespaceId(), service.getName());// 完成服務的初始化service.init();consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
進入初始化邏輯:
service.init()
,這個會進入Service類中:/*** Init service.*/ public void init() {// 開啟臨時實例的心跳監測任務HealthCheckReactor.scheduleCheck(clientBeatCheckTask);// 遍歷注冊表中的集群for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);// 完成集群初識化entry.getValue().init();} }
這里集群的初始化
entry.getValue().init();
會進入Cluster
類型的init()
方法:/*** Init cluster.*/ public void init() {if (inited) {return;}// 創建健康檢測的任務checkTask = new HealthCheckTask(this);// 這里會開啟對 非臨時實例的 定時健康檢測HealthCheckReactor.scheduleCheck(checkTask);inited = true; }
這里的
HealthCheckReactor.scheduleCheck(checkTask);
會開啟定時任務,對非臨時實例做健康檢測。檢測邏輯定義在HealthCheckTask
這個類中,是一個Runnable,其中的run方法:public void run() { ?try {if (distroMapper.responsible(cluster.getService().getName()) && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {// 開始健康檢測healthCheckProcessor.process(this);// 記錄日志 。。。}} catch (Throwable e) {// 記錄日志 。。。} finally {if (!cancelled) {// 結束后,再次進行任務調度,一定延遲后執行HealthCheckReactor.scheduleCheck(this);// 。。。}} }
健康檢測邏輯定義在
healthCheckProcessor.process(this);
方法中,在HealthCheckProcessor接口中,這個接口也有很多實現,默認是TcpSuperSenseProcessor
:
進入
TcpSuperSenseProcessor
的process方法:@Override public void process(HealthCheckTask task) {// 獲取所有 非臨時實例的 集合List<Instance> ips = task.getCluster().allIPs(false); ?if (CollectionUtils.isEmpty(ips)) {return;} ?for (Instance ip : ips) {// 封裝健康檢測信息到 BeatBeat beat = new Beat(ip, task);// 放入一個阻塞隊列中taskQueue.add(beat);MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();} }
可以看到,所有的健康檢測任務都被放入一個阻塞隊列,而不是立即執行了。這里又采用了異步執行的策略,可以看到Nacos中大量這樣的設計。
而
TcpSuperSenseProcessor
本身就是一個Runnable,在它的構造函數中會把自己放入線程池中去執行,其run方法如下:public void run() {while (true) {try {// 處理任務processTask();// ...} catch (Throwable e) {SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);}} }
通過processTask來處理健康檢測的任務:
private void processTask() throws Exception {// 將任務封裝為一個 TaskProcessor,并放入集合Collection<Callable<Void>> tasks = new LinkedList<>();do {Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);if (beat == null) {return;} ?tasks.add(new TaskProcessor(beat));} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);// 批量處理集合中的任務for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {f.get();} }
任務被封裝到了TaskProcessor中去執行了,TaskProcessor是一個Callable,其中的call方法:
@Override public Void call() {// 獲取檢測任務已經等待的時長long waited = System.currentTimeMillis() - beat.getStartTime();if (waited > MAX_WAIT_TIME_MILLISECONDS) {Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");}SocketChannel channel = null;try {// 獲取實例信息Instance instance = beat.getIp();// 通過NIO建立TCP連接channel = SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false, -1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true); ?Cluster cluster = beat.getTask().getCluster();int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();channel.connect(new InetSocketAddress(instance.getIp(), port));// 注冊連接、讀取事件SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(), new BeatKey(key)); ?beat.setStartTime(System.currentTimeMillis()); ?GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);} catch (Exception e) {beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),"tcp:error:" + e.getMessage()); ?if (channel != null) {try {channel.close();} catch (Exception ignore) {}}} ?return null; }
3.3.總結
Nacos的健康檢測有兩種模式:
臨時實例:
采用客戶端心跳檢測模式,心跳周期5秒
心跳間隔超過15秒則標記為不健康
心跳間隔超過30秒則從服務列表刪除
永久實例:
采用服務端主動健康檢測方式
周期為2000 + 5000毫秒內的隨機數
檢測異常只會標記為不健康,不會刪除
那么為什么Nacos有臨時和永久兩種實例呢?
以淘寶為例,雙十一大促期間,流量會比平常高出很多,此時服務肯定需要增加更多實例來應對高并發,而這些實例在雙十一之后就無需繼續使用了,采用臨時實例比較合適。而對于服務的一些常備實例,則使用永久實例更合適。
與eureka相比,Nacos與Eureka在臨時實例上都是基于心跳模式實現,差別不大,主要是心跳周期不同,eureka是30秒,Nacos是5秒。
另外,Nacos支持永久實例,而Eureka不支持,Eureka只提供了心跳模式的健康監測,而沒有主動檢測功能。
四、服務發現。
4.服務發現
Nacos提供了一個根據serviceId查詢實例列表的接口:
接口描述:查詢服務下的實例列表
請求類型:GET
請求路徑:
/nacos/v1/ns/instance/list請求參數:
名稱 類型 是否必選 描述 serviceName 字符串 是 服務名 groupName 字符串 否 分組名 namespaceId 字符串 否 命名空間ID clusters 字符串,多個集群用逗號分隔 否 集群名稱 healthyOnly boolean 否,默認為false 是否只返回健康實例 錯誤編碼:
錯誤代碼 描述 語義 400 Bad Request 客戶端請求中的語法錯誤 403 Forbidden 沒有權限 404 Not Found 無法找到資源 500 Internal Server Error 服務器內部錯誤 200 OK 正常 4.1.客戶端
4.1.1.定時更新服務列表
4.1.1.1.NacosNamingService
在2.2.4小節中,我們講到一個類
NacosNamingService
,這個類不僅僅提供了服務注冊功能,同樣提供了服務發現的功能。
多個重載的方法最終都會進入一個方法:
@Override public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,boolean subscribe) throws NacosException { ?ServiceInfo serviceInfo;// 1.判斷是否需要訂閱服務信息(默認為 true)if (subscribe) {// 1.1.訂閱服務信息serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));} else {// 1.2.直接去nacos拉取服務信息serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));}// 2.從服務信息中獲取實例列表并返回List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<Instance>();}return list; }
4.1.1.2.HostReactor
進入1.1.訂閱服務消息,這里是由
HostReactor
類的getServiceInfo()
方法來實現的:public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { ?NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());// 由 服務名@@集群名拼接 keyString key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// 讀取本地服務列表的緩存,緩存是一個Map,格式:Map<String, ServiceInfo>ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 判斷緩存是否存在if (null == serviceObj) {// 不存在,創建空ServiceInfoserviceObj = new ServiceInfo(serviceName, clusters);// 放入緩存serviceInfoMap.put(serviceObj.getKey(), serviceObj);// 放入待更新的服務列表(updatingMap)中updatingMap.put(serviceName, new Object());// 立即更新服務列表updateServiceNow(serviceName, clusters);// 從待更新列表中移除updatingMap.remove(serviceName); ?} else if (updatingMap.containsKey(serviceName)) {// 緩存中有,但是需要更新if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finish 等待5秒中,待更新完成synchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}// 開啟定時更新服務列表的功能scheduleUpdateIfAbsent(serviceName, clusters);// 返回緩存中的服務信息return serviceInfoMap.get(serviceObj.getKey()); }
基本邏輯就是先從本地緩存讀,根據結果來選擇:
如果本地緩存沒有,立即去nacos讀取,
updateServiceNow(serviceName, clusters)
如果本地緩存有,則開啟定時更新功能,并返回緩存結果:
scheduleUpdateIfAbsent(serviceName, clusters)
在UpdateTask中,最終還是調用updateService方法:
不管是立即更新服務列表,還是定時更新服務列表,最終都會執行HostReactor中的updateService()方法:
public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {// 基于ServerProxy發起遠程調用,查詢服務列表String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); ?if (StringUtils.isNotEmpty(result)) {// 處理查詢結果processServiceJson(result);}} finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}} }
4.1.1.3.ServerProxy
而ServerProxy的queryList方法如下:
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {// 準備請求參數final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));// 發起請求,地址與API接口一致return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET); }
4.1.2.處理服務變更通知
除了定時更新服務列表的功能外,Nacos還支持服務列表變更時的主動推送功能。
在HostReactor類的構造函數中,有非常重要的幾個步驟:
![]()
基本思路是:
通過PushReceiver監聽服務端推送的變更數據
解析數據后,通過NotifyCenter發布服務變更的事件
InstanceChangeNotifier監聽變更事件,完成對服務列表的更新
4.1.2.1.PushReceiver
我們先看PushReceiver,這個類會以UDP方式接收Nacos服務端推送的服務變更數據。
先看構造函數:
public PushReceiver(HostReactor hostReactor) {try {this.hostReactor = hostReactor;// 創建 UDP客戶端String udpPort = getPushReceiverUdpPort();if (StringUtils.isEmpty(udpPort)) {this.udpSocket = new DatagramSocket();} else {this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));}// 準備線程池this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.push.receiver");return thread;}});// 開啟線程任務,準備接收變更數據this.executorService.execute(this);} catch (Exception e) {NAMING_LOGGER.error("[NA] init udp socket failed", e);} }
PushReceiver構造函數中基于線程池來運行任務。這是因為PushReceiver本身也是一個Runnable,其中的run方法業務邏輯如下:
@Override public void run() {while (!closed) {try {// byte[] is initialized with 0 full filled by defaultbyte[] buffer = new byte[UDP_MSS];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);// 接收推送數據udpSocket.receive(packet);// 解析為json字符串String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());// 反序列化為對象PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);String ack;if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {// 交給 HostReactor去處理hostReactor.processServiceJson(pushPacket.data); ?// send ack to server 發送ACK回執,略。。} catch (Exception e) {if (closed) {return;}NAMING_LOGGER.error("[NA] error while receiving push data", e);}} }
4.1.2.2.HostReactor
通知數據的處理由交給了
HostReactor
的processServiceJson
方法:public ServiceInfo processServiceJson(String json) {// 解析出ServiceInfo信息ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 查詢緩存中的 ServiceInfoServiceInfo oldService = serviceInfoMap.get(serviceKey); ?// 如果緩存存在,則需要校驗哪些數據要更新boolean changed = false;if (oldService != null) {// 拉取的數據是否已經過期if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "+ serviceInfo.getLastRefTime());}// 放入緩存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 中間是緩存與新數據的對比,得到newHosts:新增的實例;remvHosts:待移除的實例;// modHosts:需要修改的實例if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {// 發布實例變更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));DiskCache.write(serviceInfo, cacheDir);} ?} else {// 本地緩存不存在changed = true;// 放入緩存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 直接發布實例變更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));serviceInfo.setJsonFromServer(json);DiskCache.write(serviceInfo, cacheDir);}// 。。。return serviceInfo; }
4.2.服務端
4.2.1.拉取服務列表接口
在2.3.1小節介紹的InstanceController中,提供了拉取服務列表的接口:
/*** Get all instance of input service.** @param request http request* @return list of instance* @throws Exception any error during list*/ @GetMapping("/list") @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ) public ObjectNode list(HttpServletRequest request) throws Exception {// 從request中獲取namespaceId和serviceNameString namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName); ?String agent = WebUtils.getUserAgent(request);String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);// 獲取客戶端的 UDP端口int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));String env = WebUtils.optional(request, "env", StringUtils.EMPTY);boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false")); ?String app = WebUtils.optional(request, "app", StringUtils.EMPTY); ?String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY); ?boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false")); ?// 獲取服務列表return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly); }
進入
doSrvIpxt()
方法來獲取服務列表:public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,String clusters, String clientIP,int udpPort, String env, boolean isCheck,String app, String tid, boolean healthyOnly) throws Exception {ClientInfo clientInfo = new ClientInfo(agent);ObjectNode result = JacksonUtils.createEmptyJsonNode();// 獲取服務列表信息Service service = serviceManager.getService(namespaceId, serviceName);long cacheMillis = switchDomain.getDefaultCacheMillis(); ?// now try to enable the pushtry {if (udpPort > 0 && pushService.canEnablePush(agent)) {// 添加當前客戶端 IP、UDP端口到 PushService 中pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);cacheMillis = switchDomain.getPushCacheMillis(serviceName);}} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);cacheMillis = switchDomain.getDefaultCacheMillis();} ?if (service == null) {// 如果沒找到,返回空if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);}result.put("name", serviceName);result.put("clusters", clusters);result.put("cacheMillis", cacheMillis);result.replace("hosts", JacksonUtils.createEmptyArrayNode());return result;}// 結果的檢測,異常實例的剔除等邏輯省略// 最終封裝結果并返回 。。。 ?result.replace("hosts", hosts);if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {result.put("dom", serviceName);} else {result.put("dom", NamingUtils.getServiceName(serviceName));}result.put("name", serviceName);result.put("cacheMillis", cacheMillis);result.put("lastRefTime", System.currentTimeMillis());result.put("checksum", service.getChecksum());result.put("useSpecifiedURL", false);result.put("clusters", clusters);result.put("env", env);result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));return result; }
4.2.2.發布服務變更的UDP通知
在上一節中,
InstanceController
中的doSrvIpxt()
方法中,有這樣一行代碼:pushService.addClient(namespaceId, serviceName, clusters, agent,new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);
其實是把消費者的UDP端口、IP等信息封裝為一個PushClient對象,存儲PushService中。方便以后服務變更后推送消息。
PushService類本身實現了
ApplicationListener
接口:![]()
這個是事件監聽器接口,監聽的是ServiceChangeEvent(服務變更事件)。
當服務列表變化時,就會通知我們:
![]()
4.3.總結
Nacos的服務發現分為兩種模式:
模式一:主動拉取模式,消費者定期主動從Nacos拉取服務列表并緩存起來,再服務調用時優先讀取本地緩存中的服務列表。
模式二:訂閱模式,消費者訂閱Nacos中的服務列表,并基于UDP協議來接收服務變更通知。當Nacos中的服務列表更新時,會發送UDP廣播給所有訂閱者。
與Eureka相比,Nacos的訂閱模式服務狀態更新更及時,消費者更容易及時發現服務列表的變化,剔除故障服務。