源碼深度剖析Eureka與Ribbon服務發現原理

本文基于 spring cloud dalston,同時文章較長,請選擇舒服姿勢進行閱讀。

Eureka 與 Ribbon 是什么?和服務發現什么關系?

Eureka 與 Ribbon 都是 Netflix 提供的微服務組件,分別用于服務注冊與發現、負載均衡。同時,這兩者均屬于 spring cloud netflix 體系,和 spring cloud 無縫集成,也正由于此被大家所熟知。

Eureka 本身是服務注冊發現組件,實現了完整的 Service Registry 和 Service Discovery。

Ribbon 則是一款負載均衡組件,那它和服務發現又有什么關系呢?負載均衡在整個微服務的調用模型中是緊挨著服務發現的,而 Ribbon 這個框架它其實是起到了開發者服務消費行為與底層服務發現組件 Eureka 之間橋梁的作用。從嚴格概念上說 Ribbon 并不是做服務發現的,但是由于 Netflix 組件的松耦合,Ribbon 需要對 Eureka 的緩存服務列表進行類似"服務發現"的行為,從而構建自己的負載均衡列表并及時更新,也就是說 Ribbon 中的"服務發現"的賓語變成了 Eureka(或其他服務發現組件)。

Eureka 的服務注冊與發現

我們會先對 Eureka 的服務發現進行描述,重點是 Eureka-client 是如何進行服務的注冊與發現的,同時不會過多停留于 Eureka 的架構、Eureka-server 的實現、Zone/Region 等范疇。

Eureka-client 的服務發現都是由 DiscoveryClient 類實現的,它主要包括的功能有:

  • 向 Eureka-server 注冊服務實例
  • 更新在 Eureka-server 的租期
  • 取消在 Eureka-server 的租約(服務下線)
  • 發現服務實例并定期更新

服務注冊

DiscoveryClient 所有的定時任務都是在 initScheduledTasks()方法里,我們可以看到以下關鍵代碼:

private void initScheduledTasks() {...if (clientConfig.shouldRegisterWithEureka()) {...// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());}
}

我們可以看到在 if 判斷分支里創建了一個 instanceInfoReplicator 實例,它會通過 start 執行一個定時任務:

public void run() {try {discoveryClient.refreshInstanceInfo();Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);} finally {Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}}

我們可以在 InstanceInfoReplicator 類的 run()方法中找到這一段,同時可以一眼發現其注冊關鍵點在于discoveryClient.register()這段,我們點進去看看:

boolean register() throws Throwable {logger.info(PREFIX + appPathIdentifier + ": registering service...");EurekaHttpResponse<Void> httpResponse;try {httpResponse = eurekaTransport.registrationClient.register(instanceInfo);} catch (Exception e) {logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);throw e;}if (logger.isInfoEnabled()) {logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());}return httpResponse.getStatusCode() == 204;}

這邊可以發現是通過 HTTP REST (jersey 客戶端)請求的方式將 instanceInfo 實例信息注冊到 Eureka-server 上。我們簡單看一下 InstanceInfo 對象,屬性基本上都能見名知義:

