58.Nacos源碼分析2

三、服務心跳。

3.服務心跳

Nacos的實例分為臨時實例和永久實例兩種,可以通過在yaml 文件配置:

spring:application:name: order-servicecloud:nacos:discovery:ephemeral: false # 設置實例為永久實例。true:臨時; false:永久server-addr: 192.168.150.1:8845

臨時實例基于心跳方式做健康檢測,而永久實例則是由Nacos主動探測實例狀態。

其中Nacos提供的心跳的API接口為:

接口描述:發送某個實例的心跳

請求類型:PUT

請求路徑

/nacos/v1/ns/instance/beat

請求參數

名稱類型是否必選描述
serviceName字符串服務名
groupName字符串分組名
ephemeralboolean是否臨時實例
beatJSON格式字符串實例心跳內容

錯誤編碼

錯誤代碼描述語義
400Bad Request客戶端請求中的語法錯誤
403Forbidden沒有權限
404Not Found無法找到資源
500Internal Server Error服務器內部錯誤
200OK正常

3.1.客戶端

在2.2.4.服務注冊這一節中,我們說過NacosNamingService這個類實現了服務的注冊,同時也實現了服務心跳:

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {NamingUtils.checkInstanceIsLegal(instance);String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);// 判斷是否是臨時實例。if (instance.isEphemeral()) {// 如果是臨時實例,則構建心跳信息BeatInfoBeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);// 添加心跳任務beatReactor.addBeatInfo(groupedServiceName, beatInfo);}serverProxy.registerService(groupedServiceName, groupName, instance);
}

3.1.1.BeatInfo

這里的BeanInfo就包含心跳需要的各種信息:

3.1.2.BeatReactor

BeatReactor這個類則維護了一個線程池:

當調用BeatReactor.addBeatInfo(groupedServiceName, beatInfo)方法時,就會執行心跳:

public void addBeatInfo(String serviceName, BeatInfo beatInfo) {NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;//fix #1733if ((existBeat = dom2Beat.remove(key)) != null) {existBeat.setStopped(true);}dom2Beat.put(key, beatInfo);// 利用線程池,定期執行心跳任務,周期為 beatInfo.getPeriod()executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

心跳周期的默認值在com.alibaba.nacos.api.common.Constants類中:

可以看到是5秒,默認5秒一次心跳。

3.1.3.BeatTask

心跳的任務封裝在BeatTask這個類中,是一個Runnable,其run方法如下:

@Override
public void run() {if (beatInfo.isStopped()) {return;}// 獲取心跳周期long nextTime = beatInfo.getPeriod();try {// 發送心跳JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);long interval = result.get("clientBeatInterval").asLong();boolean lightBeatEnabled = false;if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();}BeatReactor.this.lightBeatEnabled = lightBeatEnabled;if (interval > 0) {nextTime = interval;}// 判斷心跳結果int code = NamingResponseCode.OK;if (result.has(CommonParams.CODE)) {code = result.get(CommonParams.CODE).asInt();}if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {// 如果失敗,則需要 重新注冊實例Instance instance = new Instance();instance.setPort(beatInfo.getPort());instance.setIp(beatInfo.getIp());instance.setWeight(beatInfo.getWeight());instance.setMetadata(beatInfo.getMetadata());instance.setClusterName(beatInfo.getCluster());instance.setServiceName(beatInfo.getServiceName());instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(true);try {serverProxy.registerService(beatInfo.getServiceName(),NamingUtils.getGroupName(beatInfo.getServiceName()), instance);} catch (Exception ignore) {}}} catch (NacosException ex) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}",JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
?} catch (Exception unknownEx) {NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, unknown exception msg: {}",JacksonUtils.toJson(beatInfo), unknownEx.getMessage(), unknownEx);} finally {executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);}
}

3.1.5.發送心跳

最終心跳的發送還是通過NamingProxysendBeat方法來實現:

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
?if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());}// 組織請求參數Map<String, String> params = new HashMap<String, String>(8);Map<String, String> bodyMap = new HashMap<String, String>(2);if (!lightBeatEnabled) {bodyMap.put("beat", JacksonUtils.toJson(beatInfo));}params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());params.put("ip", beatInfo.getIp());params.put("port", String.valueOf(beatInfo.getPort()));// 發送請求,這個地址就是:/v1/ns/instance/beatString result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);return JacksonUtils.toObj(result);
}

