自動裝配
SpringBoot 自動裝配機制會加載 META-INF/spring.factories,其中配置了:
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigBootstrapConfiguration {......@Bean@ConditionalOnMissingBeanpublic NacosConfigManager nacosConfigManager(NacosConfigProperties nacosConfigProperties) {return new NacosConfigManager(nacosConfigProperties);}......}
創建 ConfigService
構建 NacosConfigManager Bean 時會調用它的構造方法,而構造方法中會創建 ConfigService。
/**
 * Keeps a reference to the given properties and triggers creation of the ConfigService.
 *
 * @param nacosConfigProperties the Nacos config properties to store and to build the service from
 */
public NacosConfigManager(NacosConfigProperties nacosConfigProperties) {
    this.nacosConfigProperties = nacosConfigProperties;
    // Compatible with older code in NacosConfigProperties, it will be deleted in the future.
    createConfigService(nacosConfigProperties);
}
/**
 * Returns the shared ConfigService, creating it on first use.
 *
 * <p>The {@code service} field is checked once without the lock and once more while holding
 * the {@code NacosConfigManager.class} monitor, so only one caller performs the creation.
 *
 * @param nacosConfigProperties source of the properties passed to {@code NacosFactory}
 * @return the shared ConfigService instance
 * @throws NacosConnectionFailureException if {@code NacosFactory} fails with a NacosException
 */
static ConfigService createConfigService(NacosConfigProperties nacosConfigProperties) {
    if (!Objects.isNull(service)) {
        return service;
    }
    synchronized (NacosConfigManager.class) {
        try {
            if (Objects.isNull(service)) {
                service = NacosFactory.createConfigService(
                        nacosConfigProperties.assembleConfigServiceProperties());
            }
        } catch (NacosException e) {
            log.error(e.getMessage());
            throw new NacosConnectionFailureException(
                    nacosConfigProperties.getServerAddr(), e.getMessage(), e);
        }
    }
    return service;
}
/**
 * Creates a ConfigService by delegating to {@code ConfigFactory}.
 *
 * @param properties client initialization properties
 * @return the ConfigService produced by {@code ConfigFactory}
 * @throws NacosException if the delegate fails
 */
public static ConfigService createConfigService(Properties properties) throws NacosException {
    return ConfigFactory.createConfigService(properties);
}
public static ConfigService createConfigService(Properties properties) throws NacosException {try {//反射拿到classClass<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");// 獲取帶Properties參數的構造函數Constructor constructor = driverImplClass.getConstructor(Properties.class);//通過反射構建實例ConfigService vendorImpl = (ConfigService) constructor.newInstance(properties);return vendorImpl;} catch (Throwable e) {throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);}}
NacosConfigService會在構造方法中 注入Listener接受server配置變更通知。
public NacosConfigService(Properties properties) throws NacosException {ValidatorUtils.checkInitParam(properties);// 設置namespace可以通過properties.setProperty(PropertyKeyConst.NAMESPACE)initNamespace(properties);// 初始化namespace、server地址等信息ServerListManager serverListManager = new ServerListManager(properties);// 啟動主要用于endpoint方式定時獲取server地址,當本地傳入isFixed=trueserverListManager.start();// clientWorker初始化this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, properties);// 將被廢棄HttpAgent,先忽略// will be deleted in 2.0 later versionsagent = new ServerHttpAgent(serverListManager);
}
ClientWorker初始化
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,final Properties properties) throws NacosException {this.configFilterChainManager = configFilterChainManager;// 初始化超時時間、重試時間等init(properties);// gRPC config agent初始化agent = new ConfigRpcTransportClient(properties, serverListManager);// 調度線程池,「處理器核數」ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("com.alibaba.nacos.client.Worker");t.setDaemon(true);return t;}});agent.setExecutor(executorService);// 啟動grpc agentagent.start();}
初始化超時時間等
/**
 * Reads the long-poll timeout, the retry (penalty) time and the remote-sync flag
 * from the given properties.
 *
 * @param properties client initialization properties
 */
private void init(Properties properties) {
    // Long-poll timeout, never below MIN_CONFIG_LONG_POLL_TIMEOUT (default 30 s per the original notes).
    int configuredTimeout = ConvertUtils.toInt(
            properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
            Constants.CONFIG_LONG_POLL_TIMEOUT);
    timeout = Math.max(configuredTimeout, Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);

    // Retry time (default 2 s per the original notes).
    taskPenaltyTime = ConvertUtils.toInt(
            properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME),
            Constants.CONFIG_RETRY_TIME);

    // Remote config sync switch; Boolean.parseBoolean yields false when the property is absent.
    String remoteSyncFlag = properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG);
    this.enableRemoteSyncConfig = Boolean.parseBoolean(remoteSyncFlag);
}
GRPCConfigAgent初始化
/**
 * Initializes encoding, tenant, server list and the security proxy from the properties.
 *
 * @param properties        client initialization properties
 * @param serverListManager server list used by this transport client
 */