@JsonCreatorpublic InstanceInfo(@JsonProperty("instanceId") String instanceId,@JsonProperty("app") String appName,@JsonProperty("appGroupName") String appGroupName,@JsonProperty("ipAddr") String ipAddr,@JsonProperty("sid") String sid,@JsonProperty("port") PortWrapper port,@JsonProperty("securePort") PortWrapper securePort,@JsonProperty("homePageUrl") String homePageUrl,@JsonProperty("statusPageUrl") String statusPageUrl,@JsonProperty("healthCheckUrl") String healthCheckUrl,@JsonProperty("secureHealthCheckUrl") String secureHealthCheckUrl,@JsonProperty("vipAddress") String vipAddress,@JsonProperty("secureVipAddress") String secureVipAddress,@JsonProperty("countryId") int countryId,@JsonProperty("dataCenterInfo") DataCenterInfo dataCenterInfo,@JsonProperty("hostName") String hostName,@JsonProperty("status") InstanceStatus status,@JsonProperty("overriddenstatus") InstanceStatus overriddenstatus,@JsonProperty("leaseInfo") LeaseInfo leaseInfo,@JsonProperty("isCoordinatingDiscoveryServer") Boolean isCoordinatingDiscoveryServer,@JsonProperty("metadata") HashMap<String, String> metadata,@JsonProperty("lastUpdatedTimestamp") Long lastUpdatedTimestamp,@JsonProperty("lastDirtyTimestamp") Long lastDirtyTimestamp,@JsonProperty("actionType") ActionType actionType,@JsonProperty("asgName") String asgName) {this.instanceId = instanceId;this.sid = sid;this.appName = StringCache.intern(appName);this.appGroupName = StringCache.intern(appGroupName);this.ipAddr = ipAddr;this.port = port == null ? 0 : port.getPort();this.isUnsecurePortEnabled = port != null && port.isEnabled();this.securePort = securePort == null ? 0 : securePort.getPort();this.isSecurePortEnabled = securePort != null && securePort.isEnabled();this.homePageUrl = homePageUrl;this.statusPageUrl = statusPageUrl;this.healthCheckUrl = healthCheckUrl;this.secureHealthCheckUrl = secureHealthCheckUrl;this.vipAddress = StringCache.intern(vipAddress);this.secureVipAddress = StringCache.intern(secureVipAddress);this.countryId = countryId;this.dataCenterInfo = dataCenterInfo;this.hostName = hostName;this.status = status;this.overriddenstatus = overriddenstatus;this.leaseInfo = leaseInfo;this.isCoordinatingDiscoveryServer = isCoordinatingDiscoveryServer;this.lastUpdatedTimestamp = lastUpdatedTimestamp;this.lastDirtyTimestamp = lastDirtyTimestamp;this.actionType = actionType;this.asgName = StringCache.intern(asgName);// ---------------------------------------------------------------// for compatibilityif (metadata == null) {this.metadata = Collections.emptyMap();} else if (metadata.size() == 1) {this.metadata = removeMetadataMapLegacyValues(metadata);} else {this.metadata = metadata;}if (sid == null) {this.sid = SID_DEFAULT;}}

總結一下整個過程如下:

服務續期

服務續期說起來可能比較晦澀,其實就是在 client 端定時發起調用,讓 Eureka-server 知道自己還活著,在 eureka 代碼中的注釋解釋為心跳(heart-beat)。

這里有兩個比較重要的配置需要注意:

  • instance.leaseRenewalIntervalInSeconds
    表示客戶端的更新頻率,默認 30s,也就是每 30s 就會向 Eureka-server 發起 renew 更新操作。
  • instance.leaseExpirationDurationInSeconds
    這是服務端視角的失效時間,默認是 90s,也就是 Eureka-server 在 90s 內沒有接收到來自 client 的 renew 操作就會將其剔除。

我們直接從代碼角度看一下,同樣呢相關定時任務在 initScheduledTasks()方法中:

private void initScheduledTasks() {...if (clientConfig.shouldRegisterWithEureka()) {...// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);...}
}

可以看到這里創建了一個 HeartbeatThread()線程執行操作:

private class HeartbeatThread implements Runnable {public void run() {if (renew()) {lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();}}}

我們直接看 renew()方法:

    boolean renew() {EurekaHttpResponse<InstanceInfo> httpResponse;try {httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());if (httpResponse.getStatusCode() == 404) {REREGISTER_COUNTER.increment();logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());return register();}return httpResponse.getStatusCode() == 200;} catch (Throwable e) {logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);return false;}}

這里比較簡單,可以發現和服務注冊是類似的,同樣使用 HTTP REST 發起一個 hearbeat 請求,底層使用 jersey 客戶端。
總結一下整個過程如下:

服務注銷