3.2.服務端

對于臨時實例,服務端代碼分兩部分:

  • 1)InstanceController提供了一個接口,處理客戶端的心跳請求

  • 2)定時檢測實例心跳是否按期執行

3.2.1.InstanceController

與服務注冊時一樣,在nacos-naming模塊中的InstanceController類中,定義了一個方法用來處理心跳請求:

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {// 解析心跳的請求參數ObjectNode result = JacksonUtils.createEmptyJsonNode();result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
?String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);RsInfo clientBeat = null;if (StringUtils.isNotBlank(beat)) {clientBeat = JacksonUtils.toObj(beat, RsInfo.class);}String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));if (clientBeat != null) {if (StringUtils.isNotBlank(clientBeat.getCluster())) {clusterName = clientBeat.getCluster();} else {// fix #2533clientBeat.setCluster(clusterName);}ip = clientBeat.getIp();port = clientBeat.getPort();}String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);// 嘗試根據參數中的namespaceId、serviceName、clusterName、ip、port等信息// 從Nacos的注冊表中 獲取實例Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);// 如果獲取失敗,說明心跳失敗,實例尚未注冊if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;}
?Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, "+ "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);// 這里重新注冊一個實例instance = new Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());
?serviceManager.registerInstance(namespaceId, serviceName, instance);}// 嘗試基于namespaceId和serviceName從 注冊表中獲取Service服務Service service = serviceManager.getService(namespaceId, serviceName);// 如果不存在,說明服務不存在,返回404if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}// 如果心跳沒問題,開始處理心跳結果service.processClientBeat(clientBeat);
?result.put(CommonParams.CODE, NamingResponseCode.OK);if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());}result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());return result;
}

最終,在確認心跳請求對應的服務、實例都在的情況下,開始交給Service類處理這次心跳請求。調用了Service的processClientBeat方法

3.2.2.處理心跳請求

查看Serviceservice.processClientBeat(clientBeat);方法:

public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

可以看到心跳信息被封裝到了 ClientBeatProcessor類中,交給了HealthCheckReactor處理,HealthCheckReactor就是對線程池的封裝,不用過多查看。

關鍵的業務邏輯都在ClientBeatProcessor這個類中,它是一個Runnable,其中的run方法如下:

@Override
public void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());}
?String ip = rsInfo.getIp();String clusterName = rsInfo.getCluster();int port = rsInfo.getPort();// 獲取集群信息Cluster cluster = service.getClusterMap().get(clusterName);// 獲取集群中的所有實例信息List<Instance> instances = cluster.allIPs(true);
?for (Instance instance : instances) {// 找到心跳的這個實例if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}// 更新實例的最后一次心跳時間 lastBeatinstance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {if (!instance.isHealthy()) {instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(),UtilsAndCommons.LOCALHOST_SITE);getPushService().serviceChanged(service);}}}}
}

處理心跳請求的核心就是更新心跳實例的最后一次心跳時間,lastBeat,這個會成為判斷實例心跳是否過期的關鍵指標!

3.3.3.心跳異常檢測

在服務注冊時,一定會創建一個Service對象,而Service中有一個init方法,會在注冊時被調用:

public void init() {// 開啟心跳檢測的任務HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();}
}

其中HealthCheckReactor.scheduleCheck就是執行心跳檢測的定時任務:

可以看到,該任務是5000ms執行一次,也就是5秒對實例的心跳狀態做一次檢測。

此處的ClientBeatCheckTask同樣是一個Runnable,其中的run方法為:

@Override
public void run() {try {// 找到所有臨時實例的列表List<Instance> instances = service.allIPs(true);
?// first set health status of instances:for (Instance instance : instances) {// 判斷 心跳間隔(當前時間 - 最后一次心跳時間) 是否大于 心跳超時時間,默認15秒if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {// 如果超時,標記實例為不健康 healthy = falseinstance.setHealthy(false);// 發布實例狀態變更的事件getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}
?if (!getGlobalConfig().isExpireInstance()) {return;}
?// then remove obsolete instances:for (Instance instance : instances) {
?if (instance.isMarked()) {continue;}// 判斷心跳間隔(當前時間 - 最后一次心跳時間)是否大于 實例被刪除的最長超時時間,默認30秒if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// 如果是超過了30秒,則刪除實例Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));deleteIp(instance);}}
?} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}
?
}

