Nacos源碼—7.Nacos升級gRPC分析四

大綱

5.服務變動時如何通知訂閱的客戶端

6.微服務實例信息如何同步集群節點

6.微服務實例信息如何同步集群節點

(1)服務端處理服務注冊時會發布一個ClientChangedEvent事件

(2)ClientChangedEvent事件的處理源碼

(3)集群節點處理數據同步請求的源碼

(1)服務端處理服務注冊時會發布一個ClientChangedEvent事件

ClientChangedEvent事件的作用就是向集群節點同步服務實例數據的。

//Instance request handler.
@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {private final EphemeralClientOperationServiceImpl clientOperationService;public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {this.clientOperationService = clientOperationService;}@Override@Secured(action = ActionTypes.WRITE)public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {//根據請求信息創建一個Service對象,里面包含了:命名空間、分組名、服務名Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);switch (request.getType()) {case NamingRemoteConstants.REGISTER_INSTANCE://注冊實例return registerInstance(service, request, meta);case NamingRemoteConstants.DE_REGISTER_INSTANCE://注銷實例return deregisterInstance(service, request, meta);default:throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));}}private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {//調用EphemeralClientOperationServiceImpl的注冊方法registerInstance(),這里需要注意如下參數;//參數service:根據請求信息創建的一個Service對象,里面有命名空間、分組名、服務名//參數request.getInstance():這個參數就對應了客戶端的實例對象,里面包含IP、端口等信息//參數meta.getConnectionId():這個參數很關鍵,它是連接IDclientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);}...
}@Component("ephemeralClientOperationService")
public class EphemeralClientOperationServiceImpl implements ClientOperationService {private final ClientManager clientManager;public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {this.clientManager = clientManager;}@Overridepublic void registerInstance(Service service, Instance instance, String clientId) {//從ServiceManager中根據由請求信息創建的Service對象獲取一個已注冊的Service對象Service singleton = ServiceManager.getInstance().getSingleton(service);if (!singleton.isEphemeral()) {throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));}//從ClientManagerDelegate中根據請求參數中的connectionId獲取一個Client對象,即IpPortBasedClient對象Client client = clientManager.getClient(clientId);if (!clientIsLegal(client, clientId)) {return;}//將請求中的instance實例信息封裝為InstancePublishInfo對象InstancePublishInfo instanceInfo = getPublishInfo(instance);//往Client對象里添加已注冊的服務對象Service,調用的是IpPortBasedClient對象的父類AbstractClient的addServiceInstance()方法client.addServiceInstance(singleton, instanceInfo);//設置IpPortBasedClient對象的lastUpdatedTime屬性為最新時間client.setLastUpdatedTime();//發布客戶端注冊服務實例的事件NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));//發布服務實例元數據的事件NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));}...
}//Nacos naming client based ip and port.
//The client is bind to the ip and port users registered. It's a abstract content to simulate the tcp session client.
public class IpPortBasedClient extends AbstractClient {...@Overridepublic boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {return super.addServiceInstance(service, parseToHealthCheckInstance(instancePublishInfo));}...
}//Abstract implementation of {@code Client}.
public abstract class AbstractClient implements Client {//publishers其實就是記錄該客戶端提供的服務和服務實例,一個客戶端可提供多個服務//存儲客戶端發送過來的請求中的Instance信息,當然這些信息已封裝為InstancePublishInfo對象//key為已注冊的Service,value是根據請求中的instance實例信息封裝的InstancePublishInfo對象protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);//subscribers存放著:訂閱者Subscriber(其實可理解為當前客戶端)訂閱了的Service服務//subscribers的key=stock-service(要訂閱的某個服務)、value=order-service(訂閱者,某個具體的包含IP的服務實例)protected final ConcurrentHashMap<Service, Subscriber> subscribers = new ConcurrentHashMap<>(16, 0.75f, 1);...@Overridepublic boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {//服務注冊時,如果是第一次put進去Service對象,會返回nullif (null == publishers.put(service, instancePublishInfo)) {//監視器記錄MetricsMonitor.incrementInstanceCount();}//發布客戶端改變事件,用于處理集群間的數據同步NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());return true;}...
}

(2)ClientChangedEvent事件的處理源碼

DistroClientDataProcessor的onEvent()方法會響應ClientChangedEvent。該方法如果判斷出事件類型為ClientChangedEvent事件,那么就會執行DistroClientDataProcessor的syncToAllServer()方法,然后調用DistroProtocol的sync()方法進行集群節點同步處理。

DistroProtocol的sync()方法會遍歷集群中除自身節點外的其他節點,然后對遍歷到的每個節點執行DistroProtocol的syncToTarget()方法。

在DistroProtocol的syncToTarget()方法中,首先把要同步的集群節點targetServer包裝成DistroKey對象,然后根據DistroKey對象創建DistroDelayTask延遲任務,接著調用NacosDelayTaskExecuteEngine的addTask()方法,往延遲任務執行引擎的tasks中添加任務。

NacosDelayTaskExecuteEngine在初始化時會啟動一個定時任務,這個定時任務會定時執行ProcessRunnable的run()方法。而ProcessRunnable的run()方法會不斷從任務池tasks中取出延遲任務處理,處理DistroDelayTask任務時會調用DistroDelayTaskProcessor的process()方法。

在執行DistroDelayTaskProcessor的process()方法時,會先根據DistroDelayTask任務封裝一個DistroSyncChangeTask任務,然后調用NacosExecuteTaskExecuteEngine的addTask()方法。也就是調用TaskExecuteWorker的process()方法,將DistroSyncChangeTask任務添加到TaskExecuteWorker的阻塞隊列中,同時創建TaskExecuteWorker時會啟動線程不斷從隊列中取出任務處理。因此最終會執行DistroSyncChangeTask的run()方法。

public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {private final ClientManager clientManager;private final DistroProtocol distroProtocol;...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//臨時實例使用Distro協議,持久化實例使用Raft協議//ClientManager.isResponsibleClient()方法,判斷只有該client的責任節點才能進行集群數據同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客戶端注銷實例時需要進行集群節點同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客戶端注冊實例時需要進行集群節點同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}@Component
public class DistroProtocol {private final ServerMemberManager memberManager;private final DistroTaskEngineHolder distroTaskEngineHolder;...//Start to sync by configured delay.public void sync(DistroKey distroKey, DataOperation action) {sync(distroKey, action, DistroConfig.getInstance().getSyncDelayMillis());}//Start to sync data to all remote server.public void sync(DistroKey distroKey, DataOperation action, long delay) {//遍歷集群中除自身節點外的其他節點for (Member each : memberManager.allMembersWithoutSelf()) {syncToTarget(distroKey, action, each.getAddress(), delay);}}//Start to sync to target server.public void syncToTarget(DistroKey distroKey, DataOperation action, String targetServer, long delay) {//先把要同步的集群節點targetServer包裝成DistroKey對象DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), targetServer);//然后根據DistroKey對象創建DistroDelayTask任務DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);//接著調用NacosDelayTaskExecuteEngine.addTask()方法//往延遲任務執行引擎DistroDelayTaskExecuteEngine中添加延遲任務DistroDelayTaskdistroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, targetServer);}}...
}//延遲任務執行引擎
public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {private final ScheduledExecutorService processingExecutor;protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任務池public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);tasks = new ConcurrentHashMap<>(initCapacity);processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));//開啟定時任務,即啟動ProcessRunnable線程任務processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}...@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {lock.lock();try {AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {newTask.merge(existTask);}//最后放入到任務池中tasks.put(key, newTask);} finally {lock.unlock();}}protected void processTasks() {//獲取tasks中所有的任務,然后進行遍歷Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {//通過任務key,獲取具體的任務,并且從任務池中移除掉AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}//通過任務key獲取對應的NacosTaskProcessor延遲任務處理器NacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {//ReAdd task if process failed//調用獲取到的NacosTaskProcessor延遲任務處理器的process()方法if (!processor.process(task)) {//如果失敗了,會重試添加task回tasks這個map中retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error ", e);retryFailedTask(taskKey, task);}}}...private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}
}//Distro delay task processor.
public class DistroDelayTaskProcessor implements NacosTaskProcessor {...@Overridepublic boolean process(NacosTask task) {if (!(task instanceof DistroDelayTask)) {return true;}DistroDelayTask distroDelayTask = (DistroDelayTask) task;DistroKey distroKey = distroDelayTask.getDistroKey();switch (distroDelayTask.getAction()) {case DELETE://處理客戶端注銷實例時的延遲任務(同步數據到集群節點)//根據DistroDelayTask任務封裝一個DistroSyncTask任務DistroSyncDeleteTask syncDeleteTask = new DistroSyncDeleteTask(distroKey, distroComponentHolder);//調用NacosExecuteTaskExecuteEngine.addTask()方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncDeleteTask);return true;case CHANGE:case ADD://處理客戶端注冊實例時的延遲任務(同步數據到集群節點)//根據DistroDelayTask任務封裝一個DistroSyncChangeTask任務DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder);//調用NacosExecuteTaskExecuteEngine.addTask()方法distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask);return true;default:return false;}}
}//任務執行引擎
public class NacosExecuteTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractExecuteTask> {private final TaskExecuteWorker[] executeWorkers;public NacosExecuteTaskExecuteEngine(String name, Logger logger, int dispatchWorkerCount) {super(logger);executeWorkers = new TaskExecuteWorker[dispatchWorkerCount];for (int mod = 0; mod < dispatchWorkerCount; ++mod) {executeWorkers[mod] = new TaskExecuteWorker(name, mod, dispatchWorkerCount, getEngineLog());}}...@Overridepublic void addTask(Object tag, AbstractExecuteTask task) {//根據tag獲取到TaskExecuteWorkerNacosTaskProcessor processor = getProcessor(tag);if (null != processor) {processor.process(task);return;}TaskExecuteWorker worker = getWorker(tag);//調用TaskExecuteWorker.process()方法把AbstractExecuteTask任務放入到隊列當中去worker.process(task);}private TaskExecuteWorker getWorker(Object tag) {int idx = (tag.hashCode() & Integer.MAX_VALUE) % workersCount();return executeWorkers[idx];}    ...
}public final class TaskExecuteWorker implements NacosTaskProcessor, Closeable {private final BlockingQueue<Runnable> queue;//任務存儲容器public TaskExecuteWorker(final String name, final int mod, final int total, final Logger logger) {...this.queue = new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY);new InnerWorker(name).start();}@Overridepublic boolean process(NacosTask task) {if (task instanceof AbstractExecuteTask) {//把NacosTask任務放入到阻塞隊列中putTask((Runnable) task);}return true;}private void putTask(Runnable task) {try {//把NacosTask任務放入到阻塞隊列中queue.put(task);} catch (InterruptedException ire) {log.error(ire.toString(), ire);}}...private class InnerWorker extends Thread {InnerWorker(String name) {setDaemon(false);setName(name);}@Overridepublic void run() {while (!closed.get()) {try {//一直取阻塞隊列中的任務Runnable task = queue.take();long begin = System.currentTimeMillis();//調用NacosTask中的run方法task.run();long duration = System.currentTimeMillis() - begin;if (duration > 1000L) {log.warn("task {} takes {}ms", task, duration);}} catch (Throwable e) {log.error("[TASK-FAILED] " + e.toString(), e);}}}}
}

執行DistroSyncChangeTask的run()方法,其實就是執行AbstractDistroExecuteTask的run()方法。AbstractDistroExecuteTask的run()方法會先獲取請求數據,然后調用DistroClientTransportAgent的syncData()方法同步集群節點,也就是調用ClusterRpcClientProxy的sendRequest()方法發送數據同步請求,最終會調用RpcClient的request()方法 -> GrpcConnection的request()方法。

public class DistroSyncChangeTask extends AbstractDistroExecuteTask {...@Overrideprotected void doExecuteWithCallback(DistroCallback callback) {String type = getDistroKey().getResourceType();//獲取請求數據DistroData distroData = getDistroData(type);if (null == distroData) {Loggers.DISTRO.warn("[DISTRO] {} with null data to sync, skip", toString());return;}//默認調用DistroClientTransportAgent.syncData()方法同步集群節點getDistroComponentHolder().findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer(), callback);}private DistroData getDistroData(String type) {DistroData result = getDistroComponentHolder().findDataStorage(type).getDistroData(getDistroKey());if (null != result) {result.setType(OPERATION);}return result;}...
}public abstract class AbstractDistroExecuteTask extends AbstractExecuteTask {...@Overridepublic void run() {//Nacos:Naming:v2:ClientDataString type = getDistroKey().getResourceType();//獲取DistroClientTransportAgent對象DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(type);if (null == transportAgent) {Loggers.DISTRO.warn("No found transport agent for type [{}]", type);return;}Loggers.DISTRO.info("[DISTRO-START] {}", toString());//默認返回trueif (transportAgent.supportCallbackTransport()) {//默認執行子類的doExecuteWithCallback()方法doExecuteWithCallback(new DistroExecuteCallback());} else {executeDistroTask();}}protected abstract void doExecuteWithCallback(DistroCallback callback);...
}public class DistroClientTransportAgent implements DistroTransportAgent {private final ClusterRpcClientProxy clusterRpcClientProxy;private final ServerMemberManager memberManager;...@Overridepublic boolean syncData(DistroData data, String targetServer) {if (isNoExistTarget(targetServer)) {return true;}//創建請求對象DistroDataRequest request = new DistroDataRequest(data, data.getType());//找到集群節點Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro sync caused by target server {} unhealthy", targetServer);return false;}try {//向集群節點發送RPC異步請求Response response = clusterRpcClientProxy.sendRequest(member, request);return checkResponse(response);} catch (NacosException e) {Loggers.DISTRO.error("[DISTRO-FAILED] Sync distro data failed! ", e);}return false;}...
}@Service
public class ClusterRpcClientProxy extends MemberChangeListener {...//send request to member.public Response sendRequest(Member member, Request request, long timeoutMills) throws NacosException {RpcClient client = RpcClientFactory.getClient(memberClientKey(member));if (client != null) {//調用RpcClient.request()方法return client.request(request, timeoutMills);} else {throw new NacosException(CLIENT_INVALID_PARAM, "No rpc client related to member: " + member);}}...
}public abstract class RpcClient implements Closeable {//在NamingGrpcClientProxy初始化 -> 調用RpcClient.start()方法時,//會將GrpcClient.connectToServer()方法的返回值賦值給currentConnection屬性protected volatile Connection currentConnection;...//send request.public Response request(Request request, long timeoutMills) throws NacosException {int retryTimes = 0;Response response;Exception exceptionThrow = null;long start = System.currentTimeMillis();while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {...//發起gRPC請求,調用GrpcConnection.request()方法response = this.currentConnection.request(request, timeoutMills);...}...}...
}

(3)集群節點處理數據同步請求的源碼

通過DistroClientTransportAgent的syncData()方法發送的數據同步請求,會被DistroDataRequestHandler的handle()方法處理。然后會調用DistroDataRequestHandler的handleSyncData()方法,接著調用DistroProtocol的onReceive()方法,于是最終會調用到DistroClientDataProcessor.processData()方法。

在執行DistroClientDataProcessor的processData()方法時,如果是同步服務實例新增、修改后的數據,則執行DistroClientDataProcessor的handlerClientSyncData()方法。該方法會和處理服務注冊時一樣,發布一個客戶端注冊服務實例的事件。如果是同步服務實例刪除后的數據,則調用EphemeralIpPortClientManager的clientDisconnected()方法。首先移除客戶端對象信息,然后發布一個客戶端注銷服務實例的事件。

其中客戶端注銷服務實例的事件ClientDisconnectEvent,首先會被ClientServiceIndexesManager的onEvent()方法進行處理,處理時會調用ClientServiceIndexesManager的handleClientDisconnect()方法,移除ClientServiceIndexesManager訂閱者列表的元素和注冊表的元素。然后會被DistroClientDataProcessor的onEvent()方法進行處理,進行集群節點之間的數據同步。

@Component
public class DistroDataRequestHandler extends RequestHandler<DistroDataRequest, DistroDataResponse> {private final DistroProtocol distroProtocol;...@Overridepublic DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {try {switch (request.getDataOperation()) {case VERIFY:return handleVerify(request.getDistroData(), meta);case SNAPSHOT:return handleSnapshot();case ADD:case CHANGE:case DELETE://服務實例新增、修改、刪除的同步,都會由DistroDataRequestHandler.handleSyncData()方法處理return handleSyncData(request.getDistroData());case QUERY:return handleQueryData(request.getDistroData());default:return new DistroDataResponse();}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);DistroDataResponse result = new DistroDataResponse();result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("handle distro request with exception");return result;}}private DistroDataResponse handleSyncData(DistroData distroData) {DistroDataResponse result = new DistroDataResponse();//調用DistroProtocol.onReceive()方法if (!distroProtocol.onReceive(distroData)) {result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("[DISTRO-FAILED] distro data handle failed");}return result;}...
}@Component
public class DistroProtocol {...//Receive synced distro data, find processor to process.public boolean onReceive(DistroData distroData) {Loggers.DISTRO.info("[DISTRO] Receive distro data type: {}, key: {}", distroData.getType(), distroData.getDistroKey());//Nacos:Naming:v2:ClientDataString resourceType = distroData.getDistroKey().getResourceType();//獲取DistroClientDataProcessor處理對象DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == dataProcessor) {Loggers.DISTRO.warn("[DISTRO] Can't find data process for received data {}", resourceType);return false;}//調用DistroClientDataProcessor.processData()方法return dataProcessor.processData(distroData);}...
}public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic boolean processData(DistroData distroData) {switch (distroData.getType()) {case ADD:case CHANGE://服務實例添加和改變時的執行邏輯ClientSyncData clientSyncData = ApplicationUtils.getBean(Serializer.class).deserialize(distroData.getContent(), ClientSyncData.class);handlerClientSyncData(clientSyncData);return true;case DELETE://服務實例刪除時的執行邏輯String deleteClientId = distroData.getDistroKey().getResourceKey();Loggers.DISTRO.info("[Client-Delete] Received distro client sync data {}", deleteClientId);//調用EphemeralIpPortClientManager.clientDisconnected()方法clientManager.clientDisconnected(deleteClientId);return true;default:return false;}}private void handlerClientSyncData(ClientSyncData clientSyncData) {Loggers.DISTRO.info("[Client-Add] Received distro client sync data {}", clientSyncData.getClientId());clientManager.syncClientConnected(clientSyncData.getClientId(), clientSyncData.getAttributes());Client client = clientManager.getClient(clientSyncData.getClientId());upgradeClient(client, clientSyncData);}private void upgradeClient(Client client, ClientSyncData clientSyncData) {List<String> namespaces = clientSyncData.getNamespaces();List<String> groupNames = clientSyncData.getGroupNames();List<String> serviceNames = clientSyncData.getServiceNames();List<InstancePublishInfo> instances = clientSyncData.getInstancePublishInfos();Set<Service> syncedService = new HashSet<>();for (int i = 0; i < namespaces.size(); i++) {Service service = Service.newService(namespaces.get(i), groupNames.get(i), serviceNames.get(i));Service singleton = ServiceManager.getInstance().getSingleton(service);syncedService.add(singleton);InstancePublishInfo instancePublishInfo = instances.get(i);//如果和當前不一樣才發布事件if (!instancePublishInfo.equals(client.getInstancePublishInfo(singleton))) {client.addServiceInstance(singleton, instancePublishInfo);//發布客戶端注冊服務實例的事件,與客戶端進行服務注冊時一樣NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, client.getClientId()));}}for (Service each : client.getAllPublishedService()) {if (!syncedService.contains(each)) {client.removeServiceInstance(each);NotifyCenter.publishEvent(new ClientOperationEvent.ClientDeregisterServiceEvent(each, client.getClientId()));}}}...
}@Component("ephemeralIpPortClientManager")
public class EphemeralIpPortClientManager implements ClientManager {//key是請求參數中的connectionId即clientId,value是一個繼承了實現Client接口的AbstractClient的IpPortBasedClient對象private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();...@Overridepublic boolean clientDisconnected(String clientId) {Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);//移除客戶端信息IpPortBasedClient client = clients.remove(clientId);if (null == client) {return true;}//發布客戶端注銷服務實例的事件NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));client.release();return true;}...
}@Component
public class ClientServiceIndexesManager extends SmartSubscriber {//注冊表(服務提供者),一個Service服務對象,對應多個服務實例的clientIdprivate final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();//訂閱者列表(服務消費者),一個Service服務對象,對應多個訂閱者的clientIdprivate final ConcurrentMap<Service, Set<String>> subscriberIndexes = new ConcurrentHashMap<>();...@Overridepublic void onEvent(Event event) {if (event instanceof ClientEvent.ClientDisconnectEvent) {handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);} else if (event instanceof ClientOperationEvent) {handleClientOperation((ClientOperationEvent) event);}}private void handleClientDisconnect(ClientEvent.ClientDisconnectEvent event) {Client client = event.getClient();for (Service each : client.getAllSubscribeService()) {//移除訂閱者列表的元素removeSubscriberIndexes(each, client.getClientId());}for (Service each : client.getAllPublishedService()) {//移除注冊表的元素removePublisherIndexes(each, client.getClientId());}}...
}public class DistroClientDataProcessor extends SmartSubscriber implements DistroDataStorage, DistroDataProcessor {...@Overridepublic void onEvent(Event event) {...if (event instanceof ClientEvent.ClientVerifyFailedEvent) {syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {syncToAllServer((ClientEvent) event);}}private void syncToAllServer(ClientEvent event) {Client client = event.getClient();//Only ephemeral data sync by Distro, persist client should sync by raft.//臨時實例使用Distro協議,持久化實例使用Raft協議//ClientManager.isResponsibleClient()方法,判斷只有該client的責任節點才能進行集群數據同步if (null == client || !client.isEphemeral() || !clientManager.isResponsibleClient(client)) {return;}if (event instanceof ClientEvent.ClientDisconnectEvent) {//如果event是客戶端注銷實例時需要進行集群節點同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.DELETE);} else if (event instanceof ClientEvent.ClientChangedEvent) {//如果event是客戶端注冊實例時需要進行集群節點同步的事件DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);distroProtocol.sync(distroKey, DataOperation.CHANGE);}}...
}

(4)總結

一.執行引擎的總結

延時任務執行引擎的實現原理是引擎有一個Map類型的tasks任務池,這個任務池可以根據key映射對應的任務處理器。引擎會定時從任務池中獲取任務,執行任務處理器的處理方法處理任務。

任務執行引擎的實現原理是會創建多個任務執行Worker,每個任務執行Worker都會有一個阻塞隊列。向任務執行引擎添加任務時會將任務添加到其中一個Woker的阻塞隊列中,Worker在初始化時就會啟動一個線程不斷取出阻塞隊列中的任務來處理。所以任務執行引擎會通過阻塞隊列 + 異步任務的方式來實現。

二.用于向集群節點同步數據的客戶端改變事件的處理流程總結

步驟一:先創建DistroDelayTask延遲任務放入到延遲任務執行引擎的任務池,DistroDelayTask延遲任務會由DistroDelayTaskProcessor處理器處理。

步驟二:DistroDelayTaskProcessor處理時會創建DistroSyncChangeTask任務,然后再將任務分發添加到執行引擎中的任務執行Worker的阻塞隊列中。

步驟三:任務執行Worker會從隊列中獲取并執行DistroSyncChangeTask任務,也就是執行引擎會觸發調用AbstractDistroExecuteTask的run()方法,從而調用DistroSyncChangeTask的doExecuteWithCallback()方法。

步驟四:doExecuteWithCallback()方法會獲取最新的微服務實例列表,然后通過DistroClientTransportAgent的syncData()方法發送數據同步請求。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/82283.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/82283.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/82283.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

《Overlapping Experiment Infrastructure: More, Better, Faster》論文閱讀筆記

文章目錄 1 背景2 三個核心概念3 Launch層&#xff1a;特性發布的專用機制4 流量分發策略和條件篩選4.1 四種流量分發類型4.2 條件篩選機制 5 工具鏈與監控體系6 實驗設計原則7 培訓參考與推薦 1 背景 谷歌&#xff08;Google&#xff09;以數據驅動著稱&#xff0c;幾乎所有可…

國芯思辰| 醫療AED可使用2通道24位模擬前端SC2946(ADS1292)

生物電信號監測技術在醫療健康行業中發展迅速&#xff0c;成為評估人體生理健康狀況的關鍵手段。心電&#xff08;ECG&#xff09;、腦電&#xff08;EEG&#xff09;和肌電&#xff08;EMG&#xff09;等信號&#xff0c;通過精密模擬前端芯片捕捉和處理&#xff0c;對醫療診斷…

數據結構【二叉搜索樹(BST)】

二叉搜索樹 1. 二叉搜索樹的概念2. 二叉搜索樹的性能分析3.二叉搜索樹的插入4. 二叉搜索樹的查找5. 二叉搜索樹的刪除6.二叉搜索樹的實現代碼7. 二叉搜索樹key和key/value使用場景7.1 key搜索場景&#xff1a;7.2 key/value搜索場景&#xff1a; 1. 二叉搜索樹的概念 二叉搜索…

RDMA高性能網絡通信實踐

RDMA高性能網絡通信實踐 一、背景介紹二、方法設計A.實現方案B.關鍵技術點 三、代碼及注釋四、注意事項 一、背景介紹 遠程直接內存訪問&#xff08;RDMA&#xff09;技術通過繞過操作系統內核和CPU直接訪問遠程內存&#xff0c;實現了超低延遲、高吞吐量的網絡通信。該技術廣…

ndarray數組掩碼操作,True和False獲取數據

#數組掩碼的表示方法 def testht05():a np.arange(1,10)mask [True,False,True,True,False,True,False,True,True]print(a[mask]) 另外的用法&#xff1a; #掩碼操作獲取子集 def testht06():a np.arange(1,100)print(a[a%3 0 & (a%7 0)] )b np.array([A,"B&qu…

索引工具explain

EXPLAIN 是 MySQL 中一個非常有用的工具,用于分析查詢的執行計劃。通過 EXPLAIN,你可以了解 MySQL 是如何執行查詢的,包括它如何使用索引、表的掃描方式等。這有助于優化查詢性能。以下是 EXPLAIN 輸出的各個字段的詳細解釋: 基本用法 EXPLAIN SELECT * FROM table_name …

Git回顧

參考視頻:【GeekHour】一小時Git教程 一句話定義&#xff1a;Git是一個免費開源的分布式版本控制系統。 版本控制系統可以分為兩種&#xff0c;1.集中式&#xff08;SVN&#xff0c;CVS&#xff09;&#xff1b;2.分布式&#xff08;git&#xff09; git的工作區域和文件狀態…

python打卡day20

特征降維------特征組合&#xff08;以SVD為例&#xff09; 知識點回顧&#xff1a; 奇異值的應用&#xff1a; 特征降維&#xff1a;對高維數據減小計算量、可視化數據重構&#xff1a;比如重構信號、重構圖像&#xff08;可以實現有損壓縮&#xff0c;k 越小壓縮率越高&#…

GuPPy-v1.2.0安裝與使用-生信工具52

GuPPy&#xff1a;Python中用于光纖光度數據分析的免費開源工具 01 背景 Basecalling 是將原始測序信號轉換為堿基序列的過程&#xff0c;通俗地說&#xff0c;就是“把堿基識別出來”。這一過程在不同代測序技術中各不相同&#xff1a; 一代測序是通過解析峰圖實現&#xff1…

47. 全排列 II

題目 給定一個可包含重復數字的序列 nums &#xff0c;按任意順序 返回所有不重復的全排列。 示例 1&#xff1a; 輸入&#xff1a;nums [1,1,2] 輸出&#xff1a; [[1,1,2],[1,2,1],[2,1,1]] 示例 2&#xff1a; 輸入&#xff1a;nums [1,2,3] 輸出&#xff1a;[[1,2,3…

ERP系統操作流程,如何快速搭建流程體系

ERP流程圖&#xff0c;如何搭建和建立&#xff0c;ERP系統操作流程&#xff0c;ERP系統操作流程圖&#xff0c;采購流程&#xff0c;銷售流程&#xff0c;倉庫流程&#xff0c;MRP流程&#xff0c;PMC流程&#xff0c;財務流程&#xff0c;應收流程&#xff0c;應付流程&#x…

class path resource [] cannot be resolved to absolute file path

問題情景 java應用程序在IDE運行正常&#xff0c;打成jar包后執行卻發生異常&#xff1a; java.io.FileNotFoundException: class path resource [cert/sync_signer_pri_test.key] cannot be resolved to absolute file path because it does not reside in the file system:…

19、HashTable(哈希)、位圖的實現和布隆過濾器的介紹

一、了解哈希【散列表】 1、哈希的結構 在STL中&#xff0c;HashTable是一個重要的底層數據結構, 無序關聯容器包括unordered_set, unordered_map內部都是基于哈希表實現 哈希表又稱散列表&#xff0c;一種以「key-value」形式存儲數據的數據結構。哈希函數&#xff1a;負責將…

基于 Flask的深度學習模型部署服務端詳解

基于 Flask 的深度學習模型部署服務端詳解 在深度學習領域&#xff0c;訓練出一個高精度的模型只是第一步&#xff0c;將其部署到生產環境中&#xff0c;為實際業務提供服務才是最終目標。本文將詳細解析一個基于 Flask 和 PyTorch 的深度學習模型部署服務端代碼&#xff0c;幫…

Vue3 + Node.js 實現客服實時聊天系統(WebSocket + Socket.IO 詳解)

Node.js 實現客服實時聊天系統&#xff08;WebSocket Socket.IO 詳解&#xff09; 一、為什么選擇 WebSocket&#xff1f; 想象一下淘寶客服的聊天窗口&#xff1a;你發消息&#xff0c;客服立刻就能看到并回復。這種即時通訊效果是如何實現的呢&#xff1f;我們使用 Vue3 作…

MySQL數據庫與表結構操作指南

前言&#xff1a;本文系統梳理MySQL核心操作語句。內容覆蓋建庫建表、結構調整、數據遷移全流程&#xff08;包含創建/修改/刪除/備份場景&#xff09;。希望它們能幫你快速解決問題。 庫結構操作 一、庫的創建 一個庫的簡單創建&#xff1a; create database 庫名; 注意&am…

【WEB3】區塊鏈、隱私計算、AI和Web3.0——數據民主化(1)

區塊鏈、隱私計算、AI&#xff0c;是未來Web3.0至關重要的三項技術。 1.數據民主化問題 數據在整個生命周期&#xff08;生產、傳輸、處理、存儲&#xff09;內的隱私安全&#xff0c;則是Web3.0在初始階段首要解決的問題。 數據民主化旨在打破數據壟斷&#xff0c;讓個體能…

C語言—指針2

1. const 修飾變量 1.1 const修飾變量 變量被const修飾時&#xff0c;變量此時為常變量&#xff0c;本質為常量&#xff0c;語法上不可被修改&#xff0c;但是如果此時需要修改變量值&#xff0c;可以通過指針的方式修改。 雖然此時通過指針的方式確實修改了變量的值&#xff…

高級架構軟考之網絡OSI網絡模型

高級架構軟考之網絡&#xff1a; 1.OSI網絡模型&#xff1a; a.物理層&#xff1a; a.物理傳輸介質物理連接&#xff0c;負責數據傳輸&#xff0c;并監控數據 b.傳輸單位&#xff1a;bit c.協議&#xff1a; d:對應設備&#xff1a;中繼器、集線器 b.數據鏈路層&#xff1a; a.…

el-table計算表頭列寬,不換行顯示

1、在utils.js中封裝renderHeader方法 2、在el-table-column中引入&#xff1a; 3、頁面展示&#xff1a;