服務注銷邏輯比較簡單,本身并不在定時任務中觸發,而是通過對方法標記@PreDestroy,從而調用 shutdown 方法觸發,最終會調用 unRegister()方法進行注銷,同樣的這也是一個 HTTP REST 請求,可以簡單看下代碼:

    @PreDestroy@Overridepublic synchronized void shutdown() {if (isShutdown.compareAndSet(false, true)) {logger.info("Shutting down DiscoveryClient ...");if (statusChangeListener != null && applicationInfoManager != null) {applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());}cancelScheduledTasks();// If APPINFO was registeredif (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);unregister();}if (eurekaTransport != null) {eurekaTransport.shutdown();}heartbeatStalenessMonitor.shutdown();registryStalenessMonitor.shutdown();logger.info("Completed shut down of DiscoveryClient");}}/*** unregister w/ the eureka service.*/void unregister() {// It can be null if shouldRegisterWithEureka == falseif(eurekaTransport != null && eurekaTransport.registrationClient != null) {try {logger.info("Unregistering ...");EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());logger.info(PREFIX + appPathIdentifier + " - deregister  status: " + httpResponse.getStatusCode());} catch (Exception e) {logger.error(PREFIX + appPathIdentifier + " - de-registration failed" + e.getMessage(), e);}}}

服務發現及更新

我們來看作為服務消費者的關鍵邏輯,即發現服務以及更新服務。

首先 consumer 會在啟動時從 Eureka-server 獲取所有的服務列表,并在本地緩存。同時呢,由于本地有一份緩存,所以需要定期更新,更新頻率可以配置。

啟動時候在 consumer 在 discoveryClient 中會調用 fetchRegistry() 方法:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {...if (clientConfig.shouldDisableDelta()|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))|| forceFullRegistryFetch|| (applications == null)|| (applications.getRegisteredApplications().size() == 0)|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta{...getAndStoreFullRegistry();} else {getAndUpdateDelta(applications);}...
}

這里可以看到 fetchRegistry 里有 2 個判斷分支,對應首次更新以及后續更新。首次更新會調用 getAndStoreFullRegistry()方法,我們看一下:

 private void getAndStoreFullRegistry() throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();logger.info("Getting all instance registry info from the eureka server");Applications apps = null;EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()): eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {apps = httpResponse.getEntity();}logger.info("The response status is {}", httpResponse.getStatusCode());if (apps == null) {logger.error("The application is null for some reason. Not storing this information");} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {localRegionApps.set(this.filterAndShuffle(apps));logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());} else {logger.warn("Not updating applications as another thread is updating it already");}}

可以看到和之前類似,如果在沒有特殊指定的情況下,我們會發起一個 HTTP REST 請求拉取所有應用的信息并進行緩存,緩存對象為 Applications,有興趣的可以進一步查看。

接下來,在我們熟悉的 initScheduledTasks()方法中,我們還會啟動一個更新應用信息緩存的 task:

private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}...
}

在 CacheRefreshThread()這個 task 的 run 方法中,仍然會調用到我們之前的 fetchRegistry()方法,同時在判斷時會走到另一個分支中,即調用到 getAndUpdateDelta()方法:

private void getAndUpdateDelta(Applications applications) throws Throwable {long currentUpdateGeneration = fetchRegistryGeneration.get();Applications delta = null;EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {delta = httpResponse.getEntity();}if (delta == null) {logger.warn("The server does not allow the delta revision to be applied because it is not safe. "+ "Hence got the full registry.");getAndStoreFullRegistry();} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());String reconcileHashCode = "";if (fetchRegistryUpdateLock.tryLock()) {try {updateDelta(delta);reconcileHashCode = getReconcileHashCode(applications);} finally {fetchRegistryUpdateLock.unlock();}} else {logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");}// There is a diff in number of instances for some reasonif (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall}} else {logger.warn("Not updating application delta as another thread is updating it already");logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());}}

可以看到,這邊是使用 HTTP REST 發起一個 getDelta 請求,同時在 updateDelta()方法中會更新本地的 Applications 緩存對象。

總結一下,整個服務發現與更新的過程如下:

Ribbon 的"服務發現"

接下來我們來看看 Ribbon 是怎么基于 Eureka 進行"服務發現"的,我們之前說過這里的"服務發現"并不是嚴格意義上的服務發現,而是 Ribbon 如何基于 Eureka 構建自己的負載均衡列表并及時更新,同時我們也不關注 Ribbon 其他負載均衡的具體邏輯(包括 IRule 路由,IPing 判斷可用性)。

我們可以先做一些猜想,首先 Ribbon 肯定是基于 Eureka 的服務發現的。我們上邊描述了 Eureka 會拉取所有服務信息到本地緩存 Applications 中,那么 Ribbon 肯定是基于這個 Applications 緩存來構建負載均衡列表的了,同時呢,負載均衡列表同樣需要一個定時更新的機制來保證一致性。

服務調用

首先我們從開發者的最初使用上看,在開發者在 RestTemplate 上開啟@LoadBalanced 注解就可開啟 Ribbon 的邏輯了,顯然這是用了類似攔截的方法。在 LoadBalancerAutoConfiguration 類中,我們可以看到相關代碼:

...
@Beanpublic SmartInitializingSingleton loadBalancedRestTemplateInitializer(final List<RestTemplateCustomizer> customizers) {return new SmartInitializingSingleton() {@Overridepublic void afterSingletonsInstantiated() {for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {for (RestTemplateCustomizer customizer : customizers) {customizer.customize(restTemplate);}}}};}@Configuration@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")static class LoadBalancerInterceptorConfig {@Beanpublic LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient,LoadBalancerRequestFactory requestFactory) {return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);}@Bean@ConditionalOnMissingBeanpublic RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {return new RestTemplateCustomizer() {@Overridepublic void customize(RestTemplate restTemplate) {List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());list.add(loadBalancerInterceptor);restTemplate.setInterceptors(list);}};}}
...

可以看到,在初始化的過程中通過調用 customize()方法來給 RestTemplate 增加了攔截器 LoadBalancerInterceptor。而 LoadBalancerInterceptor 則是在攔截方法中使用了 loadBalancer(RibbonLoadBalancerClient 類) 完成請求調用:

@Overridepublic ClientHttpResponse intercept(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) throws IOException {final URI originalUri = request.getURI();String serviceName = originalUri.getHost();Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));}