其中的超時時間同樣是在com.alibaba.nacos.api.common.Constants這個類中:

3.3.4.主動健康檢測

對于非臨時實例(ephemeral=false),Nacos會采用主動的健康檢測,定時向實例發送請求,根據響應來判斷實例健康狀態。

入口在2.3.2小節的ServiceManager類中的registerInstance方法:

創建空服務時:

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {// 如果服務不存在,創建新的服務createServiceIfAbsent(namespaceId, serviceName, local, null);
}

創建服務流程:

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)throws NacosException {// 嘗試獲取服務Service service = getService(namespaceId, serviceName);if (service == null) {// 發現服務不存在,開始創建新服務Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);service = new Service();service.setName(serviceName);service.setNamespaceId(namespaceId);service.setGroupName(NamingUtils.getGroupName(serviceName));// now validate the service. if failed, exception will be thrownservice.setLastModifiedMillis(System.currentTimeMillis());service.recalculateChecksum();if (cluster != null) {cluster.setService(service);service.getClusterMap().put(cluster.getName(), cluster);}service.validate();// ** 寫入注冊表并初始化 **putServiceAndInit(service);if (!local) {addOrReplaceService(service);}}
}

關鍵在putServiceAndInit(service)方法中:

private void putServiceAndInit(Service service) throws NacosException {// 將服務寫入注冊表putService(service);service = getService(service.getNamespaceId(), service.getName());// 完成服務的初始化service.init();consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

進入初始化邏輯:service.init(),這個會進入Service類中:

/*** Init service.*/
public void init() {// 開啟臨時實例的心跳監測任務HealthCheckReactor.scheduleCheck(clientBeatCheckTask);// 遍歷注冊表中的集群for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);// 完成集群初識化entry.getValue().init();}
}

這里集群的初始化entry.getValue().init();會進入Cluster類型的init()方法:

/*** Init cluster.*/
public void init() {if (inited) {return;}// 創建健康檢測的任務checkTask = new HealthCheckTask(this);// 這里會開啟對 非臨時實例的 定時健康檢測HealthCheckReactor.scheduleCheck(checkTask);inited = true;
}

這里的HealthCheckReactor.scheduleCheck(checkTask);會開啟定時任務,對非臨時實例做健康檢測。檢測邏輯定義在HealthCheckTask這個類中,是一個Runnable,其中的run方法:

public void run() {
?try {if (distroMapper.responsible(cluster.getService().getName()) && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {// 開始健康檢測healthCheckProcessor.process(this);// 記錄日志 。。。}} catch (Throwable e) {// 記錄日志 。。。} finally {if (!cancelled) {// 結束后,再次進行任務調度,一定延遲后執行HealthCheckReactor.scheduleCheck(this);// 。。。}}
}

健康檢測邏輯定義在healthCheckProcessor.process(this);方法中,在HealthCheckProcessor接口中,這個接口也有很多實現,默認是TcpSuperSenseProcessor

進入TcpSuperSenseProcessor的process方法:

@Override
public void process(HealthCheckTask task) {// 獲取所有 非臨時實例的 集合List<Instance> ips = task.getCluster().allIPs(false);
?if (CollectionUtils.isEmpty(ips)) {return;}
?for (Instance ip : ips) {// 封裝健康檢測信息到 BeatBeat beat = new Beat(ip, task);// 放入一個阻塞隊列中taskQueue.add(beat);MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();}
}

可以看到,所有的健康檢測任務都被放入一個阻塞隊列,而不是立即執行了。這里又采用了異步執行的策略,可以看到Nacos中大量這樣的設計。

TcpSuperSenseProcessor本身就是一個Runnable,在它的構造函數中會把自己放入線程池中去執行,其run方法如下:

public void run() {while (true) {try {// 處理任務processTask();// ...} catch (Throwable e) {SRV_LOG.error("[HEALTH-CHECK] error while processing NIO task", e);}}
}

通過processTask來處理健康檢測的任務:

private void processTask() throws Exception {// 將任務封裝為一個 TaskProcessor,并放入集合Collection<Callable<Void>> tasks = new LinkedList<>();do {Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);if (beat == null) {return;}
?tasks.add(new TaskProcessor(beat));} while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);// 批量處理集合中的任務for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {f.get();}
}

任務被封裝到了TaskProcessor中去執行了,TaskProcessor是一個Callable,其中的call方法:

@Override
public Void call() {// 獲取檢測任務已經等待的時長long waited = System.currentTimeMillis() - beat.getStartTime();if (waited > MAX_WAIT_TIME_MILLISECONDS) {Loggers.SRV_LOG.warn("beat task waited too long: " + waited + "ms");}SocketChannel channel = null;try {// 獲取實例信息Instance instance = beat.getIp();// 通過NIO建立TCP連接channel = SocketChannel.open();channel.configureBlocking(false);// only by setting this can we make the socket close event asynchronouschannel.socket().setSoLinger(false, -1);channel.socket().setReuseAddress(true);channel.socket().setKeepAlive(true);channel.socket().setTcpNoDelay(true);
?Cluster cluster = beat.getTask().getCluster();int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();channel.connect(new InetSocketAddress(instance.getIp(), port));// 注冊連接、讀取事件SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);key.attach(beat);keyMap.put(beat.toString(), new BeatKey(key));
?beat.setStartTime(System.currentTimeMillis());
?GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);} catch (Exception e) {beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),"tcp:error:" + e.getMessage());
?if (channel != null) {try {channel.close();} catch (Exception ignore) {}}}
?return null;
}

3.3.總結

Nacos的健康檢測有兩種模式:

  • 臨時實例:

    • 采用客戶端心跳檢測模式,心跳周期5秒

    • 心跳間隔超過15秒則標記為不健康

    • 心跳間隔超過30秒則從服務列表刪除

  • 永久實例:

    • 采用服務端主動健康檢測方式

    • 周期為2000 + 5000毫秒內的隨機數

    • 檢測異常只會標記為不健康,不會刪除

那么為什么Nacos有臨時和永久兩種實例呢?

以淘寶為例,雙十一大促期間,流量會比平常高出很多,此時服務肯定需要增加更多實例來應對高并發,而這些實例在雙十一之后就無需繼續使用了,采用臨時實例比較合適。而對于服務的一些常備實例,則使用永久實例更合適。

與eureka相比,Nacos與Eureka在臨時實例上都是基于心跳模式實現,差別不大,主要是心跳周期不同,eureka是30秒,Nacos是5秒。

另外,Nacos支持永久實例,而Eureka不支持,Eureka只提供了心跳模式的健康監測,而沒有主動檢測功能。

四、服務發現。

4.服務發現

Nacos提供了一個根據serviceId查詢實例列表的接口:

接口描述:查詢服務下的實例列表

請求類型:GET

請求路徑

/nacos/v1/ns/instance/list

請求參數

名稱類型是否必選描述
serviceName字符串服務名
groupName字符串分組名
namespaceId字符串命名空間ID
clusters字符串,多個集群用逗號分隔集群名稱
healthyOnlyboolean否,默認為false是否只返回健康實例

錯誤編碼

錯誤代碼描述語義
400Bad Request客戶端請求中的語法錯誤
403Forbidden沒有權限
404Not Found無法找到資源
500Internal Server Error服務器內部錯誤
200OK正常

4.1.客戶端

4.1.1.定時更新服務列表

4.1.1.1.NacosNamingService

在2.2.4小節中,我們講到一個類NacosNamingService,這個類不僅僅提供了服務注冊功能,同樣提供了服務發現的功能。

多個重載的方法最終都會進入一個方法:

@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,boolean subscribe) throws NacosException {
?ServiceInfo serviceInfo;// 1.判斷是否需要訂閱服務信息(默認為 true)if (subscribe) {// 1.1.訂閱服務信息serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));} else {// 1.2.直接去nacos拉取服務信息serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),StringUtils.join(clusters, ","));}// 2.從服務信息中獲取實例列表并返回List<Instance> list;if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {return new ArrayList<Instance>();}return list;
}
4.1.1.2.HostReactor

