Nacos源碼—5.Nacos配置中心實現分析二

大綱

1.關于Nacos配置中心的幾個問題

2.Nacos如何整合SpringBoot讀取遠程配置

3.Nacos加載讀取遠程配置數據的源碼分析

4.客戶端如何感知遠程配置數據的變更

5.集群架構下節點間如何同步配置數據

4.客戶端如何感知遠程配置數據的變更

(1)ConfigService對象使用介紹

(2)客戶端注冊監聽器的源碼

(3)回調監聽器的方法的源碼

(1)ConfigService對象使用介紹

ConfigService是一個接口,定義了獲取配置、發布配置、移除配置等方法。ConfigService只有一個實現類NacosConfigService,Nacos配置中心源碼的核心其實就是這個NacosConfigService對象。

步驟一:手動創建ConfigService對象

首先定義好基本的Nacos信息,然后利用NacosFactory工廠類來創建ConfigService對象。

public class Demo {public static void main(String[] args) throws Exception {//步驟一:配置信息String serverAddr = "124.223.102.236:8848";String dataId = "stock-service-test.yaml";String group = "DEFAULT_GROUP";Properties properties = new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);//步驟一:獲取配置中心服務ConfigService configService = NacosFactory.createConfigService(properties);}
}

步驟二:獲取配置、發布配置

創建好ConfigService對象后,就可以使用ConfigService對象的getConfig()方法來獲取配置信息,還可以使用ConfigService對象的publishConfig()方法來發布配置信息。

如下Demo先獲取一次配置數據,然后發布新配置,緊接著重新獲取數據。發現第二次獲取的配置數據已發生變化,從而也說明發布配置成功了。

public class Demo {public static void main(String[] args) throws Exception {//步驟一:配置信息String serverAddr = "124.223.102.236:8848";String dataId = "stock-service-test.yaml";String group = "DEFAULT_GROUP";Properties properties = new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);//步驟一:獲取配置中心服務ConfigService configService = NacosFactory.createConfigService(properties);//步驟二:從配置中心獲取配置String content = configService.getConfig(dataId, group, 5000);System.out.println("發布配置前" + content);//步驟二:發布配置configService.publishConfig(dataId, group, "userName: userName被修改了", ConfigType.PROPERTIES.getType());Thread.sleep(300L);//步驟二:從配置中心獲取配置content = configService.getConfig(dataId, group, 5000);System.out.println("發布配置后" + content);}
}

步驟三:添加監聽器

可以使用ConfigService對象的addListener()方法來添加監聽器。通過dataId + group這兩個參數,就可以注冊一個監聽器。當dataId + group對應的配置在服務端發生改變時,客戶端的監聽器就可以馬上感知并對配置數據進行刷新。

public class Demo {public static void main(String[] args) throws Exception {//步驟一:配置信息String serverAddr = "124.223.102.236:8848";String dataId = "stock-service-test.yaml";String group = "DEFAULT_GROUP";Properties properties = new Properties();properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);//步驟一:獲取配置中心服務ConfigService configService = NacosFactory.createConfigService(properties);//步驟二:從配置中心獲取配置String content = configService.getConfig(dataId, group, 5000);System.out.println("發布配置前" + content);//步驟二:發布配置configService.publishConfig(dataId, group, "userName: userName被修改了", ConfigType.PROPERTIES.getType());Thread.sleep(300L);//步驟二:從配置中心獲取配置content = configService.getConfig(dataId, group, 5000);System.out.println("發布配置后" + content);//步驟三:注冊監聽器configService.addListener(dataId, group, new Listener() {@Overridepublic void receiveConfigInfo(String configInfo) {System.out.println("感知配置變化:" + configInfo);}@Overridepublic Executor getExecutor() {return null;}});//阻斷進程關閉Thread.sleep(Integer.MAX_VALUE);}
}

(2)客戶端注冊監聽器的源碼

Nacos客戶端是什么時候為dataId + group注冊監聽器的?

在nacos-config下的spring.factories文件中,有一個自動裝配的配置類NacosConfigAutoConfiguration,在該配置類中定義了一個NacosContextRefresher對象,而NacosContextRefresher對象會監聽ApplicationReadyEvent事件。

在NacosContextRefresher的onApplicationEvent()方法中,會執行registerNacosListenersForApplications()方法,這個方法中會遍歷每一個dataId + group注冊Nacos監聽器。

對于每一個dataId + group,則通過調用registerNacosListener()方法來進行Nacos監聽器的注冊,也就是最終調用ConfigService對象的addListener()方法來注冊監聽器。

@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.nacos.config.enabled", matchIfMissing = true)
public class NacosConfigAutoConfiguration {...@Beanpublic NacosContextRefresher nacosContextRefresher(NacosConfigManager nacosConfigManager, NacosRefreshHistory nacosRefreshHistory) {return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);}...
}public class NacosContextRefresher implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {private final ConfigService configService;...@Overridepublic void onApplicationEvent(ApplicationReadyEvent event) {//many Spring contextif (this.ready.compareAndSet(false, true)) {this.registerNacosListenersForApplications();}}//register Nacos Listeners.private void registerNacosListenersForApplications() {if (isRefreshEnabled()) {//獲取全部的配置for (NacosPropertySource propertySource : NacosPropertySourceRepository.getAll()) {//判斷當前配置是否需要刷新if (!propertySource.isRefreshable()) {continue;}String dataId = propertySource.getDataId();//注冊監聽器registerNacosListener(propertySource.getGroup(), dataId);}}}private void registerNacosListener(final String groupKey, final String dataKey) {String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);Listener listener = listenerMap.computeIfAbsent(key, lst -> new AbstractSharedListener() {@Overridepublic void innerReceive(String dataId, String group, String configInfo) {//監聽器的回調方法處理邏輯refreshCountIncrement();//記錄刷新歷史nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);//發布RefreshEvent刷新事件applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));if (log.isDebugEnabled()) {log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));}}});try {//注冊監聽器configService.addListener(dataKey, groupKey, listener);} catch (NacosException e) {log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), e);}}...
}

(3)回調監聽器的方法的源碼

給每一個dataId + group注冊Nacos監聽器后,當Nacos服務端的配置文件發生變更時,就會回調監聽器的方法,也就是會觸發調用AbstractSharedListener的innerReceive()方法。然后調用applicationContext.publishEvent()發布RefreshEvent刷新事件,而發布的RefreshEvent刷新事件會被RefreshEventListener類來處理。

RefreshEventListener類不是Nacos中的類了,而是SpringCloud的類。它在處理刷新事件時,會銷毀被@RefreshScope注解修飾的類的Bean,也就是會調用添加了@RefreshScope注解的類的destroy()方法。把Bean實例銷毀后,后面需要用到這個Bean時才重新進行創建。重新進行創建的時候,就會獲取最新的配置文件,從而完成刷新效果。

(4)總結

客戶端注冊Nacos監聽器,服務端修改配置后,客戶端刷新配置的流程:

5.集群架構下節點間如何同步配置數據

(1)Nacos控制臺的配置管理模塊

(2)變更配置數據時的源碼

(3)集群節點間的配置數據變更同步

(4)服務端通知客戶端配置數據已變更

(5)總結

(1)Nacos控制臺的配置管理模塊

在這個模塊中,可以通過配置列表維護我們的配置文件,可以通過歷史版本找到配置的發布記錄,并且支持回滾操作。當編輯配置文件時,客戶端可以及時感知變化并刷新其配置文件。當服務端通知客戶端配置變更時,也會通知集群節點進行數據同步。

當用戶在Nacos控制臺點擊確認發布按鈕時,Nacos會大概進行如下處理:

一.修改配置文件數據

二.保存配置發布歷史

三.通知并觸發客戶端監聽事件進行配置文件變更

四.通知集群對配置文件進行變更

點擊確認發布按鈕時,會發起HTTP請求,地址為"/nacos/v1/cs/configs"。通過請求地址可知處理入口是ConfigController的publishConfig()方法。

(2)變更配置數據時的源碼

ConfigController的publishConfig()方法中的兩行核心代碼是:一.新增或修改配置數據的PersistService的insertOrUpdate()方法,二.發布配置變更事件的ConfigChangePublisher的notifyConfigChange()方法。

一.新增或者修改配置數據

其中PersistService有兩個實現類:一是EmbeddedStoragePersistServiceImpl,它是Nacos內置的Derby數據庫。二是ExternalStoragePersistServiceImpl,它是Nacos外置數據庫如MySQL。

在ExternalStoragePersistServiceImpl的insertOrUpdate()方法中,如果執行ExternalStoragePersistServiceImpl的updateConfigInfo()方法,那么會先查詢對應的配置,然后更新配置,最后保存配置歷史。

@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {private final PersistService persistService;...@PostMapping@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,@RequestParam(value = "appName", required = false) String appName,@RequestParam(value = "src_user", required = false) String srcUser,@RequestParam(value = "config_tags", required = false) String configTags,@RequestParam(value = "desc", required = false) String desc,@RequestParam(value = "use", required = false) String use,@RequestParam(value = "effect", required = false) String effect,@RequestParam(value = "type", required = false) String type,@RequestParam(value = "schema", required = false) String schema) throws NacosException {final String srcIp = RequestUtil.getRemoteIp(request);final String requestIpApp = RequestUtil.getAppName(request);srcUser = RequestUtil.getSrcUserName(request);//check typeif (!ConfigType.isValidType(type)) {type = ConfigType.getDefaultType().getType();}//check tenantParamUtils.checkTenant(tenant);ParamUtils.checkParam(dataId, group, "datumId", content);ParamUtils.checkParam(tag);Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10);MapUtils.putIfValNoNull(configAdvanceInfo, "config_tags", configTags);MapUtils.putIfValNoNull(configAdvanceInfo, "desc", desc);MapUtils.putIfValNoNull(configAdvanceInfo, "use", use);MapUtils.putIfValNoNull(configAdvanceInfo, "effect", effect);MapUtils.putIfValNoNull(configAdvanceInfo, "type", type);MapUtils.putIfValNoNull(configAdvanceInfo, "schema", schema);ParamUtils.checkParam(configAdvanceInfo);if (AggrWhitelist.isAggrDataId(dataId)) {LOGGER.warn("[aggr-conflict] {} attemp to publish single data, {}, {}", RequestUtil.getRemoteIp(request), dataId, group);throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr");}final Timestamp time = TimeUtils.getCurrentTime();String betaIps = request.getHeader("betaIps");ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);configInfo.setType(type);if (StringUtils.isBlank(betaIps)) {if (StringUtils.isBlank(tag)) {//新增配置或者修改配置persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);//發布配置改變事件ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));} else {persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);//發布配置改變事件ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));}} else {//beta publishpersistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, true);//發布配置改變事件ConfigChangePublisher.notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));}ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), ConfigTraceService.PERSISTENCE_EVENT_PUB, content);return true;}...
}//External Storage Persist Service.
@SuppressWarnings(value = {"PMD.MethodReturnWrapperTypeRule", "checkstyle:linelength"})
@Conditional(value = ConditionOnExternalStorage.class)
@Component
public class ExternalStoragePersistServiceImpl implements PersistService {private DataSourceService dataSourceService;...@Overridepublic void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time, Map<String, Object> configAdvanceInfo, boolean notify) {try {addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);} catch (DataIntegrityViolationException ive) { // Unique constraint conflictupdateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);}}@Overridepublic void updateConfigInfo(final ConfigInfo configInfo, final String srcIp, final String srcUser, final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {boolean result = tjt.execute(status -> {try {//查詢已存在的配置數據ConfigInfo oldConfigInfo = findConfigInfo(configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());String appNameTmp = oldConfigInfo.getAppName();if (configInfo.getAppName() == null) {configInfo.setAppName(appNameTmp);}//更新配置數據updateConfigInfoAtomic(configInfo, srcIp, srcUser, time, configAdvanceInfo);String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");if (configTags != null) {// delete all tags and then recreateremoveTagByIdAtomic(oldConfigInfo.getId());addConfigTagsRelation(oldConfigInfo.getId(), configTags, configInfo.getDataId(), configInfo.getGroup(), configInfo.getTenant());}//保存到發布配置歷史表insertConfigHistoryAtomic(oldConfigInfo.getId(), oldConfigInfo, srcIp, srcUser, time, "U");} catch (CannotGetJdbcConnectionException e) {LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);throw e;}return Boolean.TRUE;});}@Overridepublic ConfigInfo findConfigInfo(final String dataId, final String group, final String tenant) {final String tenantTmp = StringUtils.isBlank(tenant) ? StringUtils.EMPTY : tenant;try {return this.jt.queryForObject("SELECT ID,data_id,group_id,tenant_id,app_name,content,md5,type FROM config_info WHERE data_id=? AND group_id=? AND tenant_id=?", new Object[] {dataId, group, tenantTmp}, CONFIG_INFO_ROW_MAPPER);} catch (EmptyResultDataAccessException e) { // Indicates that the data does not exist, returns null.return null;} catch (CannotGetJdbcConnectionException e) {LogUtil.FATAL_LOG.error("[db-error] " + e.toString(), e);throw e;}}...
}

二.發布配置變更事件

執行ConfigChangePublisher的notifyConfigChange()方法發布配置變更事件時,最終會把事件添加到DefaultPublisher.queue阻塞隊列中,完成事件發布。

NotifyCenter在其靜態方法中,會創建DefaultPublisher并進行初始化。在執行DefaultPublisher的init()方法時,就會開啟一個異步任務。該異步任務便會不斷從阻塞隊列DefaultPublisher.queue中獲取事件,然后調用DefaultPublisher的receiveEvent()方法處理配置變更事件。

在DefaultPublisher的receiveEvent()方法中,會循環遍歷事件訂閱者。其中就會包括來自客戶端,以及來自集群節點的兩個訂閱者。前者會通知客戶端發生了配置變更事件,后者會通知各集群節點發生了配置變更事件。而且進行事件通知時,都會調用DefaultPublisher的notifySubscriber()方法。該方法會異步執行訂閱者的監聽邏輯,也就是subscriber.onEvent()方法。

具體的subscriber訂閱者有:用來通知集群節點進行數據同步的訂閱者AsyncNotifyService,用來通知客戶端處理配置文件變更的訂閱者LongPollingService。

事件發布機制的實現簡單總結:發布者需要一個Set存放注冊的訂閱者,發布者發布事件時,需要遍歷調用訂閱者處理事件的方法。

public class ConfigChangePublisher {//Notify ConfigChange.public static void notifyConfigChange(ConfigDataChangeEvent event) {if (PropertyUtil.isEmbeddedStorage() && !EnvUtil.getStandaloneMode()) {return;}NotifyCenter.publishEvent(event);}
}//Unified Event Notify Center.
public class NotifyCenter {static {...try {// Create and init DefaultSharePublisher instance.INSTANCE.sharePublisher = new DefaultSharePublisher();INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);        } catch (Throwable ex) {LOGGER.error("Service class newInstance has error : {}", ex);}ThreadUtils.addShutdownHook(new Runnable() {@Overridepublic void run() {shutdown();}});}//注冊訂閱者public static <T> void registerSubscriber(final Subscriber consumer) {...addSubscriber(consumer, subscribeType);}private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {...EventPublisher publisher = INSTANCE.publisherMap.get(topic);//執行DefaultPublisher.addSubscriber()方法publisher.addSubscriber(consumer);}...//Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is actually published.public static boolean publishEvent(final Event event) {try {return publishEvent(event.getClass(), event);} catch (Throwable ex) {LOGGER.error("There was an exception to the message publishing : {}", ex);return false;}}//Request publisher publish event Publishers load lazily, calling publisher.private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {return INSTANCE.sharePublisher.publish(event);}final String topic = ClassUtils.getCanonicalName(eventType);EventPublisher publisher = INSTANCE.publisherMap.get(topic);if (publisher != null) {//執行DefaultPublisher.publish()方法return publisher.publish(event);}LOGGER.warn("There are no [{}] publishers for this event, please register", topic);return false;}...
}//The default event publisher implementation.
public class DefaultPublisher extends Thread implements EventPublisher {protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<Subscriber>();private BlockingQueue<Event> queue;...@Overridepublic void addSubscriber(Subscriber subscriber) {//注冊事件訂閱者subscribers.add(subscriber);}@Overridepublic boolean publish(Event event) {checkIsStart();//將事件添加到阻塞隊列,則表示已完成事件發布boolean success = this.queue.offer(event);if (!success) {LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);receiveEvent(event);return true;}return true;}@Overridepublic void init(Class<? extends Event> type, int bufferSize) {setDaemon(true);setName("nacos.publisher-" + type.getName());this.eventType = type;this.queueMaxSize = bufferSize;this.queue = new ArrayBlockingQueue<Event>(bufferSize);start();}@Overridepublic synchronized void start() {if (!initialized) {//執行線程的run()方法,start just called oncesuper.start();if (queueMaxSize == -1) {queueMaxSize = ringBufferSize;}initialized = true;}}@Overridepublic void run() {openEventHandler();}void openEventHandler() {try {//This variable is defined to resolve the problem which message overstock in the queue.int waitTimes = 60;//To ensure that messages are not lost, enable EventHandler when waiting for the first Subscriber to registerfor (; ;) {if (shutdown || hasSubscriber() || waitTimes <= 0) {break;}ThreadUtils.sleep(1000L);waitTimes--;}for (; ;) {if (shutdown) {break;}final Event event = queue.take();receiveEvent(event);UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));}} catch (Throwable ex) {LOGGER.error("Event listener exception : {}", ex);}}//Receive and notifySubscriber to process the event.void receiveEvent(Event event) {final long currentEventSequence = event.sequence();//循環遍歷事件的訂閱者for (Subscriber subscriber : subscribers) {// Whether to ignore expiration eventsif (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire", event.getClass());continue;}//通知事件訂閱者notifySubscriber(subscriber, event);}}@Overridepublic void notifySubscriber(final Subscriber subscriber, final Event event) {LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);final Runnable job = new Runnable() {@Overridepublic void run() {//異步執行訂閱者的監聽邏輯subscriber.onEvent(event);}};final Executor executor = subscriber.executor();if (executor != null) {executor.execute(job);} else {try {job.run();} catch (Throwable e) {LOGGER.error("Event callback exception : {}", e);}}}...
}

(3)集群節點間的配置數據變更同步

核心處理方法便是AsyncNotifyService的onEvent()方法。該方法首先會獲取集群節點列表,然后遍歷集群列表構造通知任務NotifySingleTask,接著把通知任務NotifySingleTask添加到隊列queue當中,最后根據通知任務隊列queue封裝一個異步任務提交到線程池去處理,也就是異步任務AsyncTask的run()方法會處理通知任務NotifySingleTask。

在異步任務AsyncTask的run()方法中,會一直從queue中獲取通知任務,以便將配置數據同步到對應的集群節點。具體就是在while循環中,首先獲得通知任務中對應的集群節點的IP地址。然后判斷該集群節點的IP是否在當前節點的配置中,并且是否是健康狀態。如果該集群節點不健康,則放入隊列并將隊列提交給異步任務來延遲處理。如果該集群節點是健康狀態,則通過HTTP方式發起配置數據的同步,地址是"/v1/cs/communication/dataChange"。

@Service
public class AsyncNotifyService {...@Autowiredpublic AsyncNotifyService(ServerMemberManager memberManager) {this.memberManager = memberManager;//Register ConfigDataChangeEvent to NotifyCenter.NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);//Register A Subscriber to subscribe ConfigDataChangeEvent.NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {//配置中心數據變更,同步其他集群節點數據if (event instanceof ConfigDataChangeEvent) {ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;long dumpTs = evt.lastModifiedTs;String dataId = evt.dataId;String group = evt.group;String tenant = evt.tenant;String tag = evt.tag;//獲取集群節點列表Collection<Member> ipList = memberManager.allMembers();Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();//遍歷集群列表構造通知任務NotifySingleTask去同步數據for (Member member : ipList) {//把通知任務NotifySingleTask添加到隊列queue當中queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta));}//根據通知任務隊列Queue<NotifySingleTask>,封裝一個異步任務AsyncTask,提交到線程池執行ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));}}@Overridepublic Class<? extends Event> subscribeType() {return ConfigDataChangeEvent.class;}});}...class AsyncTask implements Runnable {private Queue<NotifySingleTask> queue;private NacosAsyncRestTemplate restTemplate;public AsyncTask(NacosAsyncRestTemplate restTemplate, Queue<NotifySingleTask> queue) {this.restTemplate = restTemplate;this.queue = queue;}@Overridepublic void run() {executeAsyncInvoke();}private void executeAsyncInvoke() {while (!queue.isEmpty()) {//一直從queue隊列中獲取通知任務,以便將配置數據同步到對應的集群節點NotifySingleTask task = queue.poll();//獲取通知任務中對應的集群節點的IP地址String targetIp = task.getTargetIP();if (memberManager.hasMember(targetIp)) {//start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify//判斷該集群節點的ip是否在當前節點的配置中,并且是否是健康狀態boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp);if (unHealthNeedDelay) {//target ip is unhealthy, then put it in the notification list//如果該集群節點不健康,則放入另外一個隊列,同樣會將隊列提交給異步任務,然后延遲處理ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,0, task.target);//get delay time and set fail count to the taskasyncTaskExecute(task);} else {//如果該集群節點是健康狀態,則通過HTTP方式發起配置數據的同步Header header = Header.newInstance();header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());if (task.isBeta) {header.addParam("isBeta", "true");}AuthHeaderUtil.addIdentityToHeader(header);//通過HTTP方式發起配置數據的同步,請求的HTTP地址:/v1/cs/communication/dataChangerestTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));}}}}}private void asyncTaskExecute(NotifySingleTask task) {int delay = getDelayTime(task);Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();queue.add(task);AsyncTask asyncTask = new AsyncTask(nacosAsyncRestTemplate, queue);//提交異步任務給線程池延遲執行ConfigExecutor.scheduleAsyncNotify(asyncTask, delay, TimeUnit.MILLISECONDS);}
}

當集群節點處理"/v1/cs/communication/dataChange"這個HTTP請求時,會調用CommunicationController的notifyConfigInfo()方法,接著調用DumpService的dump()方法將請求包裝成DumpTask同步數據任務,然后調用TaskManager的addTask()方法將DumpTask同步數據任務放入map。

TaskManager的父類NacosDelayTaskExecuteEngine在初始化時,會開啟一個異步任務執行ProcessRunnable的run()方法,也就是會不斷從map中取出DumpTask同步數據任務,然后調用DumpProcessor的process()方法處理具體的配置數據同步邏輯。也就是查詢數據庫最新的配置,然后持久化配置數據到磁盤上,從而完成集群之間配置數據的同步。

@RestController
@RequestMapping(Constants.COMMUNICATION_CONTROLLER_PATH)
public class CommunicationController {private final DumpService dumpService;...@GetMapping("/dataChange")public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,@RequestParam("group") String group,@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "tag", required = false) String tag) {dataId = dataId.trim();group = group.trim();String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);String isBetaStr = request.getHeader("isBeta");if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);} else {dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);}return true;}...
}public abstract class DumpService {private TaskManager dumpTaskMgr;public DumpService(PersistService persistService, ServerMemberManager memberManager) {...this.processor = new DumpProcessor(this);this.dumpTaskMgr = new TaskManager("com.alibaba.nacos.server.DumpTaskManager");this.dumpTaskMgr.setDefaultTaskProcessor(processor);...}...public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) {String groupKey = GroupKey2.getKey(dataId, group, tenant);dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, lastModified, handleIp, isBeta));}...
}public final class TaskManager extends NacosDelayTaskExecuteEngine implements TaskManagerMBean {...@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {super.addTask(key, newTask);MetricsMonitor.getDumpTaskMonitor().set(tasks.size());}...
}public class NacosDelayTaskExecuteEngine extends AbstractNacosTaskExecuteEngine<AbstractDelayTask> {protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;//任務池public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));//開啟延時任務processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}...@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {lock.lock();try {AbstractDelayTask existTask = tasks.get(key);if (null != existTask) {newTask.merge(existTask);}//最后放入到ConcurrentHashMap中tasks.put(key, newTask);} finally {lock.unlock();}}...private class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}@Overridepublic Collection<Object> getAllTaskKeys() {Collection<Object> keys = new HashSet<Object>();lock.lock();try {keys.addAll(tasks.keySet());} finally {lock.unlock();}return keys;}protected void processTasks() {//獲取tasks中所有的任務,然后進行遍歷Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {//通過任務key,獲取具體的任務,并且從任務池中移除掉AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}//DumpService在初始化時會設置TaskManager的默認processor是DumpProcessor//根據taskKey獲取NacosTaskProcessor延遲任務處理器:DumpProcessorNacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {//ReAdd task if process failed//調用DumpProcessor.process()方法if (!processor.process(task)) {//如果失敗了,會重試添加task回tasks這個map中retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error : " + e.toString(), e);retryFailedTask(taskKey, task);}}}
}