服務發現

到現在為止呢,我們的請求調用已經被 RibbonLoadBalancerClient 所封裝,而其"服務發現"也是發生在 RibbonLoadBalancerClient 中的。

我們點到其 execute()方法中:

@Overridepublic <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {ILoadBalancer loadBalancer = getLoadBalancer(serviceId);Server server = getServer(loadBalancer);if (server == null) {throw new IllegalStateException("No instances available for " + serviceId);}RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,serviceId), serverIntrospector(serviceId).getMetadata(server));return execute(serviceId, ribbonServer, request);}

這里根據 serviceId 構建了一個 ILoadBalancer,同時從 loadBalancer 中獲取到了最終的實例 server 信息。ILoadBalancer 是定義了負載均衡的一個接口,它的關鍵方法 chooseServer()即是從負載均衡列表根據路由規則中選取一個 server。當然我們主要關心的點在于,負載均衡列表是怎么構建出來的。通過源碼跟蹤我們發現,在通過 getLoadBalancer()方法構建好 ILoadBalancer 對象后,對象中就已經包含了服務列表。所以我們來看看 ILoadBalancer 對象是怎么創建的:

	protected ILoadBalancer getLoadBalancer(String serviceId) {return this.clientFactory.getLoadBalancer(serviceId);}

那么這里其實是 springcloud 封裝的 clientFactory,它會在 applicationContext 容器中尋找對應的 bean 。

通過源碼追蹤,我們可以在自動配置類 RibbonClientConfiguration 中找到對應代碼:

	@Bean@ConditionalOnMissingBeanpublic ILoadBalancer ribbonLoadBalancer(IClientConfig config,ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,IRule rule, IPing ping, ServerListUpdater serverListUpdater) {if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {return this.propertiesFactory.get(ILoadBalancer.class, config, name);}return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,serverListFilter, serverListUpdater);}