進入1.1.訂閱服務消息,這里是由HostReactor類的getServiceInfo()方法來實現的:

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
?NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());// 由 服務名@@集群名拼接 keyString key = ServiceInfo.getKey(serviceName, clusters);if (failoverReactor.isFailoverSwitch()) {return failoverReactor.getService(key);}// 讀取本地服務列表的緩存,緩存是一個Map,格式:Map<String, ServiceInfo>ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);// 判斷緩存是否存在if (null == serviceObj) {// 不存在,創建空ServiceInfoserviceObj = new ServiceInfo(serviceName, clusters);// 放入緩存serviceInfoMap.put(serviceObj.getKey(), serviceObj);// 放入待更新的服務列表(updatingMap)中updatingMap.put(serviceName, new Object());// 立即更新服務列表updateServiceNow(serviceName, clusters);// 從待更新列表中移除updatingMap.remove(serviceName);
?} else if (updatingMap.containsKey(serviceName)) {// 緩存中有,但是需要更新if (UPDATE_HOLD_INTERVAL > 0) {// hold a moment waiting for update finish 等待5秒中,待更新完成synchronized (serviceObj) {try {serviceObj.wait(UPDATE_HOLD_INTERVAL);} catch (InterruptedException e) {NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);}}}}// 開啟定時更新服務列表的功能scheduleUpdateIfAbsent(serviceName, clusters);// 返回緩存中的服務信息return serviceInfoMap.get(serviceObj.getKey());
}