public ConfigTransportClient(Properties properties, ServerListManager serverListManager) {
    // Fall back to Constants.ENCODE (UTF-8 per the original notes) when no encoding is configured.
    String configuredEncoding = properties.getProperty(PropertyKeyConst.ENCODE);
    this.encode = StringUtils.isBlank(configuredEncoding) ? Constants.ENCODE : configuredEncoding.trim();

    // Namespace / tenant; null when the property is not set.
    this.tenant = properties.getProperty(PropertyKeyConst.NAMESPACE);
    this.serverListManager = serverListManager;

    // Username / password authentication.
    this.securityProxy = new SecurityProxy(properties,
            ConfigHttpClientManager.getInstance().getNacosRestTemplate());
}
啟動GRPC Config Agent
/**
 * Starts the transport client. When the security proxy is enabled, logs in once right away
 * and schedules a repeated login on the executor, then calls {@code startInternal()}.
 *
 * @throws NacosException if start-up fails
 */
public void start() throws NacosException {
    // Simple username / password authentication.
    if (securityProxy.isEnabled()) {
        securityProxy.login(serverListManager.getServerUrls());
        Runnable refreshLogin = () -> {
            securityProxy.login(serverListManager.getServerUrls());
        };
        this.executor.scheduleWithFixedDelay(refreshLogin, 0, this.securityInfoRefreshIntervalMills,
                TimeUnit.MILLISECONDS);
    }
    startInternal();
}
這裡的線程會一直運行,不斷從 listenExecutebell 這個阻塞隊列中獲取元素。
配置發生變更(或新增監聽器)之後會發布變更事件,最終會往 listenExecutebell 這個阻塞隊列中放入元素:如果隊列為空,最多等待5秒後執行;如果隊列不為空,則立即執行。
@Overridepublic void startInternal() {executor.schedule(() -> {//線程池沒有管理并且所有線程沒有運行完while (!executor.isShutdown() && !executor.isTerminated()) {try {// 最長等待5秒listenExecutebell.poll(5L, TimeUnit.SECONDS);//如果線程池已經關閉 或者所有線程運行完直接if (executor.isShutdown() || executor.isTerminated()) {continue;}executeConfigListen();} catch (Exception e) {LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);}}}, 0L, TimeUnit.MILLISECONDS);}
注冊Listener
在Spring啟動的時候會在run方法中執行
SpringApplicationRunListeners的running(context)這里面會發送一個ApplicationReadyEvent事件
NacosContextRefresher會監聽到ApplicationReadyEvent事件進行nacos監聽器的注冊
@Overridepublic void onApplicationEvent(ApplicationReadyEvent event) {// many Spring contextif (this.ready.compareAndSet(false, true)) {this.registerNacosListenersForApplications();}}
private void registerNacosListenersForApplications() {......registerNacosListener(propertySource.getGroup(), dataId);......}
private void registerNacosListener(final String groupKey, final String dataKey) {.....//添加監聽器configService.addListener(dataKey, groupKey, listener);......
}
添加監聽器
構建CacheData,並緩存在cacheMap中,key由「dataId+group+tenant」組成;每個CacheData會綁定一個Listener列表,同時也綁定一個taskId;每3000個不同的CacheData對應一個taskId,每個taskId對應一個gRPC通道實例。
/**
 * Registers a single listener for the given dataId and group by delegating to the worker.
 *
 * @param dataId   config dataId
 * @param group    config group
 * @param listener listener to register
 * @throws NacosException if the worker fails to register the listener
 */
@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
    List<Listener> singleListener = Arrays.asList(listener);
    worker.addTenantListeners(dataId, group, singleListener);
}
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)throws NacosException {// 默認DEFAULT_GROUPgroup = blank2defaultGroup(group);//獲取租戶默認是空String tenant = agent.getTenant();//構建緩存數據CacheData并放入cacheMap中CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);synchronized (cache) {for (Listener listener : listeners) {cache.addListener(listener);}// cache md5 data是否來自server同步cache.setSyncWithServer(false);//往阻隊列中添加數據 listenExecutebell.offer(bellItem);agent.notifyListenConfig();}}
往緩存中添加內容
構建緩存數據CacheData並放入cacheMap中,緩存的key為「dataId+group+tenant」,例如:test+DEFAULT_GROUP。每個CacheData會綁定對應的taskId,每3000個CacheData對應一個taskId。其實從後面的代碼中可以看出,每個taskId會對應一個gRPC通道實例。
public CacheData addCacheDataIfAbsent(String dataId, String group, String tenant) throws NacosException {// 從緩存中獲取 如果不是空的直接返回CacheData cache = getCache(dataId, group, tenant);if (null != cache) {return cache;}// 構造緩存key以+連接,test+DEFAULT_GROUPString key = GroupKey.getKeyTenant(dataId, group, tenant);synchronized (cacheMap) {CacheData cacheFromMap = getCache(dataId, group, tenant);// multiple listeners on the same dataid+group and race condition,so// double check again// other listener thread beat me to set to cacheMapif (null != cacheFromMap) { // 再檢查一遍cache = cacheFromMap;// reset so that server not hang this checkcache.setInitializing(true); // 緩存正在初始化} else {// 構造緩存數據對象cache = new CacheData(configFilterChainManager, agent.getName(), dataId, group, tenant);// 初始值taskId=0,注意此處每3000個CacheData共用一個taskIdint taskId = cacheMap.get().size() / (int) ParamUtil.getPerTaskConfigSize();cache.setTaskId(taskId);// fix issue # 1317 // 默認falseif (enableRemoteSyncConfig) {ConfigResponse response = getServerConfig(dataId, group, tenant, 3000L, false);cache.setContent(response.getContent());}}Map<String, CacheData> copy = new HashMap<>(this.cacheMap.get());// key = test+DEFAULT_GROUPcopy.put(key, cache);// cacheMap = {test+DEFAULT_GROUP=CacheData [test, DEFAULT_GROUP]}cacheMap.set(copy);}LOGGER.info("[{}] [subscribe] {}", agent.getName(), key);MetricsMonitor.getListenConfigCountMonitor().set(cacheMap.get().size());return cache;}
緩存的內容
public class CacheData {//ConfigTransportClient名稱,config_rpc_clientprivate final String name;//filter攔截鏈條,可以執行一些列攔截器private final ConfigFilterChainManager configFilterChainManager;//dataIdpublic final String dataId;//group名稱,默認為DEFAULT_GROUPpublic final String group;//租戶名稱public final String tenant;//添加的Listener列表,線程安全CopyOnWriteArrayListprivate final CopyOnWriteArrayList<ManagerListenerWrap> listeners;//MD5private volatile String md5;//配置內容private volatile String content; }
配置變更
public void executeConfigListen() {Map<String/*taskId*/, List<CacheData>> listenCachesMap = new HashMap<String, List<CacheData>>(16);Map<String, List<CacheData>> removeListenCachesMap = new HashMap<String, List<CacheData>>(16);long now = System.currentTimeMillis();// 超過5分鐘boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;for (CacheData cache : cacheMap.get().values()) {synchronized (cache) {// isSyncWithServer初始為false,在下文代碼中校驗結束后會設置為true,表示md5 cache data同步來自server。如果為true會校驗Md5.if (cache.isSyncWithServer()) { cache.checkListenerMd5(); // 內容有變更通知Listener執行if (!needAllSync) { // 不超過5分鐘則不再全局校驗continue;}}if (!CollectionUtils.isEmpty(cache.getListeners())) { // 有添加Listeners// get listen config 默認 falseif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<CacheData>();listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}// CacheData [test, DEFAULT_GROUP]cacheDatas.add(cache);}} else if (CollectionUtils.isEmpty(cache.getListeners())) { // 沒有添加Listenersif (!cache.isUseLocalConfigInfo()) {List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));if (cacheDatas == null) {cacheDatas = new LinkedList<CacheData>();removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);}cacheDatas.add(cache);}}}}boolean hasChangedKeys = false;if (!listenCachesMap.isEmpty()) { // 有Listenersfor (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {String taskId = entry.getKey();List<CacheData> listenCaches = entry.getValue();ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);configChangeListenRequest.setListen(true);try {// 每個taskId構建rpcClient,例如:taskId= config-0-c70e0314-4770-43f5-add4-f258a4083fd7;結合上下文每3000個CacheData對應一個rpcClientRpcClient rpcClient = ensureRpcClient(taskId);// 
向server發起configChangeListenRequest,server端由ConfigChangeBatchListenRequestHandler處理,還是比較md5是否變更了,變更后server端返回變更的key列表。ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);if (configChangeBatchListenResponse != null && configChangeBatchListenResponse.isSuccess()) {Set<String> changeKeys = new HashSet<String>();// handle changed keys,notify listener// 有變化的configContextif (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {hasChangedKeys = true;for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) {String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),changeConfig.getTenant());changeKeys.add(changeKey);boolean isInitializing = cacheMap.get().get(changeKey).isInitializing();//當server返回變更key列表時執行refreshContentAndCheck方法。然后回調ListenerrefreshContentAndCheck(changeKey, !isInitializing);}}//handler content configsfor (CacheData cacheData : listenCaches) {String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());if (!changeKeys.contains(groupKey)) {// key沒有變化的,內容由server同步,設置SyncWithServer=truesynchronized (cacheData) {if (!cacheData.getListeners().isEmpty()) {cacheData.setSyncWithServer(true);continue;}}}cacheData.setInitializing(false);}}} catch (Exception e) {LOGGER.error("Async listen config change error ", e);try {Thread.sleep(50L);} catch (InterruptedException interruptedException) {//ignore}}}}if (!removeListenCachesMap.isEmpty()) {for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {String taskId = entry.getKey();List<CacheData> removeListenCaches = entry.getValue();ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);configChangeListenRequest.setListen(false);try {// 向server發送Listener取消訂閱請求ConfigBatchListenRequest#listen為falseRpcClient 
rpcClient = ensureRpcClient(taskId);boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);if (removeSuccess) {for (CacheData cacheData : removeListenCaches) {synchronized (cacheData) {if (cacheData.getListeners().isEmpty()) {// 移除本地緩存ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);}}}}} catch (Exception e) {LOGGER.error("async remove listen config change error ", e);}try {Thread.sleep(50L);} catch (InterruptedException interruptedException) {//ignore}}}if (needAllSync) {lastAllSyncTime = now;}//If has changed keys,notify re sync md5.if (hasChangedKeys) { // key有變化觸發下一輪notifyListenConfig();}
}
校驗MD5
當CacheData從server同步后,會校驗md5是否變更了,當變更時會回調到我們注冊的Listener完成通知。通知任務被封裝成Runnable任務,執行線程池可以自定義,默認為5個線程。
void checkListenerMd5() {for (ManagerListenerWrap wrap : listeners) {if (!md5.equals(wrap.lastCallMd5)) { // 配置內容有變更時,回調到]Listener中。safeNotifyListener(dataId, group, content, type, md5, wrap);}}
}
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,final String md5, final ManagerListenerWrap listenerWrap) {final Listener listener = listenerWrap.listener;if (listenerWrap.inNotifying) {// ...return;}Runnable job = new Runnable() {@Overridepublic void run() {long start = System.currentTimeMillis();ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();ClassLoader appClassLoader = listener.getClass().getClassLoader();try {if (listener instanceof AbstractSharedListener) {AbstractSharedListener adapter = (AbstractSharedListener) listener;adapter.fillContext(dataId, group);// ...}Thread.currentThread().setContextClassLoader(appClassLoader);ConfigResponse cr = new ConfigResponse();cr.setDataId(dataId);cr.setGroup(group);cr.setContent(content);// filter攔截繼續過濾configFilterChainManager.doFilter(null, cr);String contentTmp = cr.getContent();listenerWrap.inNotifying = true;// 回調注冊Listener的receiveConfigInfo方法或者receiveConfigChange邏輯listener.receiveConfigInfo(contentTmp);// compare lastContent and contentif (listener instanceof AbstractConfigChangeListener) {Map data = ConfigChangeHandler.getInstance().parseChangeData(listenerWrap.lastContent, content, type);ConfigChangeEvent event = new ConfigChangeEvent(data);// 回調變更事件方法((AbstractConfigChangeListener) listener).receiveConfigChange(event);listenerWrap.lastContent = content;}listenerWrap.lastCallMd5 = md5;// ..} catch (NacosException ex) {// ...} catch (Throwable t) {// ...} finally {listenerWrap.inNotifying = false;Thread.currentThread().setContextClassLoader(myClassLoader);}}};final long startNotify = System.currentTimeMillis();try {// 優先使用我們示例中注冊提供的線程池執行job,如果沒有設置使用默認線程池「INTERNAL_NOTIFIER」,默認5個線程if (null != listener.getExecutor()) {listener.getExecutor().execute(job);} else {try {INTERNAL_NOTIFIER.submit(job); // 默認線程池執行,為5個線程} catch (RejectedExecutionException rejectedExecutionException) {// ...job.run();} catch (Throwable throwable) {// 
...job.run();}}} catch (Throwable t) {// ...}final long finishNotify = System.currentTimeMillis();// ...
}
key有變更
注冊Listener后,會構建與server的RPC通道rpcClient;向server發起變更查詢請求configChangeListenRequest,server端通過比較緩存的md5值,返回client變更的key列表;client通過變更的key列表向server發起配置查詢請求ConfigQueryRequest,獲取變更內容,并回調我們注冊的Listener。
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {try {// 向server發起ConfigQueryRequest,查詢配置內容String[] ct = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L, notify);//設置最新的內容信息cacheData.setContent(ct[0]);if (null != ct[1]) {cacheData.setType(ct[1]);}if (notify) { // 記錄日志// ...}// 回調注冊的Listener邏輯cacheData.checkListenerMd5();} catch (Exception e) {//...}
}
總結
客戶端在啟動的時候會構建一個ConfigService的處理類,然後在ConfigService的構造方法中會創建一個ClientWorker,用來處理與服務端的網絡通信及後續的變更處理。
當服務端有配置變更的時候會發送配置變更事件,最終會往一個阻塞隊列中offer數據。ClientWorker啟動的時候會構建一個定時線程,從這個阻塞隊列中阻塞地取數據:如果隊列為空,最多等待5秒後執行;如果隊列不為空,則立即執行。然後會將每3000個CacheData(其實就是配置數據)歸為一個taskId,並向服務端發送gRPC請求;服務端會通過比較md5檢查哪些配置發生了變更,並返回發生變更的key列表;客戶端再根據服務端返回的key列表去服務端拉取最新的配置信息,然後緩存到本地。