大綱
7.服務端對服務實例進行健康檢查
8.服務下線如何注銷注冊表和客戶端等信息
9.事件驅動架構源碼分析
7.服務端對服務實例進行健康檢查
(1)服務端對服務實例進行健康檢查的設計邏輯
(2)服務端對服務實例進行健康檢查的源碼
(3)服務端檢查服務實例不健康后的注銷處理
(1)服務端對服務實例進行健康檢查的設計邏輯
一.首先會獲取所有客戶端的Connection連接對象
每個Connection連接對象的元信息(getMetaInfo())中有一個lastActiveTime屬性,表示該連接最後一次活躍的時間。
二.然後判斷「當前時間 − 最後活躍時間」是否已達到20s(代碼中的判斷條件為now - lastActiveTime >= KEEP_ALIVE_TIME)
如果是,則把該Connection連接對象的connectionId放入一個名為outDatedConnections的待移除集合(Set)中。此時該Connection連接對象並不會馬上被刪除。
三.判斷完全部Connection連接對象後,遍歷outDatedConnections集合
向遍歷到的每個Connection連接對象異步發起一次探測請求(ClientDetectionRequest,單次超時時間為1s),確認客戶端是否真的已經下線。如果響應成功,則把connectionId加入successConnections集合,並刷新該Connection連接對象的lastActiveTime。服務端通過CountDownLatch最多等待3s來收集所有探測結果。這個機制有一個專業的名稱:探活機制。
四.遍歷待移除集合進行注銷,並在注銷之前先判斷是否探活成功
也就是說,如果某個connectionId存在於待移除集合outDatedConnections中,但不存在於探活成功集合successConnections中,那麼這個connectionId對應的客戶端就會被注銷。
(2)服務端對服務實例進行健康檢查的源碼
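對服務實例進行健康檢查的源碼入口是ConnectionManager的start()方法。該方法被@PostConstruct修飾,會向RpcScheduledExecutor.COMMON_SERVER_EXECUTOR提交一個定時任務:首次延遲1s執行,之後每次執行結束3s後再執行一次(scheduleWithFixedDelay)。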
@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {

    // Registered connections, keyed by connection ID.
    Map<String, Connection> connections = new ConcurrentHashMap<>();

    ...

    /**
     * Starts the periodic task that expels connections whose last active time has expired.
     *
     * <p>Invoked once after construction ({@code @PostConstruct}). The task is scheduled with a
     * 1000 ms initial delay and a 3000 ms delay between the end of one run and the start of the
     * next. Each run: (1) snapshots the connections, (2) collects those inactive for at least
     * {@code KEEP_ALIVE_TIME}, (3) probes each of them asynchronously, and (4) unregisters the
     * ones whose probe did not succeed.
     */
    @PostConstruct
    public void start() {
        // Start the unhealthy-connection expel task.
        RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                ...
                // Step 1: take a view of all current connections.
                Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                ...
                // Step 2: collect the IDs of connections that have been inactive for at least
                // KEEP_ALIVE_TIME (described in the surrounding text as 20 s). They are only
                // removal candidates here; nothing is deleted yet.
                Set<String> outDatedConnections = new HashSet<>();
                long now = System.currentTimeMillis();
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    String clientIp = client.getMetaInfo().getClientIp();
                    // NOTE(review): expelForIp, expelClient and expelCount are declared in the
                    // elided code; presumably they implement per-IP connection-limit expulsion
                    // - confirm against the full source.
                    AtomicInteger integer = expelForIp.get(clientIp);
                    if (integer != null && integer.intValue() > 0) {
                        integer.decrementAndGet();
                        expelClient.add(client.getMetaInfo().getConnectionId());
                        expelCount--;
                    } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                        // Heartbeat timed out: mark the connection as a removal candidate.
                        outDatedConnections.add(client.getMetaInfo().getConnectionId());
                    }
                }
                ...
                // Step 3: client active detection. Rather than deleting stale connections
                // immediately, the server probes each client to confirm it is really gone.
                Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
                if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                    // NOTE(review): successConnections is a plain HashSet written from the async
                    // callbacks below, and onResponse() calls latch.countDown() BEFORE adding to
                    // it. If callbacks run on another thread (the executing thread is not shown
                    // here), the loop after latch.await() may miss a successful probe, or read the
                    // set while a late callback is still writing to it. A concurrent set
                    // (e.g. ConcurrentHashMap.newKeySet()) plus counting down after the add would
                    // close both gaps.
                    Set<String> successConnections = new HashSet<>();
                    // One count per candidate. Each candidate is expected to count down once:
                    // from onResponse/onException, or from the else/catch branches below.
                    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                    for (String outDateConnectionId : outDatedConnections) {
                        try {
                            Connection connection = getConnection(outDateConnectionId);
                            if (connection != null) {
                                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                // Send the probe asynchronously (for gRPC connections presumably
                                // GrpcConnection.asyncRequest()).
                                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                    @Override
                                    public Executor getExecutor() {
                                        return null;
                                    }

                                    // Per-probe timeout in milliseconds.
                                    @Override
                                    public long getTimeout() {
                                        return 1000L;
                                    }

                                    @Override
                                    public void onResponse(Response
                                            response) {
                                        latch.countDown();
                                        if (response != null && response.isSuccess()) {
                                            // The client answered: refresh its last active time...
                                            connection.freshActiveTime();
                                            // ...and record it as alive so it is not unregistered.
                                            successConnections.add(outDateConnectionId);
                                        }
                                    }

                                    @Override
                                    public void onException(Throwable e) {
                                        latch.countDown();
                                    }
                                });
                                Loggers.REMOTE_DIGEST.info("[{}]send connection active request ", outDateConnectionId);
                            } else {
                                // Connection already gone; nothing to probe.
                                latch.countDown();
                            }
                        } catch (ConnectionAlreadyClosedException e) {
                            latch.countDown();
                        } catch (Exception e) {
                            Loggers.REMOTE_DIGEST.error("[{}]Error occurs when check client active detection ,error={}", outDateConnectionId, e);
                            latch.countDown();
                        }
                    }
                    // Wait at most 3000 ms for the probes. The boolean result is ignored, so any
                    // probe that has not succeeded by then is treated as failed.
                    latch.await(3000L, TimeUnit.MILLISECONDS);
                    Loggers.REMOTE_DIGEST.info("Out dated connection check successCount={}", successConnections.size());
                    // Step 4: unregister every stale connection whose probe did not succeed.
                    for (String outDateConnectionId : outDatedConnections) {
                        // Connections that answered the probe are kept.
                        if (!successConnections.contains(outDateConnectionId)) {
                            Loggers.REMOTE_DIGEST.info("[{}]Unregister Out dated connection....", outDateConnectionId);
                            unregister(outDateConnectionId);
                        }
                    }
                }
                ...
            }
        }, 1000L, 3000L, TimeUnit.MILLISECONDS);
    }

    ...
}
(3)服務端檢查服務實例不健康后的注銷處理
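進行注銷處理的方法是ConnectionManager的unregister()方法。該方法會移除Connection連接對象並清理相關數據(例如按客戶端IP統計的連接數),關閉連接,然後通過ClientConnectionEventListenerRegistry通知各個連接事件監聽器。其中ConnectionBasedClientManager會移除對應的Client對象,並發布一個ClientDisconnectEvent客戶端注銷事件。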
@Service
public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> {

    // Connection count per client IP; an entry is dropped when its count reaches zero.
    private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<String, AtomicInteger>(16);

    // Registered connections, keyed by connection ID.
    Map<String, Connection> connections = new ConcurrentHashMap<>();

    @Autowired
    private ClientConnectionEventListenerRegistry clientConnectionEventListenerRegistry;

    ...

    /**
     * Unregisters a connection.
     *
     * <p>Removes it from {@code connections}, decrements the per-IP connection counter (dropping
     * the entry at zero), closes the connection and notifies every registered connection event
     * listener. Does nothing if {@code connectionId} is not registered. The method is
     * {@code synchronized} on this manager.
     *
     * @param connectionId ID of the connection to remove
     */
    public synchronized void unregister(String connectionId) {
        // Remove the connection itself.
        Connection remove = this.connections.remove(connectionId);
        if (remove != null) {
            String clientIp = remove.getMetaInfo().clientIp;
            AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
            if (atomicInteger != null) {
                int count = atomicInteger.decrementAndGet();
                if (count <= 0) {
                    connectionForClientIp.remove(clientIp);
                }
            }
            remove.close();
            Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
            // Tell the listeners (e.g. ConnectionBasedClientManager) that the connection is gone.
            // NOTE(review): the listeners run while this method still holds the manager's lock.
            clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
        }
    }

    ...
}
@Service
public class ClientConnectionEventListenerRegistry {

    ...

    /**
     * Notifies every registered {@link ClientConnectionEventListener} that a client connection
     * has been disconnected.
     *
     * <p>A failure in one listener is logged and does not prevent the remaining listeners from
     * being notified.
     *
     * @param connection the connection that was removed
     */
    public void notifyClientDisConnected(final Connection connection) {
        for (ClientConnectionEventListener clientConnectionEventListener : clientConnectionEventListeners) {
            try {
                // For ConnectionBasedClientManager this is its clientDisConnected(Connection) callback.
                clientConnectionEventListener.clientDisConnected(connection);
            } catch (Throwable throwable) {
                // NOTE(review): listener failures are logged at INFO level; WARN/ERROR may fit better.
                Loggers.REMOTE.info("[NotifyClientDisConnected] failed for listener {}", clientConnectionEventListener.getName(), throwable);
            }
        }
    }

    ...
}
@Component("connectionBasedClientManager")
public class ConnectionBasedClientManager extends ClientConnectionEventListener implements ClientManager {

    // Connection-based clients, keyed by client ID.
    private final ConcurrentMap<String, ConnectionBasedClient> clients = new ConcurrentHashMap<>();

    ...

    /**
     * Removes and releases the client with the given ID, then publishes a
     * {@code ClientEvent.ClientDisconnectEvent} for it.
     *
     * <p>NOTE(review): the {@code clientDisConnected(Connection)} listener callback is elided
     * here; presumably it delegates to this method with the connection ID - confirm.
     *
     * @param clientId ID of the disconnected client
     * @return always {@code true}, including when no client with that ID exists
     */
    @Override
    public boolean clientDisconnected(String clientId) {
        Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
        ConnectionBasedClient client = clients.remove(clientId);
        if (null == client) {
            // Unknown or already removed: nothing to release and no event is published.
            return true;
        }
        client.release();
        // Finally, publish the client-disconnect event.
        NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
        return true;
    }

    ...
}
ClientDisconnectEvent客戶端注銷事件會被兩個訂閱者處理:
一是ClientServiceIndexesManager的onEvent()方法,用來從注冊表和訂閱表中移除該客戶端的信息;
二是DistroClientDataProcessor的onEvent()方法,用來把服務實例被注銷後的數據同步給集群中的其他節點(僅針對臨時實例,且只由該客戶端的責任節點執行)。
@Component
public class ClientServiceIndexesManager extends SmartSubscriber {

    // Registry index (service providers): for each Service, the IDs of the clients that
    // publish instances of it.
    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

    // Subscriber index (service consumers): for each Service, the IDs of the clients that
    // subscribe to it.
    private final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();

    ...

    /**
     * Dispatches client events. A disconnect clears every index entry of that client; other
     * client operation events go to {@code handleClientOperation} (elided here).
     */
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }

    /**
     * Removes a disconnected client from the subscriber index of every service it subscribed
     * to, and from the registry index of every service it published.
     */
    private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {
        Client client = event.getClient();
        for (Service each : client.getAllSubscribeService()) {
            // Remove the client from the subscriber index.
            removeSubscriberIndexes(each, client.getClientId());
        }
        for (Service each : client.getAllPublishedService()) {
            // Remove the client from the registry index.
            removePublisherIndexes(each, client.getClientId());
        }
    }

    /**
     * Removes {@code clientId} from the registry index of {@code service} and publishes a
     * {@code ServiceChangedEvent} for the service.
     *
     * <p>The event is published whenever the service has an index entry, even if
     * {@code clientId} was not in it. An emptied set is left in place.
     */
    // NOTE(review): containsKey() followed by get() is not atomic; if a key could be removed
    // concurrently, get() would return null. computeIfPresent() would avoid that window.
    private void removePublisherIndexes(Service service, String clientId) {
        if (!publisherIndexes.containsKey(service)) {
            return;
        }
        publisherIndexes.get(service).remove(clientId);
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }

    /**
     * Removes {@code clientId} from the subscriber index of {@code service}, dropping the
     * service's entry once its subscriber set becomes empty.
     */
    // NOTE(review): same check-then-act pattern as above. If this can run concurrently, the
    // isEmpty()/remove() pair could also drop a set another thread just added a subscriber to.
    private void removeSubscriberIndexes(Service service, String clientId) {
        if (!subscriberIndexes.containsKey(service)) {
            return;
        }
        subscriberIndexes.get(service).remove(clientId);
        if (subscriberIndexes.get(service).isEmpty()) {
            subscriberIndexes.remove(service);
        }
    }

    ...
}
public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {

    ...

    /**
     * Routes client events to cluster synchronization. After the elided checks, a
     * {@code ClientVerifyFailedEvent} goes to {@code syncToVerifyFailedServer} (elided); every
     * other event is cast to {@code ClientEvent} and passed to {@link #syncToAllServer}.
     */
    @Override
    public void onEvent(Event event) {
        ...
        if (event instanceof ClientEvent.ClientVerifyFailedEvent) {
            syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);
        } else {
            syncToAllServer((ClientEvent) event);
        }
    }

    /**
     * Syncs a client's data to the cluster via the Distro protocol.
     *
     * <p>A disconnect is synced as a DELETE and a client change as a CHANGE; other event types
     * are ignored. Nothing is synced for a null client, a non-ephemeral client, or a client this
     * node is not responsible for.
     */
    private void syncToAllServer(ClientEvent event) {
        Client client = event.getClient();
        // Only ephemeral data is synced by Distro; persistent clients are synced by Raft.
        // Only the node responsible for this client (isResponsibleClient) performs the sync.
        if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {
            return;
        }
        if (event instanceof ClientEvent.ClientDisconnectEvent) {
            // The client disconnected: tell the other nodes to delete its data.
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.DELETE);
        } else if (event instanceof ClientEvent.ClientChangedEvent) {
            // The client's published data changed (an instance was registered or removed):
            // tell the other nodes to update it.
            DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);
            distroProtocol.sync(distroKey, DataOperation.CHANGE);
        }
    }

    ...
}
(4)總結
Nacos 1.4.1版本的服務健康檢查:實例15s沒有心跳則將其健康狀態改為不健康,30s沒有心跳則移除該實例對象。
Nacos 2.1.0版本的服務健康檢查:客戶端連接20s沒有心跳,則被放入一個過期集合,此時並不會立即移除客戶端連接。由於引入了gRPC長連接,服務端可以新增探活機制,主動檢查過期集合中的連接。服務端發送探活請求給客戶端的代價並不大,卻能確認客戶端是否真的已經下線,從而避免誤刪仍然存活的連接。
8.服務下線如何注銷注冊表和客戶端等信息
(1)客戶端發出服務下線請求的源碼
(2)服務端處理服務下線請求的源碼
(1)客戶端發出服務下線請求的源碼
Nacos客戶端發起服務下線的入口在AbstractAutoServiceRegistration這個類中。AbstractAutoServiceRegistration是Spring Cloud Commons提供的抽象類,nacos-discovery中的NacosAutoServiceRegistration繼承了它。
由於AbstractAutoServiceRegistration的destroy()方法被@PreDestroy修飾,所以當容器關閉時,會調用AbstractAutoServiceRegistration的destroy()方法。該方法經由stop()、deregister()調用NacosServiceRegistry的deregister()方法,進而調用NacosNamingService的deregisterInstance()方法,然後調用NamingClientProxyDelegate的deregisterService()方法。對於臨時實例(ephemeral),接著會調用NamingGrpcClientProxy的deregisterService()方法。和客戶端發起服務注冊一樣,首先會創建請求參數對象,然後通過NamingGrpcClientProxy的requestToServer()方法發起請求,也就是調用RpcClient的request()方法發起gRPC請求進行服務下線。
public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {

    ...

    /** Deregisters this service when the container shuts down ({@code @PreDestroy}). */
    @PreDestroy
    public void destroy() {
        stop();
    }

    /**
     * Deregisters the service (and the management registration, if one is used) and closes the
     * service registry.
     *
     * <p>Runs only if the running flag flips from {@code true} to {@code false} and registration
     * is enabled, so repeated calls deregister at most once.
     */
    public void stop() {
        if (this.getRunning().compareAndSet(true, false) && isEnabled()) {
            deregister();
            if (shouldRegisterManagement()) {
                deregisterManagement();
            }
            this.serviceRegistry.close();
        }
    }

    /** Removes this application's registration from the service registry. */
    protected void deregister() {
        this.serviceRegistry.deregister(getRegistration());
    }

    ...
}
public class NacosServiceRegistry implements ServiceRegistry<Registration> {

    ...

    /**
     * Deregisters the given registration's instance from Nacos.
     *
     * <p>Skips (with a warning) registrations without a service ID. Exceptions from the naming
     * service are logged and swallowed, so shutdown continues.
     */
    @Override
    public void deregister(Registration registration) {
        log.info("De-registering from Nacos Server now...");
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No dom to de-register for nacos client...");
            return;
        }
        NamingService namingService = namingService();
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();
        try {
            // Calls NacosNamingService.deregisterInstance().
            namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), nacosDiscoveryProperties.getClusterName());
        } catch (Exception e) {
            log.error("ERR_NACOS_DEREGISTER, de-register failed...{},", registration.toString(), e);
        }
        log.info("De-registration finished.");
    }

    ...
}
@SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
public class NacosNamingService implements NamingService {

    // Set in init() to a NamingClientProxyDelegate.
    private NamingClientProxy clientProxy;

    ...

    public NacosNamingService(Properties properties) throws NacosException {
        init(properties);
    }

    private void init(Properties properties) throws NacosException {
        ...
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
    }

    /**
     * Builds an {@code Instance} from ip/port/cluster and deregisters it.
     *
     * <p>All other {@code Instance} fields keep their defaults. That includes the ephemeral flag
     * (presumably {@code true} - confirm), which decides gRPC vs HTTP in the delegate.
     */
    @Override
    public void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException {
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setClusterName(clusterName);
        deregisterInstance(serviceName, groupName, instance);
    }

    /** Delegates deregistration to the client proxy. */
    @Override
    public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        // Calls NamingClientProxyDelegate.deregisterService().
        clientProxy.deregisterService(serviceName, groupName, instance);
    }

    ...
}
public class NamingClientProxyDelegate implements NamingClientProxy {

    private final NamingHttpClientProxy httpClientProxy;

    private final NamingGrpcClientProxy grpcClientProxy;

    ...

    public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
        ...
        this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
        this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
    }

    /** Deregisters through the gRPC proxy for ephemeral instances, otherwise through HTTP. */
    @Override
    public void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException {
        getExecuteClientProxy(instance).deregisterService(serviceName, groupName, instance);
    }

    // Ephemeral instances use gRPC; persistent instances use HTTP.
    private NamingClientProxy getExecuteClientProxy(Instance instance) {
        return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
    }

    ...
}
public class NamingGrpcClientProxy extends AbstractNamingClientProxy {

    private final NamingGrpcRedoService redoService;

    ...

    /**
     * Deregisters an instance over gRPC.
     *
     * <p>The redo service is updated first (presumably marking the instance as being
     * deregistered, so it is not re-registered on reconnect - confirm). The request is then
     * sent.
     */
    @Override
    public void deregisterService(String serviceName, String groupName, Instance instance) throws NacosException {
        NAMING_LOGGER.info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}", namespaceId, serviceName, instance);
        redoService.instanceDeregister(serviceName, groupName);
        doDeregisterService(serviceName, groupName, instance);
    }

    /**
     * Executes the deregister operation.
     *
     * <p>Sends a {@code DE_REGISTER_INSTANCE} request and, only if it does not throw, removes the
     * instance's redo data.
     */
    public void doDeregisterService(String serviceName, String groupName, Instance instance) throws NacosException {
        // Build the request object.
        InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.DE_REGISTER_INSTANCE, instance);
        // Send it to the server.
        requestToServer(request, Response.class);
        redoService.removeInstanceForRedo(serviceName, groupName);
    }

    /**
     * Sends a naming request over the RPC client and returns the response as
     * {@code responseClass}.
     *
     * <p>Security headers are attached first. {@code requestTimeout} is used when it is
     * non-negative; otherwise the client's default applies.
     *
     * @throws NacosException with {@code SERVER_ERROR} if the request fails, the server returns
     *     a non-success code, or the response is not an instance of {@code responseClass}
     */
    private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {
        try {
            request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
            // Goes through RpcClient.request(), i.e. a gRPC call.
            Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
            if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
                // NOTE(review): this exception is caught by the catch (Exception) below and
                // rewrapped as SERVER_ERROR. The server's error code survives only as the cause,
                // not in the thrown exception's errCode.
                throw new NacosException(response.getErrorCode(), response.getMessage());
            }
            if (responseClass.isAssignableFrom(response.getClass())) {
                // Unchecked cast, guarded by the isAssignableFrom check above.
                return (T) response;
            }
            NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());
        } catch (Exception e) {
            throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
        }
        throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
    }

    ...
}
(2)服務端處理服務下線請求的源碼
服務注冊時請求參數InstanceRequest的類型是REGISTER_INSTANCE,服務下線時請求參數InstanceRequest的類型是DE_REGISTER_INSTANCE。
處理服務注冊和服務下線的入口都是InstanceRequestHandler的handle()方法。對於服務下線請求,這個方法會調用InstanceRequestHandler的deregisterInstance()方法,進而調用EphemeralClientOperationServiceImpl的deregisterInstance()方法。
在EphemeralClientOperationServiceImpl的deregisterInstance()方法中,首先調用Client對象的removeServiceInstance()方法移除instance信息,移除成功時會發布ClientChangedEvent事件;然後發布客戶端注銷服務實例的事件ClientDeregisterServiceEvent,同時還會發布InstanceMetadataEvent元數據事件。
其中,ClientChangedEvent事件用來把數據同步給集群中的其他節點,ClientDeregisterServiceEvent事件用來從注冊表中移除該客戶端提供的服務實例。從注冊表中移除服務實例時,還會發布ServiceChangedEvent事件,用來通知訂閱了該服務的Nacos客戶端。同理,服務注冊時其實也會發布類似的三個事件。
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {

    private final EphemeralClientOperationServiceImpl clientOperationService;

    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }

    /**
     * Entry point for gRPC instance register/deregister requests. Requires WRITE permission.
     *
     * @throws NacosException with {@code INVALID_PARAM} for any other request type
     */
    @Override
    @Secured(action = ActionTypes.WRITE)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        // Build a Service from the request's namespace, group name and service name. The trailing
        // `true` presumably marks it as ephemeral - confirm.
        Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                // Register the instance.
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                // Deregister the instance.
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));
        }
    }

    /**
     * Registers the request's instance under the connection that sent it.
     *
     * <p>{@code service} identifies namespace/group/service. {@code request.getInstance()}
     * carries the client's IP, port and so on. {@code meta.getConnectionId()} is the connection
     * ID, which serves as the client ID.
     */
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }

    /** Deregisters the request's instance for the connection that sent it. */
    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        // Calls EphemeralClientOperationServiceImpl.deregisterInstance().
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
}
@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {

    private final ClientManager clientManager;

    ...

    /**
     * Removes the instance that client {@code clientId} published for {@code service}.
     *
     * <p>Returns silently (with a warning log) if the service is unknown, and also returns if
     * {@code clientIsLegal} rejects the client. When an instance was actually removed, a
     * {@code ClientDeregisterServiceEvent} and an {@code InstanceMetadataEvent} are published.
     *
     * <p>NOTE(review): {@code instance} is not used in the visible body. Removal is keyed by
     * service only, since a client holds at most one published instance per service.
     */
    @Override
    public void deregisterInstance(Service service, Instance instance, String clientId) {
        if (!ServiceManager.getInstance().containSingleton(service)) {
            Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", service);
            return;
        }
        // Resolve the registered (singleton) Service equal to the one built from the request.
        Service singleton = ServiceManager.getInstance().getSingleton(service);
        // Look up the Client by the connection ID. NOTE(review): the original comment called this
        // an IpPortBasedClient; for gRPC connections it is presumably a ConnectionBasedClient -
        // confirm.
        Client client = clientManager.getClient(clientId);
        if (!clientIsLegal(client, clientId)) {
            return;
        }
        // AbstractClient.removeServiceInstance() removes the instance and, if one was removed,
        // publishes a ClientChangedEvent used for cluster sync.
        InstancePublishInfo removedInstance = client.removeServiceInstance(singleton);
        client.setLastUpdatedTime();
        if (null != removedInstance) {
            // Publish the client-deregistered-service event.
            NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(singleton, clientId));
            NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, removedInstance.getMetadataId(), true));
        }
    }

    ...
}
public abstract class AbstractClient implements Client {

    // Services this client publishes, each mapped to the InstancePublishInfo built from the
    // registered instance. Keyed by Service, so a client holds at most one instance per service.
    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);

    ...

    /**
     * Removes this client's published instance for {@code service}.
     *
     * <p>When something was removed, decrements the instance-count metric and publishes a
     * {@code ClientChangedEvent} (used for cluster data sync).
     *
     * @return the removed publish info, or {@code null} if none was registered for the service
     */
    @Override
    public InstancePublishInfo removeServiceInstance(Service service) {
        InstancePublishInfo result = publishers.remove(service);
        if (null != result) {
            MetricsMonitor.decrementInstanceCount();
            // Publish the client-changed event that drives cluster synchronization.
            NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
        }
        Loggers.SRV_LOG.info("Client remove for service {}, {}", service, getClientId());
        return result;
    }

    ...
}