基本邏輯就是先從本地緩存讀,根據結果來選擇:

  • 如果本地緩存沒有,立即去nacos讀取,updateServiceNow(serviceName, clusters)

  • 如果本地緩存有,則開啟定時更新功能,并返回緩存結果:

    • scheduleUpdateIfAbsent(serviceName, clusters)

    在UpdateTask中,最終還是調用updateService方法:

不管是立即更新服務列表,還是定時更新服務列表,最終都會執行HostReactor中的updateService()方法:

public void updateService(String serviceName, String clusters) throws NacosException {ServiceInfo oldService = getServiceInfo0(serviceName, clusters);try {// 基于ServerProxy發起遠程調用,查詢服務列表String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
?if (StringUtils.isNotEmpty(result)) {// 處理查詢結果processServiceJson(result);}} finally {if (oldService != null) {synchronized (oldService) {oldService.notifyAll();}}}
}
4.1.1.3.ServerProxy

而ServerProxy的queryList方法如下:

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)throws NacosException {// 準備請求參數final Map<String, String> params = new HashMap<String, String>(8);params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, serviceName);params.put("clusters", clusters);params.put("udpPort", String.valueOf(udpPort));params.put("clientIP", NetUtils.localIP());params.put("healthyOnly", String.valueOf(healthyOnly));// 發起請求,地址與API接口一致return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

4.1.2.處理服務變更通知

除了定時更新服務列表的功能外,Nacos還支持服務列表變更時的主動推送功能。

在HostReactor類的構造函數中,有非常重要的幾個步驟:

基本思路是:

  • 通過PushReceiver監聽服務端推送的變更數據

  • 解析數據后,通過NotifyCenter發布服務變更的事件

  • InstanceChangeNotifier監聽變更事件,完成對服務列表的更新

4.1.2.1.PushReceiver

我們先看PushReceiver,這個類會以UDP方式接收Nacos服務端推送的服務變更數據。

先看構造函數:

public PushReceiver(HostReactor hostReactor) {try {this.hostReactor = hostReactor;// 創建 UDP客戶端String udpPort = getPushReceiverUdpPort();if (StringUtils.isEmpty(udpPort)) {this.udpSocket = new DatagramSocket();} else {this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));}// 準備線程池this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setDaemon(true);thread.setName("com.alibaba.nacos.naming.push.receiver");return thread;}});// 開啟線程任務,準備接收變更數據this.executorService.execute(this);} catch (Exception e) {NAMING_LOGGER.error("[NA] init udp socket failed", e);}
}

PushReceiver構造函數中基于線程池來運行任務。這是因為PushReceiver本身也是一個Runnable,其中的run方法業務邏輯如下:

@Override
public void run() {while (!closed) {try {// byte[] is initialized with 0 full filled by defaultbyte[] buffer = new byte[UDP_MSS];DatagramPacket packet = new DatagramPacket(buffer, buffer.length);// 接收推送數據udpSocket.receive(packet);// 解析為json字符串String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());// 反序列化為對象PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);String ack;if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {// 交給 HostReactor去處理hostReactor.processServiceJson(pushPacket.data);
?// send ack to server 發送ACK回執,略。。} catch (Exception e) {if (closed) {return;}NAMING_LOGGER.error("[NA] error while receiving push data", e);}}
}
4.1.2.2.HostReactor

通知數據的處理由交給了HostReactorprocessServiceJson方法:

public ServiceInfo processServiceJson(String json) {// 解析出ServiceInfo信息ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);String serviceKey = serviceInfo.getKey();if (serviceKey == null) {return null;}// 查詢緩存中的 ServiceInfoServiceInfo oldService = serviceInfoMap.get(serviceKey);
?// 如果緩存存在,則需要校驗哪些數據要更新boolean changed = false;if (oldService != null) {// 拉取的數據是否已經過期if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "+ serviceInfo.getLastRefTime());}// 放入緩存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 中間是緩存與新數據的對比,得到newHosts:新增的實例;remvHosts:待移除的實例;// modHosts:需要修改的實例if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {// 發布實例變更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));DiskCache.write(serviceInfo, cacheDir);}
?} else {// 本地緩存不存在changed = true;// 放入緩存serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);// 直接發布實例變更的事件NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), serviceInfo.getHosts()));serviceInfo.setJsonFromServer(json);DiskCache.write(serviceInfo, cacheDir);}// 。。。return serviceInfo;
}

4.2.服務端

4.2.1.拉取服務列表接口

在2.3.1小節介紹的InstanceController中,提供了拉取服務列表的接口:

/*** Get all instance of input service.** @param request http request* @return list of instance* @throws Exception any error during list*/
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {// 從request中獲取namespaceId和serviceNameString namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);
?String agent = WebUtils.getUserAgent(request);String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);// 獲取客戶端的 UDP端口int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));String env = WebUtils.optional(request, "env", StringUtils.EMPTY);boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
?String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
?String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
?boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
?// 獲取服務列表return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly);
}

進入doSrvIpxt()方法來獲取服務列表:

public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,String clusters, String clientIP,int udpPort, String env, boolean isCheck,String app, String tid, boolean healthyOnly) throws Exception {ClientInfo clientInfo = new ClientInfo(agent);ObjectNode result = JacksonUtils.createEmptyJsonNode();// 獲取服務列表信息Service service = serviceManager.getService(namespaceId, serviceName);long cacheMillis = switchDomain.getDefaultCacheMillis();
?// now try to enable the pushtry {if (udpPort > 0 && pushService.canEnablePush(agent)) {// 添加當前客戶端 IP、UDP端口到 PushService 中pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);cacheMillis = switchDomain.getPushCacheMillis(serviceName);}} catch (Exception e) {Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);cacheMillis = switchDomain.getDefaultCacheMillis();}
?if (service == null) {// 如果沒找到,返回空if (Loggers.SRV_LOG.isDebugEnabled()) {Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);}result.put("name", serviceName);result.put("clusters", clusters);result.put("cacheMillis", cacheMillis);result.replace("hosts", JacksonUtils.createEmptyArrayNode());return result;}// 結果的檢測,異常實例的剔除等邏輯省略// 最終封裝結果并返回 。。。
?result.replace("hosts", hosts);if (clientInfo.type == ClientInfo.ClientType.JAVA&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {result.put("dom", serviceName);} else {result.put("dom", NamingUtils.getServiceName(serviceName));}result.put("name", serviceName);result.put("cacheMillis", cacheMillis);result.put("lastRefTime", System.currentTimeMillis());result.put("checksum", service.getChecksum());result.put("useSpecifiedURL", false);result.put("clusters", clusters);result.put("env", env);result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));return result;
}

4.2.2.發布服務變更的UDP通知

在上一節中,InstanceController中的doSrvIpxt()方法中,有這樣一行代碼:

pushService.addClient(namespaceId, serviceName, clusters, agent,new InetSocketAddress(clientIP, udpPort),pushDataSource, tid, app);

其實是把消費者的UDP端口、IP等信息封裝為一個PushClient對象,存儲PushService中。方便以后服務變更后推送消息。

PushService類本身實現了ApplicationListener接口:

這個是事件監聽器接口,監聽的是ServiceChangeEvent(服務變更事件)。

當服務列表變化時,就會通知我們:

4.3.總結

Nacos的服務發現分為兩種模式:

  • 模式一:主動拉取模式,消費者定期主動從Nacos拉取服務列表并緩存起來,再服務調用時優先讀取本地緩存中的服務列表。

  • 模式二:訂閱模式,消費者訂閱Nacos中的服務列表,并基于UDP協議來接收服務變更通知。當Nacos中的服務列表更新時,會發送UDP廣播給所有訂閱者。

與Eureka相比,Nacos的訂閱模式服務狀態更新更及時,消費者更容易及時發現服務列表的變化,剔除故障服務。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/211775.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/211775.shtml
英文地址,請注明出處:http://en.pswp.cn/news/211775.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

MySQL-備份+日志:介質故障與數據庫恢復

目錄 第1關&#xff1a;備份與恢復 第2關&#xff1a;備份日志&#xff1a;介質故障的發生與數據庫的恢復 第1關&#xff1a;備份與恢復 任務描述 本關任務: 備份數據庫&#xff0c;然后再恢復它。 test1_1.sh # 你寫的命令將在linux的命令行運行 # 對數據庫residents作海…

【C/C++筆試練習】多態的概念、虛函數的概念、虛表地址、派生類的虛函數、虛函數的訪問、指針引用、動態多態、完全數計算、撲克牌大小

文章目錄 C/C筆試練習選擇部分&#xff08;1&#xff09;多態的概念&#xff08;2&#xff09;虛函數的概念&#xff08;3&#xff09;虛表地址&#xff08;4&#xff09;派生類的虛函數&#xff08;5&#xff09;虛函數的訪問&#xff08;6&#xff09;分析程序&#xff08;7&…

C# WPF上位機開發(會員管理軟件)

【 聲明&#xff1a;版權所有&#xff0c;歡迎轉載&#xff0c;請勿用于商業用途。 聯系信箱&#xff1a;feixiaoxing 163.com】 好多同學都認為上位機只是純軟件開發&#xff0c;不涉及到硬件設備&#xff0c;比如聽聽音樂、看看電影、寫寫小的應用等等。如果是消費電子&#…

HibernateJPA快速搭建

1. 先創建一個普通Maven工程&#xff0c;導入依賴 <dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><depe…

Java 匿名內部類使用的外部變量,為什么一定要加 final?

問題描述 Effectively final Java 1.8 新特性&#xff0c;對于一個局部變量或方法參數&#xff0c;如果他的值在初始化后就從未更改&#xff0c;那么該變量就是 effectively final&#xff08;事實 final&#xff09;。 這種情況下&#xff0c;可以不用加 final 關鍵字修飾。 …

報錯:Parsed mapper file: ‘file mapper.xml 導致無法啟動

報錯 &#xff1a; Logging initialized using class org.apache.ibatis.logging.stdout.StdOutImpl adapter. Registered plugin: com.github.yulichang.interceptor.MPJInterceptor3b2c8bda Parsed mapper file: file [/Mapper.xml] application無法啟動 我這邊產生原因是項…

K8S學習指南(4)-minikube的使用

文章目錄 簡介安裝 Minikube啟動 Minikube 集群基本概念創建和管理資源1. 創建 Pod2. 創建 Deployment3. 創建 Service 監視和調試1. 查看集群狀態2. 查看集群信息3. 訪問 Kubernetes Dashboard4. 使用 kubectl 命令 清理資源1. 刪除 Pod2. 刪除 Deployment3. 刪除 Service4. 停…

! [remote rejected] master -> master (pre-receive hook declined)

! [remote rejected] master -> master (pre-receive hook declined) 如圖&#xff1a; 出錯解決方法 首先輸入命令 git branch xindefenzhi然后&#xff0c;進入這個新創建的分支 git checkout branch然后重新git push就可以了

爬蟲學習-基礎庫的使用(urllib庫)

目錄 一、urllib庫介紹 二、request模塊使用 &#xff08;1&#xff09;urlopen ①data參數 ②timeout參數 &#xff08;2&#xff09;request &#xff08;3&#xff09;高級用法 ①驗證 ②代理 ③Cookie 三、處理異常 ①URLError ②HTTPError 四、解析鏈接 ①urlparse ②…

LeetCode-10. 正則表達式匹配

LeetCode-10. 正則表達式匹配 問題分析算法描述程序代碼CGo 問題分析 這道題的難點主要在于*號的匹配&#xff0c;這里記dp[i][j]表示s[1...i]和p[1...j]能否完成匹配&#xff0c;先根據特殊情況歸納總結&#xff1a; *號匹配 0 次&#xff0c;則dp[i][j] dp[i][j-2]*號匹配…

Mybatis源碼解析4:獲取Session、Mapper

Mybatis源碼解析4&#xff1a;獲取Session、Mapper 1.項目結構2. 源碼分析2.1 獲取Session DefaultSqlSessionFactory#openSession2.2 獲取Mapper DefaultSqlSession#getMapper 1.項目結構 2. 源碼分析 2.1 獲取Session DefaultSqlSessionFactory#openSession private SqlSe…

利用人工智能算法解決內存垃圾回收問題

內存垃圾回收是計算機領域中的一個重要問題&#xff0c;可以利用人工智能算法解決此問題。常用的人工智能算法包括遺傳算法、模擬退火算法、禁忌搜索算法等。 其中&#xff0c;遺傳算法是一種基于自然選擇和遺傳進化的算法&#xff0c;可以用于優化問題。在內存垃圾回收中&…

Python實戰演練之Python實現一個簡單的天氣查詢應用

今天&#xff0c;曉白給大家分享Python實現一個簡單的天氣查詢應用&#xff0c;幫助大家獲取實時的天氣信息&#xff0c;內容僅供學習交流。 首先&#xff0c;我們需要安裝一個名為"requests"的Python庫&#xff0c;它可以幫助我們發送HTTP請求并獲取響應數據。你可…

Kernel(一):基礎

本文主要討論210的kernel基礎相關知識。 內核驅動 驅動是內核中的硬件設備管理模塊,工作在內核態,程序故障可能導致內核崩潰,程序漏洞會使內核不安全 根文件系統提供根目錄,進程存放在根文件系統中,內核啟動最后會裝載根文件系統 應用程序不屬于內核,…

1828_ChibiOS中的對象FIFO

全部學習匯總&#xff1a; GreyZhang/g_ChibiOS: I found a new RTOS called ChibiOS and it seems interesting! (github.com) 1. 最初的這個理解&#xff0c;當看到后面之后就知道有點偏差了。其實&#xff0c;這個傳輸就是一個單純的FIFO而不是兩個FIFO之間的什么操作。 2.…

去掉參數中第一個“,”

記錄一下&#xff0c;前端傳參中&#xff0c;傳給我參數是“categoryIds: ,1731557494586241026,1731569816263311362,1731569855534579713,1731858335179223042,1731858366821052418” 但是后端&#xff0c;因為我的mybati是in查詢&#xff0c;所以因為第一個是“,”。所以會導…

RabbitMQ安裝在Linux系統詳細教程

安裝教程&#xff1a; 1.首先將下載好的文件上傳到服務器&#xff0c;拉到opt文件夾中(可以用xftp&#xff09; 2.輸入命令&#xff1a; cd /opt 3.安裝erlang rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm rpm -ivh&#xff08;復制配置文件的名字&#xff09; 4.在Rab…

sap增強

四代增強 2種顯示增強1種隱式增強 隱式增強 光標放在增強點或其中的代碼點擊修改即可修改代碼 顯示增強 1.ENHANCEMENT-POINT 在代碼修改界面選擇空行 光標所在位置 可以創建多個增強實施且激活后都會執行. 2.ENHANCEMENT-SECTION 1,選中程序中空行 2.編輯->創建選項 …

回顧2023 亞馬遜云科技 re_Invent,創新AI,一路同行

作為全球云計算龍頭企業的亞馬遜云科技于2023年11月27日至12月1日在美國拉斯維加斯舉辦了2023 亞馬遜云科技 re:Invent&#xff0c;從2012年開始舉辦的亞馬遜云科技 re:Invent 全球大會,到現如今2023 亞馬遜云科技 re:Invent&#xff0c;回顧歷屆re:Invent大會&#xff0c;亞馬…

Spring 動態代理時是如何解決循環依賴的?為什么要使用三級緩存?

首先&#xff0c;我將簡單介紹一下Spring框架中的動態代理和循環依賴問題。 動態代理與循環依賴 1. 動態代理 在Spring框架中&#xff0c;動態代理是一種常用的技術&#xff0c;用于實現AOP&#xff08;面向切面編程&#xff09;。動態代理允許Spring在運行時為目標對象創建…