(4)服務端通知客戶端配置數據已變更

服務端通知客戶端配置文件變更的方法是LongPollingService.onEvent()。

由前面客戶端如何感知遠程配置數據的變更可知,Nacos客戶端啟動時:會調用ConfigService的addListener()方法為每個dataId + group添加一個監聽器。而NacosConfigService初始化時會創建ClientWorker對象,此時會開啟多個長連接任務即執行LongPollingRunnable的run()方法。

執行LongPollingRunnable的run()方法時,會觸發執行ClientWorker的checkUpdateDataIds()方法,該方法最后會調用服務端的"/v1/cs/configs/listener"接口,將當前客戶端添加到LongPollingService的allSubs屬性中。

這樣當以后dataId + group的配置發生變更時,服務端會觸發執行LongPollingService的onEvent()方法,然后遍歷LongPollingService.allSubs屬性通知客戶端配置已變更。

客戶端收到變更事件通知后,會將最新的配置刷新到容器中,同時將@RefreshScope注解修飾的Bean從緩存中刪除。這樣再次訪問這些Bean,就會重新創建Bean,從而讀取到最新的配置。

public class NacosConfigService implements ConfigService {//long polling.private final ClientWorker worker;...public NacosConfigService(Properties properties) throws NacosException {...this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);}...
}//Long polling.
public class ClientWorker implements Closeable {public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) {...this.executor.scheduleWithFixedDelay(new Runnable() {@Overridepublic void run() {try {checkConfigInfo();} catch (Throwable e) {LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);}}}, 1L, 10L, TimeUnit.MILLISECONDS);}//Check config info.public void checkConfigInfo() {//Dispatch taskes.int listenerSize = cacheMap.size();//Round up the longingTaskCount.int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());if (longingTaskCount > currentLongingTaskCount) {for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {//The task list is no order.So it maybe has issues when changing.//執行長連接任務:LongPollingRunnable.run()executorService.execute(new LongPollingRunnable(i));}currentLongingTaskCount = longingTaskCount;}}...class LongPollingRunnable implements Runnable {...@Overridepublic void run() {...//check server configList<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);...}}//Fetch the dataId list from server.List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {...return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);}//Fetch the updated dataId list from server.List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {...try {//In order to prevent the server from handling the delay of the client's long task, increase the client's read timeout to avoid this problem.long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);//發起HTTP請求:/v1/cs/configs/listener,將客戶端添加到LongPollingService.allSubs屬性中HttpRestResult<String> result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(), readTimeoutMs);...} catch (Exception e) {...}return Collections.emptyList();}
}@RestController
@RequestMapping(Constants.CONFIG_CONTROLLER_PATH)
public class ConfigController {private final ConfigServletInner inner;...@PostMapping("/listener")@Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class)public void listener(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {...//do long-pollinginner.doPollingConfig(request, response, clientMd5Map, probeModify.length());}...
}@Service
public class ConfigServletInner {...//輪詢接口.public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map<String, String> clientMd5Map, int probeRequestSize) throws IOException {//Long polling.if (LongPollingService.isSupportLongPolling(request)) {longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);return HttpServletResponse.SC_OK + "";}...}...
}@Service
public class LongPollingService {//客戶端長輪詢訂閱者final Queue<ClientLongPolling> allSubs;...public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map, int probeRequestSize) {...//添加訂閱者ConfigExecutor.executeLongPolling(new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));}class ClientLongPolling implements Runnable {@Overridepublic void run() {...allSubs.add(this);}...}...public LongPollingService() {allSubs = new ConcurrentLinkedQueue<ClientLongPolling>();...NotifyCenter.registerSubscriber(new Subscriber() {@Overridepublic void onEvent(Event event) {if (isFixedPolling()) {// Ignore.} else {if (event instanceof LocalDataChangeEvent) {LocalDataChangeEvent evt = (LocalDataChangeEvent) event;//觸發執行DataChangeTask.run()方法ConfigExecutor.executeLongPolling(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));}}}...});}class DataChangeTask implements Runnable {@Overridepublic void run() {try {ConfigCacheService.getContentBetaMd5(groupKey);//遍歷訂閱了配置變更事件的客戶端for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {ClientLongPolling clientSub = iter.next();if (clientSub.clientMd5Map.containsKey(groupKey)) {...getRetainIps().put(clientSub.ip, System.currentTimeMillis());iter.remove(); // Delete subscribers' relationships.//發送服務端數據變更的響應給客戶端clientSub.sendResponse(Arrays.asList(groupKey));}}} catch (Throwable t) {LogUtil.DEFAULT_LOG.error("data change error: {}", ExceptionUtil.getStackTrace(t));}}}...
}

(5)總結

一.配置中心數據變更同步集群節點的整體邏輯

當在Nacos后臺變更配置數據后:首先自身節點會把最新的配置數據更新到數據庫中,并且添加變更歷史。然后利用事件發布訂閱機制來通知訂閱者,其中訂閱者AsyncNotifyService會通過HTTP方式來通知其他集群節點。當其他集群節點收到通知后,會重新查詢數據庫最新的配置數據。然后持久化到磁盤上,因為獲取配置數據的接口是直接讀磁盤文件的。集群節點的配置數據同步完成后,還要通知客戶端配置數據已變更。

二.服務端通知客戶端配置數據已變更

在客戶端給dataId + group添加監聽器后,會和服務端建立一個長輪詢,所以另外一個訂閱者LongPollingService會通過長輪詢通知客戶端。也就是會遍歷每一個客戶端,通過長輪詢向客戶端進行響應。最終會調用到客戶端監聽器的回調方法,從而去刷新客戶端的配置Bean。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/904491.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/904491.shtml
英文地址,請注明出處:http://en.pswp.cn/news/904491.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

電力MOSFET的專用集成驅動電路IR2233

IR2233是IR2133/IR2233/IR2235 系列驅動芯片中的一種,是專為高電壓、高速度的電力MOSFET和IGBT驅動而設計的。該系列驅動芯片內部集成了互相獨立的三組板橋驅動電路,可對上下橋臂提供死區時間,特別適合于三相電源變換等方面的應用。其內部集成了獨立的運算放大器可通過外部橋…

六級閱讀———2024.12卷一 仔細閱讀2

文章 An awakening has been taking place in the physical world against the beauty model that has been dictated to us for years.But in the digital arena,social media determines what is considered beautiful.(51) The two opposing struggles are taking place i…

【C/C++】errno/strerror 和 GetLastError()/FormatMessage 的區別

strerror 和 errno 詳解 printf("Error: %s\n", strerror(errno));這行代碼用于在 C 語言中輸出系統錯誤信息&#xff0c;但它與 Windows 的 GetLastError() 有重要區別。下面我將詳細解釋每個部分及其工作原理。 1. 組件解析 errno 定義&#xff1a;errno 是一個…

Unicode和UTF - 8主要有以下區別

Unicode和UTF - 8主要有以下區別 概念范疇 Unicode:是字符集 。它為世界上幾乎所有的字符(包括各國文字、標點符號、特殊符號等)分配了唯一的編號,這個編號也叫碼位、碼點,比如“中”字的Unicode碼點是U+4E2D 。它規定了字符的抽象表示,只關注字符與數字編號的對應關系,…

企業數字化轉型第二課:接受不完美(1/2)

一.引言 先看一組中國企業數字化轉型相關的數據&#xff1a; 戰略認知層面&#xff1a;92%中國企業將數字化納入戰略核心&#xff08;麥肯錫2023&#xff09;執行困境層面&#xff1a;63%企業轉型首年遭遇重大挫折&#xff08;BCG 2024追蹤&#xff09;價值釋放周期&#xff1…

OSCP - Proving Grounds - Sumo

主要知識點 ShellShock漏洞dirtycow提權 具體步驟 執行nmap掃描,比較直觀&#xff0c;22和80端口開放&#xff0c;但是80端口沒有什么內容 Nmap scan report for 192.168.210.87 Host is up (0.44s latency). Not shown: 65533 closed tcp ports (reset) PORT STATE SERV…

pyqt寫一個TCP(UDP)檢測工具

先用電腦連接到目標WIFI&#xff0c;再運行以下代碼。 import sys from PyQt5.QtWidgets import * from PyQt5.QtCore import * from PyQt5.QtNetwork import *class NetTestTool(QWidget):def __init__(self):super().__init__()self.init_ui()self.tcp_socket QTcpSocket()…

趣味編程:夢幻萬花筒

目錄 1.效果展示 2.源碼展示 3.代碼邏輯詳解 3.1 頭文件與宏定義 3.2 HSV函數轉RGB顏色函數 3.3 主函數 初始化部分 循環部分 線條繪制部分 刷新和延時部分 結束部分 4.小結 本篇博客主要介紹趣味編程用C語言實現萬花筒小程序。 1.效果展示 2.源碼展示 #define…

軟件開發各階段的自動化測試技術詳解

引言 在當今快速迭代的軟件開發環境中&#xff0c;自動化測試已成為保證軟件質量、提高測試效率的重要手段。本文將深入探討軟件開發生命周期各個階段的自動化測試技術&#xff0c;包括單元測試、代碼級集成測試、Web Service測試和GUI測試的自動化實現方法。 單元測試的自動…

Elasticsearch:我們如何在全球范圍內實現支付基礎設施的現代化?

作者&#xff1a;來自 Elastic Kelly Manrique SWIFT 和 Elastic 如何應對基礎設施復雜性、誤報問題以及日益增長的合規要求。 金融服務公司在全球范圍內管理實時支付方面面臨前所未有的挑戰。SWIFT&#xff08;Society for Worldwide Interbank Financial Telecommunication -…

day009-用戶管理專題

文章目錄 1. 創建包含時間的文件2. 與用戶相關的文件3. 用戶分類4. 與用戶相關的命令4.1 添加用戶4.2 刪除用戶4.3 查看用戶4.4 修改用戶密碼 5. sudo6. 思維導圖7. 老男孩思想-學習方法 1. 創建包含時間的文件 或$()是替換符號&#xff0c;可以將命令的結果作為字符串或變量的…

shell腳本實現遠程重啟多個服務器

直接deepseek幫寫腳本 remoteReboot.sh #!/bin/bash # 配置文件路徑&#xff08;格式&#xff1a;每行一個服務器地址&#xff09; SERVER_FILE"servers.list" # 讀取服務器列表 mapfile -t SERVERS < "$SERVER_FILE" for server in "${SERVER…

如何利用 QuickAPI 生成 PostgreSQL 樣本測試數據:全面解析與實用指南

目錄 一、什么是 QuickAPI&#xff1f; 二、為什么需要生成樣本測試數據&#xff1f; 三、如何在 QuickAPI 中生成 PostgreSQL 樣本測試數據&#xff1f; 1. 登錄 QuickAPI 平臺 2. 選擇 PostgreSQL 數據庫和目標表 3. 配置樣本數據生成規則 4. 導出或直接插入數據 四、…

黑馬點評day04(分布式鎖-setnx)

4、分布式鎖 4.1 、基本原理和實現方式對比 分布式鎖&#xff1a;滿足分布式系統或集群模式下多進程可見并且互斥的鎖。 分布式鎖的核心思想就是讓大家都使用同一把鎖&#xff0c;只要大家使用的是同一把鎖&#xff0c;那么我們就能鎖住線程&#xff0c;不讓線程并行&#x…

?人工智能在農作物病蟲害識別中的應用前景分析

近年來&#xff0c;全球氣候變化加劇、農業種植規模化發展&#xff0c;農作物病蟲害對糧食安全的威脅日益凸顯。據統計&#xff0c;全球每年因病蟲害造成的農作物損失約占總產量的20%-40%&#xff0c;而傳統依賴人工經驗的防治方式效率低、成本高&#xff0c;難以滿足現代農業需…

C++ 完美轉發

C 完美轉發逐步詳解 1. 問題背景與核心目標 在 C 模板編程中&#xff0c;若直接將參數傳遞給其他函數&#xff0c;參數的 值類別&#xff08;左值/右值&#xff09;和 類型信息&#xff08;如 const&#xff09;可能會丟失。例如&#xff1a; template<typename T> voi…

Midjourney 繪畫 + AI 配音:組合玩法打造爆款短視頻!

一、引言:AI 重構短視頻創作范式 在某短視頻工作室的深夜剪輯室里,資深編導正在為一條古風劇情視頻發愁:預算有限無法實拍敦煌場景,人工繪制分鏡耗時 3 天,配音演員檔期排到一周后。而使用 Midjourney 生成敦煌壁畫風格的場景圖僅需 15 分鐘,AI 配音工具實時生成多角色臺…

AI基礎知識(02):機器學習的任務類型、學習方式、工作流程

03 機器學習(Machine Learning)的任務類型與學習方式 廣義的機器學習主要是一個研究如何讓計算機通過數據學習規律,并利用這些規律進行預測和決策的過程。這里的Machine并非物理意義上的機器,可以理解為計算機軟硬件組織;Learning可以理解為一個系統或平臺經歷了某些過程…

數據結構、刷leetcode返航版--二分5/7

1.排序 快排&#xff1a; 第一章 基礎算法&#xff08;一&#xff09; - AcWing 如何調整范圍 經典二分 遞歸結束條件&#xff1b;條件滿足時&#xff0c;進行處理&#xff1b;遞歸左邊&#xff0c;遞歸右邊 分界點劃分可以是l,r,(lr)/2,但是如果是選l&#xff0c;比如是1…

LeetCode 267:回文排列 II —— Swift 解法全解析

文章目錄 摘要描述題解答案題解代碼分析統計字符頻率判斷是否可能構成回文構建半邊字符數組回溯生成半邊排列 示例測試及結果時間復雜度空間復雜度實際使用場景&#xff1a;回文排列在真實項目里能干啥&#xff1f;文本處理、數據清洗類系統游戲開發&#xff1a;名字合法性驗證…