我們看到這里最終構建了 ILoadBalancer,其實現類是 ZoneAwareLoadBalancer,我們觀察其超類的初始化:

    public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,ServerList<T> serverList, ServerListFilter<T> filter,ServerListUpdater serverListUpdater) {super(clientConfig, rule, ping);this.serverListImpl = serverList;this.filter = filter;this.serverListUpdater = serverListUpdater;if (filter instanceof AbstractServerListFilter) {((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());}restOfInit(clientConfig);}

這邊最終執行了 restOfInit()方法,進一步跟蹤:

    void restOfInit(IClientConfig clientConfig) {boolean primeConnection = this.isEnablePrimingConnections();// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()this.setEnablePrimingConnections(false);enableAndInitLearnNewServersFeature();updateListOfServers();if (primeConnection && this.getPrimeConnections() != null) {this.getPrimeConnections().primeConnections(getReachableServers());}this.setEnablePrimingConnections(primeConnection);LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());}

updateListOfServers()方法是獲取所有的 ServerList 的,最終由 serverListImpl.getUpdatedListOfServers()獲取所有的服務列表,在此 serverListImpl 即實現類為 DiscoveryEnabledNIWSServerList。其中 DiscoveryEnabledNIWSServerList 有 getInitialListOfServers()和 getUpdatedListOfServers()方法,具體代碼如下

    @Overridepublic List<DiscoveryEnabledServer> getInitialListOfServers(){return obtainServersViaDiscovery();}@Overridepublic List<DiscoveryEnabledServer> getUpdatedListOfServers(){return obtainServersViaDiscovery();}

此時我們查看 obtainServersViaDiscovery()方法,已經基本接近于事物本質了,它創建了一個 EurekaClient 對象,在此就是 Eureka 的 DiscoveryClient 實現類,調用了其 getInstancesByVipAddress()方法,它最終從 DiscoveryClient 的 Applications 緩存中根據 serviceId 選取了對應的服務信息:

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {logger.warn("EurekaClient has not been initialized yet, returning an empty list");return new ArrayList<DiscoveryEnabledServer>();}EurekaClient eurekaClient = eurekaClientProvider.get();if (vipAddresses!=null){for (String vipAddress : vipAddresses.split(",")) {// if targetRegion is null, it will be interpreted as the same region of clientList<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);for (InstanceInfo ii : listOfInstanceInfo) {if (ii.getStatus().equals(InstanceStatus.UP)) {if(shouldUseOverridePort){if(logger.isDebugEnabled()){logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);}// copy is necessary since the InstanceInfo builder just uses the original reference,// and we don't want to corrupt the global eureka copy of the object which may be// used by other clients in our systemInstanceInfo copy = new InstanceInfo(ii);if(isSecure){ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();}else{ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();}}DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);des.setZone(DiscoveryClient.getZone(ii));serverList.add(des);}}if (serverList.size()>0 && prioritizeVipAddressBasedServers){break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers}}}return serverList;}

服務更新

我們已經知道初次啟動時,Ribbon 是怎么結合 Eureka 完成負載均衡列表的構建了,那么與 Eureka 類似,我們還需要及時對服務列表進行更新以保證一致性。

在 RibbonClientConfiguration 自動配置類中構建 ILoadBalancer 時我們可以看到其構造器中有 ServerListUpdater 對象,而此對象也是在當前類中構建的:

	@Bean@ConditionalOnMissingBeanpublic ServerListUpdater ribbonServerListUpdater(IClientConfig config) {return new PollingServerListUpdater(config);}

我們觀察此對象中的 start()方法看是如何完成更新的:

@Overridepublic synchronized void start(final UpdateAction updateAction) {if (isActive.compareAndSet(false, true)) {final Runnable wrapperRunnable = new Runnable() {@Overridepublic void run() {if (!isActive.get()) {if (scheduledFuture != null) {scheduledFuture.cancel(true);}return;}try {updateAction.doUpdate();lastUpdated = System.currentTimeMillis();} catch (Exception e) {logger.warn("Failed one update cycle", e);}}};scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable,initialDelayMs,refreshIntervalMs,TimeUnit.MILLISECONDS);} else {logger.info("Already active, no-op");}}

這里有 2 個配置,即 initialDelayMs 首次檢測默認 1s,refreshIntervalMs 檢測間隔默認 30s(和 Eureka 一致),創建了一個定時任務,執行 updateAction.doUpdate()方法。

我們回到之前的 restOfInit()方法,查看其中的 enableAndInitLearnNewServersFeature()方法,可以看到是在此處觸發了 ServerListUpdater 的 start 方法,同時傳入了 updateAction 對象:

    public void enableAndInitLearnNewServersFeature() {LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());serverListUpdater.start(updateAction);}

其實 updateAction 一開始就已經創建好了,它仍然是調用 之前的 updateListOfServers 方法來進行后續的更新:

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {@Overridepublic void doUpdate() {updateListOfServers();}};

總結一下 Ribbon 三部分服務發現的整體流程如下:

參考資料

深度剖析服務發現組件 Netflix Eureka
深入理解 Ribbon 之源碼解析


---------------------
作者:YouluBank
來源:CSDN
原文:https://blog.csdn.net/m0_37787662/article/details/109286790
版權聲明:本文為作者原創文章,轉載請附上博文鏈接!
內容解析By:CSDN,CNBLOG博客文章一鍵轉載插件

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/283980.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/283980.shtml
英文地址,請注明出處:http://en.pswp.cn/news/283980.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

3月6日云棲精選夜讀:如何實現32.5萬筆/秒的交易峰值?阿里交易系統TMF2.0技術揭秘...

交易平臺遇到的挑戰 2017雙11&#xff0c;交易峰值達到了32.5萬筆/秒&#xff0c;這給整個交易系統帶來了非常大的挑戰。 一方面&#xff0c;系統需要支撐全集團幾十個事業部的所有交易類需求&#xff1a;要考慮如何能更快響應需求、加快發布周期&#xff1b;如何能為新小業務提…

std的find和reverse_iterator聯合使用

上代碼&#xff1a; // test2013.cpp : 定義控制臺應用程序的入口點。 //#include "stdafx.h" #include <stdlib.h> #include <stdio.h> #include<iostream> #include<vector> #include<map> #include<string> using namespace …

論如何提升學習的能力

為啥要學習如果有一件事情是能改變你自己的&#xff0c;我想這件事情必然就是學習&#xff0c;我的人生重要的轉折點也是從學習這件事情始發的&#xff0c;那么&#xff0c;我們就從這里開始。學習不僅僅是為了找到答案&#xff0c;而是為了找到方法&#xff0c;找到一個可以找…

linux下svn常用指令

windows下的TortoiseSVN是資源管理器的一個插件&#xff0c;以覆蓋圖標表示文件狀態&#xff0c;幾乎所以命令都有圖形界面支持&#xff0c;比較好用&#xff0c;這里就不多說。主要說說linux下svn的使用&#xff0c;因為linux下大部分的操作都是通過命令行來進行&#xff0c;所…

CSS布局解決方案(終結版)

前端布局非常重要的一環就是頁面框架的搭建&#xff0c;也是最基礎的一環。在頁面框架的搭建之中&#xff0c;又有居中布局、多列布局以及全局布局&#xff0c;今天我們就來總結總結前端干貨中的CSS布局。 居中布局 水平居中 1&#xff09;使用inline-blocktext-align&#xff…

基于ABP和Magicodes實現Excel導出操作

前端使用的vue-element-admin框架&#xff0c;后端使用ABP框架&#xff0c;Excel導出使用的Magicodes.IE.Excel.Abp庫。Excel導入和導出操作幾乎一樣&#xff0c;不再介紹。文本主要介紹Excel導出操作和過程中遇到的坑&#xff0c;主要是Excel文件導出后無法打開的問題。一.Mag…

消息模式在實際開發應用中的優勢

曾經.NET面試過程中經常問的一個問題是&#xff0c;如果程序集A&#xff0c;引用B &#xff0c;B 引用C&#xff0c;那么C怎么去訪問A中的方法呢。 這個問題初學.net可能一時想不出該咋處理&#xff0c;這涉及到循環引用問題。但有點經驗的可能就簡單了&#xff0c;通過委托的方…

微服務:注冊中心ZooKeeper、Eureka、Consul 、Nacos對比

前言 服務注冊中心本質上是為了解耦服務提供者和服務消費者。對于任何一個微服務&#xff0c;原則上都應存在或者支持多個提供者&#xff0c;這是由微服務的分布式屬性決定的。更進一步&#xff0c;為了支持彈性擴縮容特性&#xff0c;一個微服務的提供者的數量和分布往往是動…

MyBatis總結七:動態sql和sql片段

開發中&#xff0c;sql拼接很常見&#xff0c;所以說一下動態sql&#xff1a; 1if2chose,when,otherwise3where,set4foreach用法解析(現有一張users表 內有id username age 三個字段)&#xff1a; <!--查詢所有用戶&#xff0c;傳遞參數type&#xff0c;如果值為0&#xff0…

iOS - OC Copy 拷貝

前言 copy&#xff1a;需要先實現 NSCopying 協議&#xff0c;創建的是不可變副本。mutableCopy&#xff1a;需要實現 NSMutableCopying 協議&#xff0c;創建的是可變副本。淺拷貝&#xff1a;指針拷貝&#xff0c;源對象和副本指向的是同一個對象。對象的引用計數器 &#xf…

三.選擇結構(一)

1.if結構: if(條件){ 代碼塊 } 2.隨機產生數: int randon (int)(Math.random()*10); 3.多重if選擇結構: if(條件1){ 代碼塊1 }else if (條件2){ 代碼塊2 }else{ 代碼塊3 } 4.嵌套if選擇結構: if(條件1){ if(條件2){ 代碼塊1 }else{ 代碼塊2 } }else{ 代碼塊3 } 轉載于:https://…

為了高性能、超大規模的模型訓練,這個組合“出道”了

點擊上方藍字關注我們&#xff08;本文閱讀時間&#xff1a;3分鐘)近年來&#xff0c;在大量數據上訓練的基于 transformer 的大規模深度學習模型在多項認知任務中取得了很好的成果&#xff0c;并且被使用到一些新產品和功能背后&#xff0c;進一步增強了人類的能力。在過去五年…

SVN就是這么簡單

什么是SVN SVN全稱&#xff1a;Subversion&#xff0c;是一個開放源代碼的版本控制系統 Svn是一種集中式文件版本管理系統。集中式代碼管理的核心是服務器&#xff0c;所有開發者在開始新一天的工作之前必須從服務器獲取代碼&#xff0c;然后開發&#xff0c;最后解決沖突&…

SpringCloud必會知識點大全

為什么要學習Spring Cloud 在項目開發中隨著業務越來越多&#xff0c;導致功能之間耦合性高、開發效率低、系統運行緩慢難以維護、不穩定。微服務 架構可以解決這些問題&#xff0c;而Spring Cloud是微服務架構最流行的實現. 1.微服務 微服務架構是使用一套小服務來開發單個應用…

thinkphp3.22 多項目配置

1.index.php if(version_compare(PHP_VERSION,5.3.0,<)) die(require PHP > 5.3.0 !); // 開啟調試模式 建議開發階段開啟 部署階段注釋或者設為false define(APP_DEBUG,true); // 創建 //define(BIND_MODULE,Login); define(erp,true); // 定義應用目錄 define(APP_PAT…

30分鐘掌握 C#7

1. out 變量&#xff08;out variables&#xff09; 以前我們使用out變量必須在使用前進行聲明&#xff0c;C# 7.0 給我們提供了一種更簡潔的語法 “使用時進行內聯聲明” 。如下所示&#xff1a; 1 var input ReadLine(); 2 if (int.TryParse(input, out var result)) 3 …

在 C# 中如何檢查參數是否為 null

前言前不久&#xff0c;微軟宣布從 C# 11 中移除參數空值檢查功能&#xff0c;該功能允許在方法開始執行之前&#xff0c;在參數名稱的末尾提供參數空值檢查&#xff08;!!操作符&#xff09;。那么&#xff0c;在 C# 中如何檢查參數是否為 null 呢&#xff1f;1. null這個可能…

什么是Maven快照(SNAPSHOT)

本文來說下Maven的SNAPSHOT版本有什么作用 文章目錄 問題解決 正式版本"abc-1.0"快照版本"abc-1.0-SNAPSHOT"本文小結問題 在使用maven進行依賴管理時&#xff0c;有的版本號后面會帶有"-SNAPSHOT"&#xff0c;有什么作用呢&#xff1f; <dep…

帶你剖析WebGis的世界奧秘----Geojson數據加載(高級)

前言&#xff1a;前兩周我帶你們分析了WebGis中關鍵步驟瓦片加載點擊事件&#xff08;具體的看前兩篇文章&#xff09;&#xff0c;下面呢&#xff0c;我帶大家來看看Geojson的加載及其點擊事件 Geojson數據解析 GeoJSON是一種對各種地理數據結構進行編碼的格式。GeoJSON對象可…

如果要存ip地址,用什么數據類型比較好

在看高性能MySQL第3版&#xff08;4.1.7節&#xff09;時&#xff0c;作者建議當存儲IPv4地址時&#xff0c;應該使用32位的無符號整數&#xff08;UNSIGNED INT&#xff09;來存儲IP地址&#xff0c;而不是使用字符串。但是沒有給出具體原因。為了搞清楚這個原因&#xff0c;查…